You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/05 14:41:47 UTC
[25/50] [abbrv] ignite git commit: reverted future adapter
reverted future adapter
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6adf8a92
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6adf8a92
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6adf8a92
Branch: refs/heads/ignite-comm-balance-master
Commit: 6adf8a92d2c3688ac72cf28b14f9263975d6135f
Parents: ccc3b47
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Mon Nov 14 18:00:18 2016 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Mon Nov 14 18:00:18 2016 +0300
----------------------------------------------------------------------
.../internal/util/future/GridFutureAdapter.java | 400 ++++++++-----------
1 file changed, 161 insertions(+), 239 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6adf8a92/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index 0076dd1..ea7a202 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -17,14 +17,16 @@
package org.apache.ignite.internal.util.future;
+import java.util.Arrays;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import java.util.concurrent.locks.LockSupport;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -34,58 +36,32 @@ import org.jetbrains.annotations.Nullable;
/**
* Future adapter.
- * TODO:
- * 1. remove serializable
- * 2. remove start time, end time.
- * 3. remove ignore interrupts flag.
*/
-public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
- /*
- * https://bugs.openjdk.java.net/browse/JDK-8074773
- */
- static {
- Class<?> ensureLoaded = LockSupport.class;
- }
-
- /**
- * Future state.
- */
- private enum State {
- INIT,
-
- CANCELLED,
- }
-
- /**
- *
- */
- static final class WaitNode {
- /** */
- private final Object waiter;
+public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements IgniteInternalFuture<R> {
+ /** */
+ private static final long serialVersionUID = 0L;
- /** */
- private volatile Object next;
+ /** Initial state. */
+ private static final int INIT = 0;
- /**
- * @param waiter Waiter.
- */
- WaitNode(Object waiter) {
- this.waiter = waiter;
- }
- }
+ /** Cancelled state. */
+ private static final int CANCELLED = 1;
- /** State updater. */
- private static final AtomicReferenceFieldUpdater<GridFutureAdapter, Object> stateUpd =
- AtomicReferenceFieldUpdater.newUpdater(GridFutureAdapter.class, Object.class, "state");
+ /** Done state. */
+ private static final int DONE = 2;
/** */
- private static final long serialVersionUID = 0L;
+ private static final byte ERR = 1;
/** */
- private boolean ignoreInterrupts;
+ private static final byte RES = 2;
/** */
- private volatile Object state = State.INIT;
+ private byte resFlag;
+
+ /** Result. */
+ @GridToStringInclude
+ private Object res;
/** Future start time. */
private final long startTime = U.currentTimeMillis();
@@ -93,6 +69,13 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
/** Future end time. */
private volatile long endTime;
+ /** */
+ private boolean ignoreInterrupts;
+
+ /** */
+ @GridToStringExclude
+ private IgniteInClosure<? super IgniteInternalFuture<R>> lsnr;
+
/** {@inheritDoc} */
@Override public long startTime() {
return startTime;
@@ -121,16 +104,12 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
/** {@inheritDoc} */
@Override public Throwable error() {
- Object state0 = state;
-
- return (state0 instanceof Throwable) ? (Throwable)state0 : null;
+ return (resFlag == ERR) ? (Throwable)res : null;
}
/** {@inheritDoc} */
@Override public R result() {
- Object state0 = state;
-
- return isDone(state0) && !(state0 instanceof Throwable) ? (R)state0 : null;
+ return resFlag == RES ? (R)res : null;
}
/** {@inheritDoc} */
@@ -172,36 +151,28 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
* @throws IgniteCheckedException If failed.
*/
private R get0(boolean ignoreInterrupts) throws IgniteCheckedException {
- Object res = registerWaiter(Thread.currentThread());
-
- if (res != State.INIT) {
- // no registration was done since a value is available.
- return resolve(res);
- }
-
- boolean interrupted = false;
-
try {
- for (; ; ) {
- LockSupport.park();
+ if (endTime == 0) {
+ if (ignoreInterrupts)
+ acquireShared(0);
+ else
+ acquireSharedInterruptibly(0);
+ }
- if (isDone())
- return resolve(state);
+ if (getState() == CANCELLED)
+ throw new IgniteFutureCancelledCheckedException("Future was cancelled: " + this);
- else if (Thread.interrupted()) {
- interrupted = true;
+ assert resFlag != 0;
- if (!ignoreInterrupts) {
- unregisterWaiter(Thread.currentThread());
+ if (resFlag == ERR)
+ throw U.cast((Throwable)res);
- throw new IgniteInterruptedCheckedException("Thread has been interrupted.");
- }
- }
- }
+ return (R)res;
}
- finally {
- if (interrupted)
- Thread.currentThread().interrupt();
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new IgniteInterruptedCheckedException(e);
}
}
@@ -213,162 +184,71 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
* @throws IgniteCheckedException If error occurred.
*/
@Nullable protected R get0(long nanosTimeout) throws InterruptedException, IgniteCheckedException {
- Object res = registerWaiter(Thread.currentThread());
+ if (endTime == 0 && !tryAcquireSharedNanos(0, nanosTimeout))
+ throw new IgniteFutureTimeoutCheckedException("Timeout was reached before computation completed.");
- if (res != State.INIT)
- return resolve(res);
+ if (getState() == CANCELLED)
+ throw new IgniteFutureCancelledCheckedException("Future was cancelled: " + this);
- long deadlineNanos = System.nanoTime() + nanosTimeout;
+ assert resFlag != 0;
- boolean interrupted = false;
+ if (resFlag == ERR)
+ throw U.cast((Throwable)res);
- try {
- long nanosTimeout0 = nanosTimeout;
+ return (R)res;
+ }
- while (nanosTimeout0 > 0) {
- LockSupport.parkNanos(nanosTimeout0);
+ /** {@inheritDoc} */
+ @Override public void listen(IgniteInClosure<? super IgniteInternalFuture<R>> lsnr0) {
+ assert lsnr0 != null;
- nanosTimeout0 = deadlineNanos - System.nanoTime();
+ boolean done = isDone();
- if (isDone())
- return resolve(state);
+ if (!done) {
+ synchronized (this) {
+ done = isDone(); // Double check.
- else if (Thread.interrupted()) {
- interrupted = true;
+ if (!done) {
+ if (lsnr == null)
+ lsnr = lsnr0;
+ else if (lsnr instanceof ArrayListener)
+ ((ArrayListener)lsnr).add(lsnr0);
+ else
+ lsnr = (IgniteInClosure)new ArrayListener<IgniteInternalFuture>(lsnr, lsnr0);
- if (!ignoreInterrupts)
- throw new IgniteInterruptedCheckedException("Thread has been interrupted.");
+ return;
}
}
}
- finally {
- if (interrupted)
- Thread.currentThread().interrupt();
-
- unregisterWaiter(Thread.currentThread());
- }
-
- throw new IgniteFutureTimeoutCheckedException("Timeout was reached before computation completed.");
- }
-
- /**
- * Resolves the value to result or exception.
- *
- * @param val Value to resolve.
- * @return Result.
- * @throws IgniteCheckedException If resolved to exception.
- */
- private R resolve(Object val) throws IgniteCheckedException {
- if (val == State.CANCELLED)
- throw new IgniteFutureCancelledCheckedException("Future was cancelled: " + this);
- if (val instanceof Throwable)
- throw U.cast((Throwable)val);
+ assert done;
- return (R)val;
+ notifyListener(lsnr0);
}
- /**
- * @param waiter Waiter to register.
- * @return Previous state.
- */
- private Object registerWaiter(Object waiter) {
- WaitNode waitNode = null;
-
- for (; ; ) {
- final Object oldState = state;
-
- if (isDone(oldState))
- return oldState;
-
- Object newState;
-
- if (oldState == State.INIT)
- newState = waiter;
-
- else {
- if (waitNode == null)
- waitNode = new WaitNode(waiter);
-
- waitNode.next = oldState;
-
- newState = waitNode;
- }
-
- if (compareAndSetState(oldState, newState))
- return State.INIT;
- }
- }
-
- /**
- * @param waiter Waiter to unregister.
- */
- void unregisterWaiter(Thread waiter) {
- WaitNode prev = null;
- Object cur = state;
-
- while (cur != null) {
- Object curWaiter = cur.getClass() == WaitNode.class ? ((WaitNode)cur).waiter : cur;
- Object next = cur.getClass() == WaitNode.class ? ((WaitNode)cur).next : null;
-
- if (curWaiter == waiter) {
- if (prev == null) {
- Object n = next == null ? State.INIT : next;
-
- cur = compareAndSetState(cur, n) ? null : state;
- }
- else {
- prev.next = next;
-
- cur = null;
- }
- }
- else {
- prev = cur.getClass() == WaitNode.class ? (WaitNode)cur : null;
-
- cur = next;
- }
- }
+ /** {@inheritDoc} */
+ @Override public <T> IgniteInternalFuture<T> chain(final IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb) {
+ return new ChainFuture<>(this, doneCb);
}
/**
- * @param waiter Head of waiters queue to unblock.
+ * Notifies all registered listeners.
*/
- private void unblockAll(Object waiter) {
- while (waiter != null) {
- if (waiter instanceof Thread) {
- LockSupport.unpark((Thread)waiter);
+ private void notifyListeners() {
+ IgniteInClosure<? super IgniteInternalFuture<R>> lsnr0;
- return;
- }
- else if (waiter instanceof IgniteInClosure) {
- notifyListener((IgniteInClosure<? super IgniteInternalFuture<R>>)waiter);
+ synchronized (this) {
+ lsnr0 = lsnr;
+ if (lsnr0 == null)
return;
- }
- else if (waiter.getClass() == WaitNode.class) {
- WaitNode waitNode = (WaitNode) waiter;
- unblockAll(waitNode.waiter);
-
- waiter = waitNode.next;
- }
- else
- return;
+ lsnr = null;
}
- }
- /** {@inheritDoc} */
- @Override public void listen(IgniteInClosure<? super IgniteInternalFuture<R>> newLsnr) {
- Object res = registerWaiter(newLsnr);
+ assert lsnr0 != null;
- if (res != State.INIT)
- notifyListener(newLsnr);
- }
-
- /** {@inheritDoc} */
- @Override public <T> IgniteInternalFuture<T> chain(final IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb) {
- return new ChainFuture<>(this, doneCb);
+ notifyListener(lsnr0);
}
/**
@@ -405,19 +285,9 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
/** {@inheritDoc} */
@Override public boolean isDone() {
- return isDone(state);
- }
-
- /**
- * @param state State to check.
- * @return {@code True} if future is done.
- */
- private boolean isDone(Object state) {
- return state == null ||
- !(state == State.INIT
- || state.getClass() == WaitNode.class
- || state instanceof Thread
- || state instanceof IgniteInClosure);
+ // Don't check for "valid" here, as "done" flag can be read
+ // even in invalid state.
+ return endTime != 0;
}
/**
@@ -425,12 +295,12 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
*/
public boolean isFailed() {
// Must read endTime first.
- return state instanceof Throwable;
+ return endTime != 0 && resFlag == ERR;
}
/** {@inheritDoc} */
@Override public boolean isCancelled() {
- return state == State.CANCELLED;
+ return getState() == CANCELLED;
}
/**
@@ -484,31 +354,32 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
* @return {@code True} if result was set by this call.
*/
private boolean onDone(@Nullable R res, @Nullable Throwable err, boolean cancel) {
- Object val = cancel ? State.CANCELLED : err != null ? err : res;
-
- for (; ; ) {
- final Object oldState = state;
+ boolean notify = false;
- if (isDone(oldState))
- return false;
+ try {
+ if (compareAndSetState(INIT, cancel ? CANCELLED : DONE)) {
+ if (err != null) {
+ resFlag = ERR;
+ this.res = err;
+ }
+ else {
+ resFlag = RES;
+ this.res = res;
+ }
- if (compareAndSetState(oldState, val)) {
- endTime = U.currentTimeMillis();
+ notify = true;
- unblockAll(oldState);
+ releaseShared(0);
return true;
}
- }
- }
- /**
- * @param oldState Old state to check.
- * @param newState New state to set.
- * @return {@code True} if state has been changed (CASed).
- */
- private boolean compareAndSetState(Object oldState, Object newState) {
- return state == oldState && stateUpd.compareAndSet(this, oldState, newState);
+ return false;
+ }
+ finally {
+ if (notify)
+ notifyListeners();
+ }
}
/**
@@ -520,18 +391,69 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
return onDone(null, null, true);
}
+ /** {@inheritDoc} */
+ @Override protected final int tryAcquireShared(int ignore) {
+ return endTime != 0 ? 1 : -1;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected final boolean tryReleaseShared(int ignore) {
+ endTime = U.currentTimeMillis();
+
+ // Always signal after setting final done status.
+ return true;
+ }
+
/**
* @return String representation of state.
*/
private String state() {
- Object s = state;
+ int s = getState();
- return s instanceof State ? s.toString() : isDone(s) ? "DONE" : "INIT";
+ return s == INIT ? "INIT" : s == CANCELLED ? "CANCELLED" : "DONE";
}
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridFutureAdapter.class, this, "state", state(), "hash", System.identityHashCode(this));
+ return S.toString(GridFutureAdapter.class, this, "state", state());
+ }
+
+ /**
+ *
+ */
+ private static class ArrayListener<R> implements IgniteInClosure<IgniteInternalFuture<R>> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private IgniteInClosure<? super IgniteInternalFuture<R>>[] arr;
+
+ /**
+ * @param lsnrs Listeners.
+ */
+ private ArrayListener(IgniteInClosure... lsnrs) {
+ this.arr = lsnrs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void apply(IgniteInternalFuture<R> fut) {
+ for (int i = 0; i < arr.length; i++)
+ arr[i].apply(fut);
+ }
+
+ /**
+ * @param lsnr Listener.
+ */
+ void add(IgniteInClosure<? super IgniteInternalFuture<R>> lsnr) {
+ arr = Arrays.copyOf(arr, arr.length + 1);
+
+ arr[arr.length - 1] = lsnr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ArrayListener.class, this, "arrSize", arr.length);
+ }
}
/**