You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/05 14:41:32 UTC
[10/50] [abbrv] ignite git commit: revisited future + cheat cache
revisited future + cheat cache
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2ca11bca
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2ca11bca
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2ca11bca
Branch: refs/heads/ignite-comm-balance-master
Commit: 2ca11bca1a9f6b4926cffc196fa1b88d7a3da62c
Parents: 4ed9fee
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Wed Nov 9 21:41:52 2016 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Wed Nov 9 21:41:52 2016 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/BenchAtomic.java | 2 +
.../GridCachePartitionExchangeManager.java | 4 +-
.../processors/cache/GridCacheUtils.java | 22 ++
.../processors/cache/IgniteCacheProxy.java | 12 +
.../dht/atomic/GridDhtAtomicCache.java | 14 +
.../dht/atomic/GridNearAtomicUpdateFuture.java | 10 +-
.../internal/util/future/GridFutureAdapter.java | 385 ++++++++++---------
7 files changed, 269 insertions(+), 180 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2ca11bca/modules/core/src/main/java/org/apache/ignite/BenchAtomic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/BenchAtomic.java b/modules/core/src/main/java/org/apache/ignite/BenchAtomic.java
index 382408d..4f99123 100644
--- a/modules/core/src/main/java/org/apache/ignite/BenchAtomic.java
+++ b/modules/core/src/main/java/org/apache/ignite/BenchAtomic.java
@@ -19,6 +19,7 @@ package org.apache.ignite;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
@@ -230,6 +231,7 @@ public class BenchAtomic {
// .setAtomicWriteOrderMode(CacheAtomicWriteOrderMode.PRIMARY)
// .setAffinity(new FairAffinityFunction(1024))
.setBackups(0)
+ .setAtomicWriteOrderMode(CacheAtomicWriteOrderMode.PRIMARY)
.setRebalanceMode(CacheRebalanceMode.SYNC)
.setCopyOnRead(false)
// .setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED)
http://git-wip-us.apache.org/repos/asf/ignite/blob/2ca11bca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 4eb61e3..8495425 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1207,7 +1207,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (mvcc != null) {
for (GridCacheFuture<?> fut : mvcc.activeFutures()) {
- if (curTime - fut.startTime() > timeout) {
+ if (curTime - fut.startTime() > timeout && fut.startTime() != 0 /*TODO*/ ) {
found = true;
if (longRunningOpsDumpCnt < GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) {
@@ -1220,7 +1220,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
for (GridCacheFuture<?> fut : mvcc.atomicFutures()) {
- if (curTime - fut.startTime() > timeout) {
+ if (curTime - fut.startTime() > timeout && fut.startTime() != 0 /*TODO*/) {
found = true;
if (longRunningOpsDumpCnt < GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/2ca11bca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 4c18f21..a71b2e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -110,6 +110,28 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.REA
* Cache utility methods.
*/
public class GridCacheUtils {
+ // TODO cheat cache ID.
+ public static final int cheatCacheId;
+
+ static {
+ String cheatCache = System.getProperty("CHEAT_CACHE");
+
+ if (cheatCache != null) {
+ cheatCacheId = cheatCache.hashCode();
+
+ if (cheatCacheId == 0)
+ throw new RuntimeException();
+
+ System.out.println(">>> Cheat cache ID [id=" + cheatCacheId + ", name=" + cheatCache + ']');
+ }
+ else
+ cheatCacheId = 0;
+ }
+
+ public static boolean cheatCache(int id) {
+ return cheatCacheId != 0 && id == cheatCacheId;
+ }
+
/** Hadoop system cache name. */
public static final String SYS_CACHE_HADOOP_MR = "ignite-hadoop-mr-sys-cache";
http://git-wip-us.apache.org/repos/asf/ignite/blob/2ca11bca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 297ec68..41b71f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -1303,6 +1303,18 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
@Override public void put(K key, V val) {
+ // TODO
+ if (CU.cheatCache(ctx.cacheId())) {
+ try {
+ delegate.put(key, val);
+ }
+ catch (IgniteCheckedException e) {
+ throw cacheException(e);
+ }
+
+ return;
+ }
+
try {
GridCacheGateway<K, V> gate = this.gate;
http://git-wip-us.apache.org/repos/asf/ignite/blob/2ca11bca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 30a3d57..38a8223 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1104,6 +1104,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
waitTopFut);
+ // TODO
+ if (CU.cheatCache(ctx.cacheId())) {
+ updateFut.map();
+
+ return updateFut;
+ }
+
return asyncOp(new CO<IgniteInternalFuture<Object>>() {
@Override public IgniteInternalFuture<Object> apply() {
updateFut.map();
@@ -1145,6 +1152,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final GridNearAtomicAbstractUpdateFuture updateFut =
createSingleUpdateFuture(key, val, proc, invokeArgs, retval, filter, waitTopFut);
+ // TODO
+ if (CU.cheatCache(ctx.cacheId())) {
+ updateFut.map();
+
+ return updateFut;
+ }
+
return asyncOp(new CO<IgniteInternalFuture<Object>>() {
@Override public IgniteInternalFuture<Object> apply() {
updateFut.map();
http://git-wip-us.apache.org/repos/asf/ignite/blob/2ca11bca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 30a0c3d..a7a1f7a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -611,9 +611,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
* @param remapKeys Keys to remap.
*/
void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
- Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
+ // TODO
+ Collection<ClusterNode> topNodes = CU.cheatCache(cctx.cacheId()) ?
+ Collections.<ClusterNode>emptyList() : CU.affinityNodes(cctx, topVer);
- if (F.isEmpty(topNodes)) {
+ if (!CU.cheatCache(cctx.cacheId()) && F.isEmpty(topNodes)) {
onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
"left the grid)."));
@@ -651,6 +653,10 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
}
else {
+ // TODO
+ if (CU.cheatCache(cctx.cacheId()))
+ throw new RuntimeException();
+
Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes,
topVer,
futVer,
http://git-wip-us.apache.org/repos/asf/ignite/blob/2ca11bca/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index ea7a202..0c6a03b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -17,16 +17,14 @@
package org.apache.ignite.internal.util.future;
-import java.util.Arrays;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.locks.LockSupport;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -37,84 +35,72 @@ import org.jetbrains.annotations.Nullable;
/**
* Future adapter.
*/
-public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements IgniteInternalFuture<R> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Initial state. */
- private static final int INIT = 0;
+public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
+ // https://bugs.openjdk.java.net/browse/JDK-8074773
+ static {
+ Class<?> ensureLoaded = LockSupport.class;
+ }
- /** Cancelled state. */
- private static final int CANCELLED = 1;
+ private enum State {
+ INIT,
- /** Done state. */
- private static final int DONE = 2;
+ CANCELLED,
+ }
- /** */
- private static final byte ERR = 1;
+ private static final AtomicReferenceFieldUpdater<GridFutureAdapter, Object> stateUpd =
+ AtomicReferenceFieldUpdater.newUpdater(GridFutureAdapter.class, Object.class, "state");
/** */
- private static final byte RES = 2;
+ private static final long serialVersionUID = 0L;
/** */
- private byte resFlag;
-
- /** Result. */
- @GridToStringInclude
- private Object res;
+// private boolean ignoreInterrupts;
- /** Future start time. */
- private final long startTime = U.currentTimeMillis();
+ private volatile Object state = State.INIT;
- /** Future end time. */
- private volatile long endTime;
-
- /** */
- private boolean ignoreInterrupts;
-
- /** */
- @GridToStringExclude
- private IgniteInClosure<? super IgniteInternalFuture<R>> lsnr;
+// private long l1, l2, l3, l4, l5, l6, l7;
/** {@inheritDoc} */
@Override public long startTime() {
- return startTime;
+ return 0;
}
/** {@inheritDoc} */
@Override public long duration() {
- long endTime = this.endTime;
-
- return endTime == 0 ? U.currentTimeMillis() - startTime : endTime - startTime;
+ return 0;
}
/**
* @param ignoreInterrupts Ignore interrupts flag.
*/
public void ignoreInterrupts(boolean ignoreInterrupts) {
- this.ignoreInterrupts = ignoreInterrupts;
+ // this.ignoreInterrupts = ignoreInterrupts;
}
/**
* @return Future end time.
*/
public long endTime() {
- return endTime;
+ return 0;
}
/** {@inheritDoc} */
@Override public Throwable error() {
- return (resFlag == ERR) ? (Throwable)res : null;
+ Object state0 = state;
+
+ return (state0 instanceof Throwable) ? (Throwable)state0 : null;
}
/** {@inheritDoc} */
@Override public R result() {
- return resFlag == RES ? (R)res : null;
+ Object state0 = state;
+
+ return isDone(state0) && !(state0 instanceof Throwable) ? (R)state0 : null;
}
/** {@inheritDoc} */
@Override public R get() throws IgniteCheckedException {
- return get0(ignoreInterrupts);
+ return get0(false);
}
/** {@inheritDoc} */
@@ -151,28 +137,34 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
* @throws IgniteCheckedException If failed.
*/
private R get0(boolean ignoreInterrupts) throws IgniteCheckedException {
- try {
- if (endTime == 0) {
- if (ignoreInterrupts)
- acquireShared(0);
- else
- acquireSharedInterruptibly(0);
- }
+ Object res = registerWaiter(Thread.currentThread());
- if (getState() == CANCELLED)
- throw new IgniteFutureCancelledCheckedException("Future was cancelled: " + this);
+ if (res != State.INIT) {
+ // no registration was done since a value is available.
+ return resolveAndThrow(res);
+ }
- assert resFlag != 0;
+ boolean interrupted = false;
- if (resFlag == ERR)
- throw U.cast((Throwable)res);
+ try {
+ for (; ; ) {
+ LockSupport.park();
- return (R)res;
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ if (isDone())
+ return resolveAndThrow(state);
+ else if (Thread.interrupted()) {
+ interrupted = true;
- throw new IgniteInterruptedCheckedException(e);
+ if (!ignoreInterrupts) {
+ unregisterWaiter(Thread.currentThread());
+
+ throw new IgniteInterruptedCheckedException("Thread has been interrupted.");
+ }
+ }
+ }
+ }
+ finally {
+ restoreInterrupt(interrupted);
}
}
@@ -184,71 +176,156 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
* @throws IgniteCheckedException If error occurred.
*/
@Nullable protected R get0(long nanosTimeout) throws InterruptedException, IgniteCheckedException {
- if (endTime == 0 && !tryAcquireSharedNanos(0, nanosTimeout))
- throw new IgniteFutureTimeoutCheckedException("Timeout was reached before computation completed.");
+ Object res = registerWaiter(Thread.currentThread());
+
+ if (res != State.INIT)
+ return resolveAndThrow(res);
+
+ long deadlineNanos = System.nanoTime() + nanosTimeout;
+
+ boolean interrupted = false;
+
+ try {
+ long nanosTimeout0 = nanosTimeout;
- if (getState() == CANCELLED)
+ while (nanosTimeout0 > 0) {
+ LockSupport.parkNanos(nanosTimeout0);
+
+ nanosTimeout0 = deadlineNanos - System.nanoTime();
+
+ if (isDone())
+ return resolveAndThrow(state);
+
+ else if (Thread.interrupted()) {
+ interrupted = true;
+// TODO
+// if (!ignoreInterrupts)
+// throw new IgniteInterruptedCheckedException("Thread has been interrupted.");
+ }
+ }
+ }
+ finally {
+ restoreInterrupt(interrupted);
+
+ unregisterWaiter(Thread.currentThread());
+ }
+
+ throw new IgniteFutureTimeoutCheckedException("Timeout was reached before computation completed.");
+ }
+
+ protected R resolveAndThrow(Object val) throws IgniteCheckedException {
+ if (val == State.CANCELLED)
throw new IgniteFutureCancelledCheckedException("Future was cancelled: " + this);
- assert resFlag != 0;
+ if (val instanceof Throwable)
+ throw U.cast((Throwable)val);
+
+ return (R)val;
+ }
- if (resFlag == ERR)
- throw U.cast((Throwable)res);
- return (R)res;
+ private void restoreInterrupt(boolean interrupted) {
+ if (interrupted)
+ Thread.currentThread().interrupt();
}
- /** {@inheritDoc} */
- @Override public void listen(IgniteInClosure<? super IgniteInternalFuture<R>> lsnr0) {
- assert lsnr0 != null;
+ private Object registerWaiter(Object waiter) {
+ WaitNode waitNode = null;
- boolean done = isDone();
+ for (; ; ) {
+ final Object oldState = state;
- if (!done) {
- synchronized (this) {
- done = isDone(); // Double check.
+ if (isDone(oldState))
+ return oldState;
- if (!done) {
- if (lsnr == null)
- lsnr = lsnr0;
- else if (lsnr instanceof ArrayListener)
- ((ArrayListener)lsnr).add(lsnr0);
- else
- lsnr = (IgniteInClosure)new ArrayListener<IgniteInternalFuture>(lsnr, lsnr0);
+ Object newState;
- return;
- }
+ if (oldState == State.INIT)
+ newState = waiter;
+
+ else {
+ if (waitNode == null)
+ waitNode = new WaitNode(waiter);
+
+ waitNode.next = oldState;
+
+ newState = waitNode;
}
+
+ if (compareAndSetState(oldState, newState))
+ return State.INIT;
}
+ }
- assert done;
+ void unregisterWaiter(Thread waiter) {
+ WaitNode prev = null;
+ Object current = state;
- notifyListener(lsnr0);
- }
+ while (current != null) {
+ Object currentWaiter = current.getClass() == WaitNode.class ? ((WaitNode)current).waiter : current;
+ Object next = current.getClass() == WaitNode.class ? ((WaitNode)current).next : null;
- /** {@inheritDoc} */
- @Override public <T> IgniteInternalFuture<T> chain(final IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb) {
- return new ChainFuture<>(this, doneCb);
+ if (currentWaiter == waiter) {
+ // it is the item we are looking for, so lets try to remove it
+ if (prev == null) {
+ // it's the first item of the stack, so we need to change the head to the next
+ Object n = next == null ? State.INIT : next;
+ // if we manage to CAS we are done, else we need to restart
+ current = compareAndSetState(
+ current,
+ n) ? null : state;
+ }
+ else {
+ // remove the current item (this is done by letting the prev.next point to the next instead of current)
+ prev.next = next;
+ // end the loop
+ current = null;
+ }
+ }
+ else {
+ // it isn't the item we are looking for, so lets move on to the next
+ prev = current.getClass() == WaitNode.class ? (WaitNode)current : null;
+ current = next;
+ }
+ }
}
- /**
- * Notifies all registered listeners.
- */
- private void notifyListeners() {
- IgniteInClosure<? super IgniteInternalFuture<R>> lsnr0;
- synchronized (this) {
- lsnr0 = lsnr;
+ private void unblockAll(Object waiter) {
+ while (waiter != null) {
+ if (waiter instanceof Thread) {
+ LockSupport.unpark((Thread)waiter);
+
+ return;
+ }
+ else if (waiter instanceof IgniteInClosure) {
+ notifyListener((IgniteInClosure<? super IgniteInternalFuture<R>>)waiter);
- if (lsnr0 == null)
return;
+ }
+ else if (waiter.getClass() == WaitNode.class) {
+ WaitNode waitNode = (WaitNode) waiter;
- lsnr = null;
+ unblockAll(waitNode.waiter);
+
+ waiter = waitNode.next;
+ }
+ else
+ return;
}
+ }
+
+ /** {@inheritDoc} */
+ @Override public void listen(IgniteInClosure<? super IgniteInternalFuture<R>> newLsnr) {
+ Object res = registerWaiter(newLsnr);
- assert lsnr0 != null;
+ if (res != State.INIT)
+ notifyListener(newLsnr);
+ }
- notifyListener(lsnr0);
+ /** {@inheritDoc} */
+ @Override public <T> IgniteInternalFuture<T> chain(final IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb) {
+ return new ChainFuture<>(this, doneCb);
}
/**
@@ -285,9 +362,15 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
/** {@inheritDoc} */
@Override public boolean isDone() {
- // Don't check for "valid" here, as "done" flag can be read
- // even in invalid state.
- return endTime != 0;
+ return isDone(state);
+ }
+
+ private boolean isDone(Object state) {
+ return state == null ||
+ !(state == State.INIT
+ || state.getClass() == WaitNode.class
+ || state instanceof Thread
+ || state instanceof IgniteInClosure);
}
/**
@@ -295,12 +378,12 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
*/
public boolean isFailed() {
// Must read endTime first.
- return endTime != 0 && resFlag == ERR;
+ return state instanceof Throwable;
}
/** {@inheritDoc} */
@Override public boolean isCancelled() {
- return getState() == CANCELLED;
+ return state == State.CANCELLED;
}
/**
@@ -354,34 +437,26 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
* @return {@code True} if result was set by this call.
*/
private boolean onDone(@Nullable R res, @Nullable Throwable err, boolean cancel) {
- boolean notify = false;
+ Object val = cancel ? State.CANCELLED : err != null ? err : res;
- try {
- if (compareAndSetState(INIT, cancel ? CANCELLED : DONE)) {
- if (err != null) {
- resFlag = ERR;
- this.res = err;
- }
- else {
- resFlag = RES;
- this.res = res;
- }
+ for (; ; ) {
+ final Object oldState = state;
- notify = true;
+ if (isDone(oldState))
+ return false;
- releaseShared(0);
+ if (compareAndSetState(oldState, val)) {
+ unblockAll(oldState);
return true;
}
-
- return false;
- }
- finally {
- if (notify)
- notifyListeners();
}
}
+ private boolean compareAndSetState(Object oldState, Object newState) {
+ return state == oldState && stateUpd.compareAndSet(this, oldState, newState);
+ }
+
/**
* Callback to notify that future is cancelled.
*
@@ -391,26 +466,13 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
return onDone(null, null, true);
}
- /** {@inheritDoc} */
- @Override protected final int tryAcquireShared(int ignore) {
- return endTime != 0 ? 1 : -1;
- }
-
- /** {@inheritDoc} */
- @Override protected final boolean tryReleaseShared(int ignore) {
- endTime = U.currentTimeMillis();
-
- // Always signal after setting final done status.
- return true;
- }
-
/**
* @return String representation of state.
*/
private String state() {
- int s = getState();
+ Object s = state;
- return s == INIT ? "INIT" : s == CANCELLED ? "CANCELLED" : "DONE";
+ return s instanceof State ? s.toString() : isDone(s) ? "DONE" : "INIT";
}
/** {@inheritDoc} */
@@ -421,44 +483,6 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
/**
*
*/
- private static class ArrayListener<R> implements IgniteInClosure<IgniteInternalFuture<R>> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private IgniteInClosure<? super IgniteInternalFuture<R>>[] arr;
-
- /**
- * @param lsnrs Listeners.
- */
- private ArrayListener(IgniteInClosure... lsnrs) {
- this.arr = lsnrs;
- }
-
- /** {@inheritDoc} */
- @Override public void apply(IgniteInternalFuture<R> fut) {
- for (int i = 0; i < arr.length; i++)
- arr[i].apply(fut);
- }
-
- /**
- * @param lsnr Listener.
- */
- void add(IgniteInClosure<? super IgniteInternalFuture<R>> lsnr) {
- arr = Arrays.copyOf(arr, arr.length + 1);
-
- arr[arr.length - 1] = lsnr;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(ArrayListener.class, this, "arrSize", arr.length);
- }
- }
-
- /**
- *
- */
private static class ChainFuture<R, T> extends GridFutureAdapter<T> {
/** */
private static final long serialVersionUID = 0L;
@@ -495,4 +519,13 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
return "ChainFuture [orig=" + fut + ", doneCb=" + doneCb + ']';
}
}
+
+ static final class WaitNode {
+ final Object waiter;
+ volatile Object next;
+
+ WaitNode(Object waiter) {
+ this.waiter = waiter;
+ }
+ }
}