You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/05 14:41:39 UTC

[17/50] [abbrv] ignite git commit: fixing TC - fixed future adapter

fixing TC - fixed future adapter


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a6a59fec
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a6a59fec
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a6a59fec

Branch: refs/heads/ignite-comm-balance-master
Commit: a6a59fec7548846675a8016ca17034de7585b8f5
Parents: 4bf1c46
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Fri Nov 11 16:32:12 2016 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Fri Nov 11 16:32:12 2016 +0300

----------------------------------------------------------------------
 .../internal/util/future/GridFutureAdapter.java | 145 ++++++++++++-------
 1 file changed, 94 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a6a59fec/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index 2e5b1b8..1e449a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -34,20 +34,47 @@ import org.jetbrains.annotations.Nullable;
 
 /**
  * Future adapter.
- * // TODO: remove serializable
+ * TODO:
+ * 1. remove serializable
+ * 2. remove start time, end time.
+ * 3. remove ignore interrupts flag.
  */
 public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
-    // https://bugs.openjdk.java.net/browse/JDK-8074773
+    /*
+     * https://bugs.openjdk.java.net/browse/JDK-8074773
+     */
     static {
         Class<?> ensureLoaded = LockSupport.class;
     }
 
+    /**
+     * Future state.
+     */
     private enum State {
         INIT,
 
         CANCELLED,
     }
 
+    /**
+     *
+     */
+    static final class WaitNode {
+        /** */
+        private final Object waiter;
+
+        /** */
+        private volatile Object next;
+
+        /**
+         * @param waiter Waiter.
+         */
+        WaitNode(Object waiter) {
+            this.waiter = waiter;
+        }
+    }
+
+    /** State updater. */
     private static final AtomicReferenceFieldUpdater<GridFutureAdapter, Object> stateUpd =
         AtomicReferenceFieldUpdater.newUpdater(GridFutureAdapter.class, Object.class, "state");
 
@@ -55,34 +82,41 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
     private static final long serialVersionUID = 0L;
 
     /** */
-//    private boolean ignoreInterrupts;
+    private boolean ignoreInterrupts;
 
+    /** */
     private volatile Object state = State.INIT;
 
-//    private long l1, l2, l3, l4, l5, l6, l7;
+    /** Future start time. */
+    private final long startTime = U.currentTimeMillis();
+
+    /** Future end time. */
+    private volatile long endTime;
 
     /** {@inheritDoc} */
     @Override public long startTime() {
-        return 0;
+        return startTime;
     }
 
     /** {@inheritDoc} */
     @Override public long duration() {
-        return 0;
+        long endTime = this.endTime;
+
+        return endTime == 0 ? U.currentTimeMillis() - startTime : endTime - startTime;
     }
 
     /**
      * @param ignoreInterrupts Ignore interrupts flag.
      */
     public void ignoreInterrupts(boolean ignoreInterrupts) {
-        // this.ignoreInterrupts = ignoreInterrupts;
+        this.ignoreInterrupts = ignoreInterrupts;
     }
 
     /**
      * @return Future end time.
      */
     public long endTime() {
-        return 0;
+        return endTime;
     }
 
     /** {@inheritDoc} */
@@ -101,7 +135,7 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
 
     /** {@inheritDoc} */
     @Override public R get() throws IgniteCheckedException {
-        return get0(false);
+        return get0(ignoreInterrupts);
     }
 
     /** {@inheritDoc} */
@@ -142,7 +176,7 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
 
         if (res != State.INIT) {
             // no registration was done since a value is available.
-            return resolveAndThrow(res);
+            return resolve(res);
         }
 
         boolean interrupted = false;
@@ -152,7 +186,7 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
                 LockSupport.park();
 
                 if (isDone())
-                    return resolveAndThrow(state);
+                    return resolve(state);
 
                 else if (Thread.interrupted()) {
                     interrupted = true;
@@ -166,7 +200,8 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
             }
         }
         finally {
-            restoreInterrupt(interrupted);
+            if (interrupted)
+                Thread.currentThread().interrupt();
         }
     }
 
@@ -181,7 +216,7 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
         Object res = registerWaiter(Thread.currentThread());
 
         if (res != State.INIT)
-            return resolveAndThrow(res);
+            return resolve(res);
 
         long deadlineNanos = System.nanoTime() + nanosTimeout;
 
@@ -196,18 +231,19 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
                 nanosTimeout0 = deadlineNanos - System.nanoTime();
 
                 if (isDone())
-                    return resolveAndThrow(state);
+                    return resolve(state);
 
                 else if (Thread.interrupted()) {
                     interrupted = true;
 
-                    // TODO ignore interrupts support
-                    throw new IgniteInterruptedCheckedException("Thread has been interrupted.");
+                    if (!ignoreInterrupts)
+                        throw new IgniteInterruptedCheckedException("Thread has been interrupted.");
                 }
             }
         }
         finally {
-            restoreInterrupt(interrupted);
+            if (interrupted)
+                Thread.currentThread().interrupt();
 
             unregisterWaiter(Thread.currentThread());
         }
@@ -215,7 +251,14 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
         throw new IgniteFutureTimeoutCheckedException("Timeout was reached before computation completed.");
     }
 
-    protected R resolveAndThrow(Object val) throws IgniteCheckedException {
+    /**
+     * Resolves the value to result or exception.
+     *
+     * @param val Value to resolve.
+     * @return Result.
+     * @throws IgniteCheckedException If resolved to exception.
+     */
+    private R resolve(Object val) throws IgniteCheckedException {
         if (val == State.CANCELLED)
             throw new IgniteFutureCancelledCheckedException("Future was cancelled: " + this);
 
@@ -225,12 +268,10 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
         return (R)val;
     }
 
-
-    private void restoreInterrupt(boolean interrupted) {
-        if (interrupted)
-            Thread.currentThread().interrupt();
-    }
-
+    /**
+     * @param waiter Waiter to register.
+     * @return Previous state.
+     */
     private Object registerWaiter(Object waiter) {
         WaitNode waitNode = null;
 
@@ -259,40 +300,40 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
         }
     }
 
+    /**
+     * @param waiter Waiter to unregister.
+     */
     void unregisterWaiter(Thread waiter) {
         WaitNode prev = null;
-        Object current = state;
+        Object cur = state;
 
-        while (current != null) {
-            Object currentWaiter = current.getClass() == WaitNode.class ? ((WaitNode)current).waiter : current;
-            Object next = current.getClass() == WaitNode.class ? ((WaitNode)current).next : null;
+        while (cur != null) {
+            Object curWaiter = cur.getClass() == WaitNode.class ? ((WaitNode)cur).waiter : cur;
+            Object next = cur.getClass() == WaitNode.class ? ((WaitNode)cur).next : null;
 
-            if (currentWaiter == waiter) {
-                // it is the item we are looking for, so lets try to remove it
+            if (curWaiter == waiter) {
                 if (prev == null) {
-                    // it's the first item of the stack, so we need to change the head to the next
                     Object n = next == null ? State.INIT : next;
-                    // if we manage to CAS we are done, else we need to restart
-                    current = compareAndSetState(
-                        current,
-                        n) ? null : state;
+
+                    cur = compareAndSetState(cur, n) ? null : state;
                 }
                 else {
-                    // remove the current item (this is done by letting the prev.next point to the next instead of current)
                     prev.next = next;
-                    // end the loop
-                    current = null;
+
+                    cur = null;
                 }
             }
             else {
-                // it isn't the item we are looking for, so lets move on to the next
-                prev = current.getClass() == WaitNode.class ? (WaitNode)current : null;
-                current = next;
+                prev = cur.getClass() == WaitNode.class ? (WaitNode)cur : null;
+
+                cur = next;
             }
         }
     }
 
-
+    /**
+     * @param waiter Head of waiters queue to unblock.
+     */
     private void unblockAll(Object waiter) {
         while (waiter != null) {
             if (waiter instanceof Thread) {
@@ -367,6 +408,10 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
         return isDone(state);
     }
 
+    /**
+     * @param state State to check.
+     * @return {@code True} if future is done.
+     */
     private boolean isDone(Object state) {
         return state == null ||
             !(state == State.INIT
@@ -448,6 +493,8 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
                 return false;
 
             if (compareAndSetState(oldState, val)) {
+                endTime = U.currentTimeMillis();
+
                 unblockAll(oldState);
 
                 return true;
@@ -455,6 +502,11 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
         }
     }
 
+    /**
+     * @param oldState Old state to check.
+     * @param newState New state to set.
+     * @return {@code True} if state has been changed (CASed).
+     */
     private boolean compareAndSetState(Object oldState, Object newState) {
         return state == oldState && stateUpd.compareAndSet(this, oldState, newState);
     }
@@ -521,13 +573,4 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
             return "ChainFuture [orig=" + fut + ", doneCb=" + doneCb + ']';
         }
     }
-
-    static final class WaitNode {
-        final Object waiter;
-        volatile Object next;
-
-        WaitNode(Object waiter) {
-            this.waiter = waiter;
-        }
-    }
 }