You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/05 14:41:32 UTC

[10/50] [abbrv] ignite git commit: revisited future + cheat cache

revisited future + cheat cache


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2ca11bca
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2ca11bca
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2ca11bca

Branch: refs/heads/ignite-comm-balance-master
Commit: 2ca11bca1a9f6b4926cffc196fa1b88d7a3da62c
Parents: 4ed9fee
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Wed Nov 9 21:41:52 2016 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Wed Nov 9 21:41:52 2016 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/BenchAtomic.java     |   2 +
 .../GridCachePartitionExchangeManager.java      |   4 +-
 .../processors/cache/GridCacheUtils.java        |  22 ++
 .../processors/cache/IgniteCacheProxy.java      |  12 +
 .../dht/atomic/GridDhtAtomicCache.java          |  14 +
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  10 +-
 .../internal/util/future/GridFutureAdapter.java | 385 ++++++++++---------
 7 files changed, 269 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2ca11bca/modules/core/src/main/java/org/apache/ignite/BenchAtomic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/BenchAtomic.java b/modules/core/src/main/java/org/apache/ignite/BenchAtomic.java
index 382408d..4f99123 100644
--- a/modules/core/src/main/java/org/apache/ignite/BenchAtomic.java
+++ b/modules/core/src/main/java/org/apache/ignite/BenchAtomic.java
@@ -19,6 +19,7 @@ package org.apache.ignite;
 
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
@@ -230,6 +231,7 @@ public class BenchAtomic {
 //            .setAtomicWriteOrderMode(CacheAtomicWriteOrderMode.PRIMARY)
            // .setAffinity(new FairAffinityFunction(1024))
             .setBackups(0)
+            .setAtomicWriteOrderMode(CacheAtomicWriteOrderMode.PRIMARY)
             .setRebalanceMode(CacheRebalanceMode.SYNC)
             .setCopyOnRead(false)
 //            .setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED)

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ca11bca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 4eb61e3..8495425 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1207,7 +1207,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
             if (mvcc != null) {
                 for (GridCacheFuture<?> fut : mvcc.activeFutures()) {
-                    if (curTime - fut.startTime() > timeout) {
+                    if (curTime - fut.startTime() > timeout && fut.startTime() != 0 /*TODO*/ ) {
                         found = true;
 
                         if (longRunningOpsDumpCnt < GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) {
@@ -1220,7 +1220,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 }
 
                 for (GridCacheFuture<?> fut : mvcc.atomicFutures()) {
-                    if (curTime - fut.startTime() > timeout) {
+                    if (curTime - fut.startTime() > timeout && fut.startTime() != 0 /*TODO*/) {
                         found = true;
 
                         if (longRunningOpsDumpCnt < GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ca11bca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 4c18f21..a71b2e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -110,6 +110,28 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.REA
  * Cache utility methods.
  */
 public class GridCacheUtils {
+    // TODO cheat cache ID.
+    public static final int cheatCacheId;
+
+    static {
+        String cheatCache = System.getProperty("CHEAT_CACHE");
+
+        if (cheatCache != null) {
+            cheatCacheId = cheatCache.hashCode();
+
+            if (cheatCacheId == 0)
+                throw new RuntimeException();
+
+            System.out.println(">>> Cheat cache ID [id=" + cheatCacheId + ", name=" + cheatCache + ']');
+        }
+        else
+            cheatCacheId = 0;
+    }
+
+    public static boolean cheatCache(int id) {
+        return cheatCacheId != 0 && id == cheatCacheId;
+    }
+
     /**  Hadoop syste cache name. */
     public static final String SYS_CACHE_HADOOP_MR = "ignite-hadoop-mr-sys-cache";
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ca11bca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 297ec68..41b71f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -1303,6 +1303,18 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
 
     /** {@inheritDoc} */
     @Override public void put(K key, V val) {
+        // TODO
+        if (CU.cheatCache(ctx.cacheId())) {
+            try {
+                delegate.put(key, val);
+            }
+            catch (IgniteCheckedException e) {
+                throw cacheException(e);
+            }
+
+            return;
+        }
+
         try {
             GridCacheGateway<K, V> gate = this.gate;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ca11bca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 30a3d57..38a8223 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1104,6 +1104,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
             waitTopFut);
 
+        // TODO
+        if (CU.cheatCache(ctx.cacheId())) {
+            updateFut.map();
+
+            return updateFut;
+        }
+
         return asyncOp(new CO<IgniteInternalFuture<Object>>() {
             @Override public IgniteInternalFuture<Object> apply() {
                 updateFut.map();
@@ -1145,6 +1152,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         final GridNearAtomicAbstractUpdateFuture updateFut =
             createSingleUpdateFuture(key, val, proc, invokeArgs, retval, filter, waitTopFut);
 
+        // TODO
+        if (CU.cheatCache(ctx.cacheId())) {
+            updateFut.map();
+
+            return updateFut;
+        }
+
         return asyncOp(new CO<IgniteInternalFuture<Object>>() {
             @Override public IgniteInternalFuture<Object> apply() {
                 updateFut.map();

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ca11bca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 30a0c3d..a7a1f7a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -611,9 +611,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
      * @param remapKeys Keys to remap.
      */
     void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
-        Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
+        // TODO
+        Collection<ClusterNode> topNodes = CU.cheatCache(cctx.cacheId()) ?
+            Collections.<ClusterNode>emptyList() : CU.affinityNodes(cctx, topVer);
 
-        if (F.isEmpty(topNodes)) {
+        if (!CU.cheatCache(cctx.cacheId()) && F.isEmpty(topNodes)) {
             onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
                 "left the grid)."));
 
@@ -651,6 +653,10 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                 singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
             }
             else {
+                // TODO
+                if (CU.cheatCache(cctx.cacheId()))
+                    throw new RuntimeException();
+
                 Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes,
                     topVer,
                     futVer,

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ca11bca/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index ea7a202..0c6a03b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -17,16 +17,14 @@
 
 package org.apache.ignite.internal.util.future;
 
-import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.locks.LockSupport;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -37,84 +35,72 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Future adapter.
  */
-public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements IgniteInternalFuture<R> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Initial state. */
-    private static final int INIT = 0;
+public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
+    // Preload LockSupport to reduce the risk of a "lost unpark" due to classloading, see https://bugs.openjdk.java.net/browse/JDK-8074773
+    static {
+        Class<?> ensureLoaded = LockSupport.class;
+    }
 
-    /** Cancelled state. */
-    private static final int CANCELLED = 1;
+    private enum State {
+        INIT,
 
-    /** Done state. */
-    private static final int DONE = 2;
+        CANCELLED,
+    }
 
-    /** */
-    private static final byte ERR = 1;
+    private static final AtomicReferenceFieldUpdater<GridFutureAdapter, Object> stateUpd =
+        AtomicReferenceFieldUpdater.newUpdater(GridFutureAdapter.class, Object.class, "state");
 
     /** */
-    private static final byte RES = 2;
+    private static final long serialVersionUID = 0L;
 
     /** */
-    private byte resFlag;
-
-    /** Result. */
-    @GridToStringInclude
-    private Object res;
+//    private boolean ignoreInterrupts;
 
-    /** Future start time. */
-    private final long startTime = U.currentTimeMillis();
+    private volatile Object state = State.INIT;
 
-    /** Future end time. */
-    private volatile long endTime;
-
-    /** */
-    private boolean ignoreInterrupts;
-
-    /** */
-    @GridToStringExclude
-    private IgniteInClosure<? super IgniteInternalFuture<R>> lsnr;
+//    private long l1, l2, l3, l4, l5, l6, l7;
 
     /** {@inheritDoc} */
     @Override public long startTime() {
-        return startTime;
+        return 0;
     }
 
     /** {@inheritDoc} */
     @Override public long duration() {
-        long endTime = this.endTime;
-
-        return endTime == 0 ? U.currentTimeMillis() - startTime : endTime - startTime;
+        return 0;
     }
 
     /**
      * @param ignoreInterrupts Ignore interrupts flag.
      */
     public void ignoreInterrupts(boolean ignoreInterrupts) {
-        this.ignoreInterrupts = ignoreInterrupts;
+        // this.ignoreInterrupts = ignoreInterrupts;
     }
 
     /**
      * @return Future end time.
      */
     public long endTime() {
-        return endTime;
+        return 0;
     }
 
     /** {@inheritDoc} */
     @Override public Throwable error() {
-        return (resFlag == ERR) ? (Throwable)res : null;
+        Object state0 = state;
+
+        return (state0 instanceof Throwable) ? (Throwable)state0 : null;
     }
 
     /** {@inheritDoc} */
     @Override public R result() {
-        return resFlag == RES ? (R)res : null;
+        Object state0 = state;
+
+        return isDone(state0) && !(state0 instanceof Throwable) ? (R)state0 : null;
     }
 
     /** {@inheritDoc} */
     @Override public R get() throws IgniteCheckedException {
-        return get0(ignoreInterrupts);
+        return get0(false);
     }
 
     /** {@inheritDoc} */
@@ -151,28 +137,34 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
      * @throws IgniteCheckedException If failed.
      */
     private R get0(boolean ignoreInterrupts) throws IgniteCheckedException {
-        try {
-            if (endTime == 0) {
-                if (ignoreInterrupts)
-                    acquireShared(0);
-                else
-                    acquireSharedInterruptibly(0);
-            }
+        Object res = registerWaiter(Thread.currentThread());
 
-            if (getState() == CANCELLED)
-                throw new IgniteFutureCancelledCheckedException("Future was cancelled: " + this);
+        if (res != State.INIT) {
+            // no registration was done since a value is available.
+            return resolveAndThrow(res);
+        }
 
-            assert resFlag != 0;
+        boolean interrupted = false;
 
-            if (resFlag == ERR)
-                throw U.cast((Throwable)res);
+        try {
+            for (; ; ) {
+                LockSupport.park();
 
-            return (R)res;
-        }
-        catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
+                if (isDone())
+                    return resolveAndThrow(state);
+                else if (Thread.interrupted()) {
+                    interrupted = true;
 
-            throw new IgniteInterruptedCheckedException(e);
+                    if (!ignoreInterrupts) {
+                        unregisterWaiter(Thread.currentThread());
+
+                        throw new IgniteInterruptedCheckedException("Thread has been interrupted.");
+                    }
+                }
+            }
+        }
+        finally {
+            restoreInterrupt(interrupted);
         }
     }
 
@@ -184,71 +176,156 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
      * @throws IgniteCheckedException If error occurred.
      */
     @Nullable protected R get0(long nanosTimeout) throws InterruptedException, IgniteCheckedException {
-        if (endTime == 0 && !tryAcquireSharedNanos(0, nanosTimeout))
-            throw new IgniteFutureTimeoutCheckedException("Timeout was reached before computation completed.");
+        Object res = registerWaiter(Thread.currentThread());
+
+        if (res != State.INIT)
+            return resolveAndThrow(res);
+
+        long deadlineNanos = System.nanoTime() + nanosTimeout;
+
+        boolean interrupted = false;
+
+        try {
+            long nanosTimeout0 = nanosTimeout;
 
-        if (getState() == CANCELLED)
+            while (nanosTimeout0 > 0) {
+                LockSupport.parkNanos(nanosTimeout0);
+
+                nanosTimeout0 = deadlineNanos - System.nanoTime();
+
+                if (isDone())
+                    return resolveAndThrow(state);
+
+                else if (Thread.interrupted()) {
+                    interrupted = true;
+// TODO
+//                    if (!ignoreInterrupts)
+//                        throw new IgniteInterruptedCheckedException("Thread has been interrupted.");
+                }
+            }
+        }
+        finally {
+            restoreInterrupt(interrupted);
+
+            unregisterWaiter(Thread.currentThread());
+        }
+
+        throw new IgniteFutureTimeoutCheckedException("Timeout was reached before computation completed.");
+    }
+
+    protected R resolveAndThrow(Object val) throws IgniteCheckedException {
+        if (val == State.CANCELLED)
             throw new IgniteFutureCancelledCheckedException("Future was cancelled: " + this);
 
-        assert resFlag != 0;
+        if (val instanceof Throwable)
+            throw U.cast((Throwable)val);
+
+        return (R)val;
+    }
 
-        if (resFlag == ERR)
-            throw U.cast((Throwable)res);
 
-        return (R)res;
+    private void restoreInterrupt(boolean interrupted) {
+        if (interrupted)
+            Thread.currentThread().interrupt();
     }
 
-    /** {@inheritDoc} */
-    @Override public void listen(IgniteInClosure<? super IgniteInternalFuture<R>> lsnr0) {
-        assert lsnr0 != null;
+    private Object registerWaiter(Object waiter) {
+        WaitNode waitNode = null;
 
-        boolean done = isDone();
+        for (; ; ) {
+            final Object oldState = state;
 
-        if (!done) {
-            synchronized (this) {
-                done = isDone(); // Double check.
+            if (isDone(oldState))
+                return oldState;
 
-                if (!done) {
-                    if (lsnr == null)
-                        lsnr = lsnr0;
-                    else if (lsnr instanceof ArrayListener)
-                        ((ArrayListener)lsnr).add(lsnr0);
-                    else
-                        lsnr = (IgniteInClosure)new ArrayListener<IgniteInternalFuture>(lsnr, lsnr0);
+            Object newState;
 
-                    return;
-                }
+            if (oldState == State.INIT)
+                newState = waiter;
+
+            else {
+                if (waitNode == null)
+                    waitNode = new WaitNode(waiter);
+
+                waitNode.next = oldState;
+
+                newState = waitNode;
             }
+
+            if (compareAndSetState(oldState, newState))
+                return State.INIT;
         }
+    }
 
-        assert done;
+    void unregisterWaiter(Thread waiter) {
+        WaitNode prev = null;
+        Object current = state;
 
-        notifyListener(lsnr0);
-    }
+        while (current != null) {
+            Object currentWaiter = current.getClass() == WaitNode.class ? ((WaitNode)current).waiter : current;
+            Object next = current.getClass() == WaitNode.class ? ((WaitNode)current).next : null;
 
-    /** {@inheritDoc} */
-    @Override public <T> IgniteInternalFuture<T> chain(final IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb) {
-        return new ChainFuture<>(this, doneCb);
+            if (currentWaiter == waiter) {
+                // it is the item we are looking for, so let's try to remove it
+                if (prev == null) {
+                    // it's the first item of the stack, so we need to change the head to the next
+                    Object n = next == null ? State.INIT : next;
+                    // if we manage to CAS we are done, else we need to restart
+                    current = compareAndSetState(
+                        current,
+                        n) ? null : state;
+                }
+                else {
+                    // remove the current item (this is done by letting the prev.next point to the next instead of current)
+                    prev.next = next;
+                    // end the loop
+                    current = null;
+                }
+            }
+            else {
+                // it isn't the item we are looking for, so let's move on to the next
+                prev = current.getClass() == WaitNode.class ? (WaitNode)current : null;
+                current = next;
+            }
+        }
     }
 
-    /**
-     * Notifies all registered listeners.
-     */
-    private void notifyListeners() {
-        IgniteInClosure<? super IgniteInternalFuture<R>> lsnr0;
 
-        synchronized (this) {
-            lsnr0 = lsnr;
+    private void unblockAll(Object waiter) {
+        while (waiter != null) {
+            if (waiter instanceof Thread) {
+                LockSupport.unpark((Thread)waiter);
+
+                return;
+            }
+            else if (waiter instanceof IgniteInClosure) {
+                notifyListener((IgniteInClosure<? super IgniteInternalFuture<R>>)waiter);
 
-            if (lsnr0 == null)
                 return;
+            }
+            else if (waiter.getClass() == WaitNode.class) {
+                WaitNode waitNode = (WaitNode) waiter;
 
-            lsnr = null;
+                unblockAll(waitNode.waiter);
+
+                waiter = waitNode.next;
+            }
+            else
+                return;
         }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void listen(IgniteInClosure<? super IgniteInternalFuture<R>> newLsnr) {
+        Object res = registerWaiter(newLsnr);
 
-        assert lsnr0 != null;
+        if (res != State.INIT)
+            notifyListener(newLsnr);
+    }
 
-        notifyListener(lsnr0);
+    /** {@inheritDoc} */
+    @Override public <T> IgniteInternalFuture<T> chain(final IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb) {
+        return new ChainFuture<>(this, doneCb);
     }
 
     /**
@@ -285,9 +362,15 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
 
     /** {@inheritDoc} */
     @Override public boolean isDone() {
-        // Don't check for "valid" here, as "done" flag can be read
-        // even in invalid state.
-        return endTime != 0;
+        return isDone(state);
+    }
+
+    private boolean isDone(Object state) {
+        return state == null ||
+            !(state == State.INIT
+                || state.getClass() == WaitNode.class
+                || state instanceof Thread
+                || state instanceof IgniteInClosure);
     }
 
     /**
@@ -295,12 +378,12 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
      */
     public boolean isFailed() {
         // Must read endTime first.
-        return endTime != 0 && resFlag == ERR;
+        return state instanceof Throwable;
     }
 
     /** {@inheritDoc} */
     @Override public boolean isCancelled() {
-        return getState() == CANCELLED;
+        return state == State.CANCELLED;
     }
 
     /**
@@ -354,34 +437,26 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
      * @return {@code True} if result was set by this call.
      */
     private boolean onDone(@Nullable R res, @Nullable Throwable err, boolean cancel) {
-        boolean notify = false;
+        Object val = cancel ? State.CANCELLED : err != null ? err : res;
 
-        try {
-            if (compareAndSetState(INIT, cancel ? CANCELLED : DONE)) {
-                if (err != null) {
-                    resFlag = ERR;
-                    this.res = err;
-                }
-                else {
-                    resFlag = RES;
-                    this.res = res;
-                }
+        for (; ; ) {
+            final Object oldState = state;
 
-                notify = true;
+            if (isDone(oldState))
+                return false;
 
-                releaseShared(0);
+            if (compareAndSetState(oldState, val)) {
+                unblockAll(oldState);
 
                 return true;
             }
-
-            return false;
-        }
-        finally {
-            if (notify)
-                notifyListeners();
         }
     }
 
+    private boolean compareAndSetState(Object oldState, Object newState) {
+        return state == oldState && stateUpd.compareAndSet(this, oldState, newState);
+    }
+
     /**
      * Callback to notify that future is cancelled.
      *
@@ -391,26 +466,13 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
         return onDone(null, null, true);
     }
 
-    /** {@inheritDoc} */
-    @Override protected final int tryAcquireShared(int ignore) {
-        return endTime != 0 ? 1 : -1;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected final boolean tryReleaseShared(int ignore) {
-        endTime = U.currentTimeMillis();
-
-        // Always signal after setting final done status.
-        return true;
-    }
-
     /**
      * @return String representation of state.
      */
     private String state() {
-        int s = getState();
+        Object s = state;
 
-        return s == INIT ? "INIT" : s == CANCELLED ? "CANCELLED" : "DONE";
+        return s instanceof State ? s.toString() : isDone(s) ? "DONE" : "INIT";
     }
 
     /** {@inheritDoc} */
@@ -421,44 +483,6 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
     /**
      *
      */
-    private static class ArrayListener<R> implements IgniteInClosure<IgniteInternalFuture<R>> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private IgniteInClosure<? super IgniteInternalFuture<R>>[] arr;
-
-        /**
-         * @param lsnrs Listeners.
-         */
-        private ArrayListener(IgniteInClosure... lsnrs) {
-            this.arr = lsnrs;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void apply(IgniteInternalFuture<R> fut) {
-            for (int i = 0; i < arr.length; i++)
-                arr[i].apply(fut);
-        }
-
-        /**
-         * @param lsnr Listener.
-         */
-        void add(IgniteInClosure<? super IgniteInternalFuture<R>> lsnr) {
-            arr = Arrays.copyOf(arr, arr.length + 1);
-
-            arr[arr.length - 1] = lsnr;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(ArrayListener.class, this, "arrSize", arr.length);
-        }
-    }
-
-    /**
-     *
-     */
     private static class ChainFuture<R, T> extends GridFutureAdapter<T> {
         /** */
         private static final long serialVersionUID = 0L;
@@ -495,4 +519,13 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
             return "ChainFuture [orig=" + fut + ", doneCb=" + doneCb + ']';
         }
     }
+
+    static final class WaitNode {
+        final Object waiter;
+        volatile Object next;
+
+        WaitNode(Object waiter) {
+            this.waiter = waiter;
+        }
+    }
 }