You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/05 14:41:47 UTC

[25/50] [abbrv] ignite git commit: reverted future adapter

reverted future adapter


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6adf8a92
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6adf8a92
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6adf8a92

Branch: refs/heads/ignite-comm-balance-master
Commit: 6adf8a92d2c3688ac72cf28b14f9263975d6135f
Parents: ccc3b47
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Mon Nov 14 18:00:18 2016 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Mon Nov 14 18:00:18 2016 +0300

----------------------------------------------------------------------
 .../internal/util/future/GridFutureAdapter.java | 400 ++++++++-----------
 1 file changed, 161 insertions(+), 239 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6adf8a92/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index 0076dd1..ea7a202 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -17,14 +17,16 @@
 
 package org.apache.ignite.internal.util.future;
 
+import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import java.util.concurrent.locks.LockSupport;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -34,58 +36,32 @@ import org.jetbrains.annotations.Nullable;
 
 /**
  * Future adapter.
- * TODO:
- * 1. remove serializable
- * 2. remove start time, end time.
- * 3. remove ignore interrupts flag.
  */
-public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
-    /*
-     * https://bugs.openjdk.java.net/browse/JDK-8074773
-     */
-    static {
-        Class<?> ensureLoaded = LockSupport.class;
-    }
-
-    /**
-     * Future state.
-     */
-    private enum State {
-        INIT,
-
-        CANCELLED,
-    }
-
-    /**
-     *
-     */
-    static final class WaitNode {
-        /** */
-        private final Object waiter;
+public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements IgniteInternalFuture<R> {
+    /** */
+    private static final long serialVersionUID = 0L;
 
-        /** */
-        private volatile Object next;
+    /** Initial state. */
+    private static final int INIT = 0;
 
-        /**
-         * @param waiter Waiter.
-         */
-        WaitNode(Object waiter) {
-            this.waiter = waiter;
-        }
-    }
+    /** Cancelled state. */
+    private static final int CANCELLED = 1;
 
-    /** State updater. */
-    private static final AtomicReferenceFieldUpdater<GridFutureAdapter, Object> stateUpd =
-        AtomicReferenceFieldUpdater.newUpdater(GridFutureAdapter.class, Object.class, "state");
+    /** Done state. */
+    private static final int DONE = 2;
 
     /** */
-    private static final long serialVersionUID = 0L;
+    private static final byte ERR = 1;
 
     /** */
-    private boolean ignoreInterrupts;
+    private static final byte RES = 2;
 
     /** */
-    private volatile Object state = State.INIT;
+    private byte resFlag;
+
+    /** Result. */
+    @GridToStringInclude
+    private Object res;
 
     /** Future start time. */
     private final long startTime = U.currentTimeMillis();
@@ -93,6 +69,13 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
     /** Future end time. */
     private volatile long endTime;
 
+    /** */
+    private boolean ignoreInterrupts;
+
+    /** Listener to notify on completion; an {@link ArrayListener} once several are registered. */
+    @GridToStringExclude
+    private IgniteInClosure<? super IgniteInternalFuture<R>> lsnr;
+
     /** {@inheritDoc} */
     @Override public long startTime() {
         return startTime;
@@ -121,16 +104,12 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
 
     /** {@inheritDoc} */
     @Override public Throwable error() {
-        Object state0 = state;
-
-        return (state0 instanceof Throwable) ? (Throwable)state0 : null;
+        return (resFlag == ERR) ? (Throwable)res : null;
     }
 
     /** {@inheritDoc} */
     @Override public R result() {
-        Object state0 = state;
-
-        return isDone(state0) && !(state0 instanceof Throwable) ? (R)state0 : null;
+        return resFlag == RES ? (R)res : null;
     }
 
     /** {@inheritDoc} */
@@ -172,36 +151,28 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
      * @throws IgniteCheckedException If failed.
      */
     private R get0(boolean ignoreInterrupts) throws IgniteCheckedException {
-        Object res = registerWaiter(Thread.currentThread());
-
-        if (res != State.INIT) {
-            // no registration was done since a value is available.
-            return resolve(res);
-        }
-
-        boolean interrupted = false;
-
         try {
-            for (; ; ) {
-                LockSupport.park();
+            if (endTime == 0) {
+                if (ignoreInterrupts)
+                    acquireShared(0);
+                else
+                    acquireSharedInterruptibly(0);
+            }
 
-                if (isDone())
-                    return resolve(state);
+            if (getState() == CANCELLED)
+                throw new IgniteFutureCancelledCheckedException("Future was cancelled: " + this);
 
-                else if (Thread.interrupted()) {
-                    interrupted = true;
+            assert resFlag != 0;
 
-                    if (!ignoreInterrupts) {
-                        unregisterWaiter(Thread.currentThread());
+            if (resFlag == ERR)
+                throw U.cast((Throwable)res);
 
-                        throw new IgniteInterruptedCheckedException("Thread has been interrupted.");
-                    }
-                }
-            }
+            return (R)res;
         }
-        finally {
-            if (interrupted)
-                Thread.currentThread().interrupt();
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInterruptedCheckedException(e);
         }
     }
 
@@ -213,162 +184,71 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
      * @throws IgniteCheckedException If error occurred.
      */
     @Nullable protected R get0(long nanosTimeout) throws InterruptedException, IgniteCheckedException {
-        Object res = registerWaiter(Thread.currentThread());
+        if (endTime == 0 && !tryAcquireSharedNanos(0, nanosTimeout))
+            throw new IgniteFutureTimeoutCheckedException("Timeout was reached before computation completed.");
 
-        if (res != State.INIT)
-            return resolve(res);
+        if (getState() == CANCELLED)
+            throw new IgniteFutureCancelledCheckedException("Future was cancelled: " + this);
 
-        long deadlineNanos = System.nanoTime() + nanosTimeout;
+        assert resFlag != 0;
 
-        boolean interrupted = false;
+        if (resFlag == ERR)
+            throw U.cast((Throwable)res);
 
-        try {
-            long nanosTimeout0 = nanosTimeout;
+        return (R)res;
+    }
 
-            while (nanosTimeout0 > 0) {
-                LockSupport.parkNanos(nanosTimeout0);
+    /** {@inheritDoc} */
+    @Override public void listen(IgniteInClosure<? super IgniteInternalFuture<R>> lsnr0) {
+        assert lsnr0 != null;
 
-                nanosTimeout0 = deadlineNanos - System.nanoTime();
+        boolean done = isDone();
 
-                if (isDone())
-                    return resolve(state);
+        if (!done) {
+            synchronized (this) {
+                done = isDone(); // Double check.
 
-                else if (Thread.interrupted()) {
-                    interrupted = true;
+                if (!done) {
+                    if (lsnr == null)
+                        lsnr = lsnr0;
+                    else if (lsnr instanceof ArrayListener)
+                        ((ArrayListener)lsnr).add(lsnr0);
+                    else
+                        lsnr = (IgniteInClosure)new ArrayListener<IgniteInternalFuture>(lsnr, lsnr0);
 
-                    if (!ignoreInterrupts)
-                        throw new IgniteInterruptedCheckedException("Thread has been interrupted.");
+                    return;
                 }
             }
         }
-        finally {
-            if (interrupted)
-                Thread.currentThread().interrupt();
-
-            unregisterWaiter(Thread.currentThread());
-        }
-
-        throw new IgniteFutureTimeoutCheckedException("Timeout was reached before computation completed.");
-    }
-
-    /**
-     * Resolves the value to result or exception.
-     *
-     * @param val Value to resolve.
-     * @return Result.
-     * @throws IgniteCheckedException If resolved to exception.
-     */
-    private R resolve(Object val) throws IgniteCheckedException {
-        if (val == State.CANCELLED)
-            throw new IgniteFutureCancelledCheckedException("Future was cancelled: " + this);
 
-        if (val instanceof Throwable)
-            throw U.cast((Throwable)val);
+        assert done;
 
-        return (R)val;
+        notifyListener(lsnr0);
     }
 
-    /**
-     * @param waiter Waiter to register.
-     * @return Previous state.
-     */
-    private Object registerWaiter(Object waiter) {
-        WaitNode waitNode = null;
-
-        for (; ; ) {
-            final Object oldState = state;
-
-            if (isDone(oldState))
-                return oldState;
-
-            Object newState;
-
-            if (oldState == State.INIT)
-                newState = waiter;
-
-            else {
-                if (waitNode == null)
-                    waitNode = new WaitNode(waiter);
-
-                waitNode.next = oldState;
-
-                newState = waitNode;
-            }
-
-            if (compareAndSetState(oldState, newState))
-                return State.INIT;
-        }
-    }
-
-    /**
-     * @param waiter Waiter to unregister.
-     */
-    void unregisterWaiter(Thread waiter) {
-        WaitNode prev = null;
-        Object cur = state;
-
-        while (cur != null) {
-            Object curWaiter = cur.getClass() == WaitNode.class ? ((WaitNode)cur).waiter : cur;
-            Object next = cur.getClass() == WaitNode.class ? ((WaitNode)cur).next : null;
-
-            if (curWaiter == waiter) {
-                if (prev == null) {
-                    Object n = next == null ? State.INIT : next;
-
-                    cur = compareAndSetState(cur, n) ? null : state;
-                }
-                else {
-                    prev.next = next;
-
-                    cur = null;
-                }
-            }
-            else {
-                prev = cur.getClass() == WaitNode.class ? (WaitNode)cur : null;
-
-                cur = next;
-            }
-        }
+    /** {@inheritDoc} */
+    @Override public <T> IgniteInternalFuture<T> chain(final IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb) {
+        return new ChainFuture<>(this, doneCb);
     }
 
     /**
-     * @param waiter Head of waiters queue to unblock.
+     * Notifies all registered listeners.
      */
-    private void unblockAll(Object waiter) {
-        while (waiter != null) {
-            if (waiter instanceof Thread) {
-                LockSupport.unpark((Thread)waiter);
+    private void notifyListeners() {
+        IgniteInClosure<? super IgniteInternalFuture<R>> lsnr0;
 
-                return;
-            }
-            else if (waiter instanceof IgniteInClosure) {
-                notifyListener((IgniteInClosure<? super IgniteInternalFuture<R>>)waiter);
+        synchronized (this) {
+            lsnr0 = lsnr;
 
+            if (lsnr0 == null)
                 return;
-            }
-            else if (waiter.getClass() == WaitNode.class) {
-                WaitNode waitNode = (WaitNode) waiter;
 
-                unblockAll(waitNode.waiter);
-
-                waiter = waitNode.next;
-            }
-            else
-                return;
+            lsnr = null;
         }
-    }
 
-    /** {@inheritDoc} */
-    @Override public void listen(IgniteInClosure<? super IgniteInternalFuture<R>> newLsnr) {
-        Object res = registerWaiter(newLsnr);
+        assert lsnr0 != null;
 
-        if (res != State.INIT)
-            notifyListener(newLsnr);
-    }
-
-    /** {@inheritDoc} */
-    @Override public <T> IgniteInternalFuture<T> chain(final IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb) {
-        return new ChainFuture<>(this, doneCb);
+        notifyListener(lsnr0);
     }
 
     /**
@@ -405,19 +285,9 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
 
     /** {@inheritDoc} */
     @Override public boolean isDone() {
-        return isDone(state);
-    }
-
-    /**
-     * @param state State to check.
-     * @return {@code True} if future is done.
-     */
-    private boolean isDone(Object state) {
-        return state == null ||
-            !(state == State.INIT
-                || state.getClass() == WaitNode.class
-                || state instanceof Thread
-                || state instanceof IgniteInClosure);
+        // Don't check for "valid" here, as "done" flag can be read
+        // even in invalid state.
+        return endTime != 0;
     }
 
     /**
@@ -425,12 +295,12 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
      */
     public boolean isFailed() {
         // Must read endTime first.
-        return state instanceof Throwable;
+        return endTime != 0 && resFlag == ERR;
     }
 
     /** {@inheritDoc} */
     @Override public boolean isCancelled() {
-        return state == State.CANCELLED;
+        return getState() == CANCELLED;
     }
 
     /**
@@ -484,31 +354,32 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
      * @return {@code True} if result was set by this call.
      */
     private boolean onDone(@Nullable R res, @Nullable Throwable err, boolean cancel) {
-        Object val = cancel ? State.CANCELLED : err != null ? err : res;
-
-        for (; ; ) {
-            final Object oldState = state;
+        boolean notify = false;
 
-            if (isDone(oldState))
-                return false;
+        try {
+            if (compareAndSetState(INIT, cancel ? CANCELLED : DONE)) {
+                if (err != null) {
+                    resFlag = ERR;
+                    this.res = err;
+                }
+                else {
+                    resFlag = RES;
+                    this.res = res;
+                }
 
-            if (compareAndSetState(oldState, val)) {
-                endTime = U.currentTimeMillis();
+                notify = true;
 
-                unblockAll(oldState);
+                releaseShared(0);
 
                 return true;
             }
-        }
-    }
 
-    /**
-     * @param oldState Old state to check.
-     * @param newState New state to set.
-     * @return {@code True} if state has been changed (CASed).
-     */
-    private boolean compareAndSetState(Object oldState, Object newState) {
-        return state == oldState && stateUpd.compareAndSet(this, oldState, newState);
+            return false;
+        }
+        finally {
+            if (notify)
+                notifyListeners();
+        }
     }
 
     /**
@@ -520,18 +391,69 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
         return onDone(null, null, true);
     }
 
+    /** {@inheritDoc} */
+    @Override protected final int tryAcquireShared(int ignore) {
+        return endTime != 0 ? 1 : -1;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected final boolean tryReleaseShared(int ignore) {
+        endTime = U.currentTimeMillis();
+
+        // Always signal after setting final done status.
+        return true;
+    }
+
     /**
      * @return String representation of state.
      */
     private String state() {
-        Object s = state;
+        int s = getState();
 
-        return s instanceof State ? s.toString() : isDone(s) ? "DONE" : "INIT";
+        return s == INIT ? "INIT" : s == CANCELLED ? "CANCELLED" : "DONE";
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(GridFutureAdapter.class, this, "state", state(), "hash", System.identityHashCode(this));
+        return S.toString(GridFutureAdapter.class, this, "state", state());
+    }
+
+    /**
+     * Listener that forwards a single notification to each listener in its array, in order.
+     */
+    private static class ArrayListener<R> implements IgniteInClosure<IgniteInternalFuture<R>> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Wrapped listeners; {@link #add} appends by copying the array. */
+        private IgniteInClosure<? super IgniteInternalFuture<R>>[] arr;
+
+        /**
+         * @param lsnrs Listeners.
+         */
+        private ArrayListener(IgniteInClosure... lsnrs) {
+            this.arr = lsnrs;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void apply(IgniteInternalFuture<R> fut) {
+            for (int i = 0; i < arr.length; i++)
+                arr[i].apply(fut);
+        }
+
+        /**
+         * @param lsnr Listener.
+         */
+        void add(IgniteInClosure<? super IgniteInternalFuture<R>> lsnr) {
+            arr = Arrays.copyOf(arr, arr.length + 1);
+
+            arr[arr.length - 1] = lsnr;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ArrayListener.class, this, "arrSize", arr.length);
+        }
     }
 
     /**