You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/05 14:41:39 UTC
[17/50] [abbrv] ignite git commit: fixing TC - fixed future adapter
fixing TC - fixed future adapter
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a6a59fec
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a6a59fec
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a6a59fec
Branch: refs/heads/ignite-comm-balance-master
Commit: a6a59fec7548846675a8016ca17034de7585b8f5
Parents: 4bf1c46
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Fri Nov 11 16:32:12 2016 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Fri Nov 11 16:32:12 2016 +0300
----------------------------------------------------------------------
.../internal/util/future/GridFutureAdapter.java | 145 ++++++++++++-------
1 file changed, 94 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a6a59fec/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index 2e5b1b8..1e449a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -34,20 +34,47 @@ import org.jetbrains.annotations.Nullable;
/**
* Future adapter.
- * // TODO: remove serializable
+ * TODO:
+ * 1. remove serializable
+ * 2. remove start time, end time.
+ * 3. remove ignore interrupts flag.
*/
public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
- // https://bugs.openjdk.java.net/browse/JDK-8074773
+ /*
+ * https://bugs.openjdk.java.net/browse/JDK-8074773
+ */
static {
Class<?> ensureLoaded = LockSupport.class;
}
+ /**
+ * Future state.
+ */
private enum State {
INIT,
CANCELLED,
}
+ /**
+ *
+ */
+ static final class WaitNode {
+ /** */
+ private final Object waiter;
+
+ /** */
+ private volatile Object next;
+
+ /**
+ * @param waiter Waiter.
+ */
+ WaitNode(Object waiter) {
+ this.waiter = waiter;
+ }
+ }
+
+ /** State updater. */
private static final AtomicReferenceFieldUpdater<GridFutureAdapter, Object> stateUpd =
AtomicReferenceFieldUpdater.newUpdater(GridFutureAdapter.class, Object.class, "state");
@@ -55,34 +82,41 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
private static final long serialVersionUID = 0L;
/** */
-// private boolean ignoreInterrupts;
+ private boolean ignoreInterrupts;
+ /** */
private volatile Object state = State.INIT;
-// private long l1, l2, l3, l4, l5, l6, l7;
+ /** Future start time. */
+ private final long startTime = U.currentTimeMillis();
+
+ /** Future end time. */
+ private volatile long endTime;
/** {@inheritDoc} */
@Override public long startTime() {
- return 0;
+ return startTime;
}
/** {@inheritDoc} */
@Override public long duration() {
- return 0;
+ long endTime = this.endTime;
+
+ return endTime == 0 ? U.currentTimeMillis() - startTime : endTime - startTime;
}
/**
* @param ignoreInterrupts Ignore interrupts flag.
*/
public void ignoreInterrupts(boolean ignoreInterrupts) {
- // this.ignoreInterrupts = ignoreInterrupts;
+ this.ignoreInterrupts = ignoreInterrupts;
}
/**
* @return Future end time.
*/
public long endTime() {
- return 0;
+ return endTime;
}
/** {@inheritDoc} */
@@ -101,7 +135,7 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
/** {@inheritDoc} */
@Override public R get() throws IgniteCheckedException {
- return get0(false);
+ return get0(ignoreInterrupts);
}
/** {@inheritDoc} */
@@ -142,7 +176,7 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
if (res != State.INIT) {
// no registration was done since a value is available.
- return resolveAndThrow(res);
+ return resolve(res);
}
boolean interrupted = false;
@@ -152,7 +186,7 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
LockSupport.park();
if (isDone())
- return resolveAndThrow(state);
+ return resolve(state);
else if (Thread.interrupted()) {
interrupted = true;
@@ -166,7 +200,8 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
}
}
finally {
- restoreInterrupt(interrupted);
+ if (interrupted)
+ Thread.currentThread().interrupt();
}
}
@@ -181,7 +216,7 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
Object res = registerWaiter(Thread.currentThread());
if (res != State.INIT)
- return resolveAndThrow(res);
+ return resolve(res);
long deadlineNanos = System.nanoTime() + nanosTimeout;
@@ -196,18 +231,19 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
nanosTimeout0 = deadlineNanos - System.nanoTime();
if (isDone())
- return resolveAndThrow(state);
+ return resolve(state);
else if (Thread.interrupted()) {
interrupted = true;
- // TODO ignore interrupts support
- throw new IgniteInterruptedCheckedException("Thread has been interrupted.");
+ if (!ignoreInterrupts)
+ throw new IgniteInterruptedCheckedException("Thread has been interrupted.");
}
}
}
finally {
- restoreInterrupt(interrupted);
+ if (interrupted)
+ Thread.currentThread().interrupt();
unregisterWaiter(Thread.currentThread());
}
@@ -215,7 +251,14 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
throw new IgniteFutureTimeoutCheckedException("Timeout was reached before computation completed.");
}
- protected R resolveAndThrow(Object val) throws IgniteCheckedException {
+ /**
+ * Resolves the value to result or exception.
+ *
+ * @param val Value to resolve.
+ * @return Result.
+ * @throws IgniteCheckedException If resolved to exception.
+ */
+ private R resolve(Object val) throws IgniteCheckedException {
if (val == State.CANCELLED)
throw new IgniteFutureCancelledCheckedException("Future was cancelled: " + this);
@@ -225,12 +268,10 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
return (R)val;
}
-
- private void restoreInterrupt(boolean interrupted) {
- if (interrupted)
- Thread.currentThread().interrupt();
- }
-
+ /**
+ * @param waiter Waiter to register.
+ * @return Previous state.
+ */
private Object registerWaiter(Object waiter) {
WaitNode waitNode = null;
@@ -259,40 +300,40 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
}
}
+ /**
+ * @param waiter Waiter to unregister.
+ */
void unregisterWaiter(Thread waiter) {
WaitNode prev = null;
- Object current = state;
+ Object cur = state;
- while (current != null) {
- Object currentWaiter = current.getClass() == WaitNode.class ? ((WaitNode)current).waiter : current;
- Object next = current.getClass() == WaitNode.class ? ((WaitNode)current).next : null;
+ while (cur != null) {
+ Object curWaiter = cur.getClass() == WaitNode.class ? ((WaitNode)cur).waiter : cur;
+ Object next = cur.getClass() == WaitNode.class ? ((WaitNode)cur).next : null;
- if (currentWaiter == waiter) {
- // it is the item we are looking for, so lets try to remove it
+ if (curWaiter == waiter) {
if (prev == null) {
- // it's the first item of the stack, so we need to change the head to the next
Object n = next == null ? State.INIT : next;
- // if we manage to CAS we are done, else we need to restart
- current = compareAndSetState(
- current,
- n) ? null : state;
+
+ cur = compareAndSetState(cur, n) ? null : state;
}
else {
- // remove the current item (this is done by letting the prev.next point to the next instead of current)
prev.next = next;
- // end the loop
- current = null;
+
+ cur = null;
}
}
else {
- // it isn't the item we are looking for, so lets move on to the next
- prev = current.getClass() == WaitNode.class ? (WaitNode)current : null;
- current = next;
+ prev = cur.getClass() == WaitNode.class ? (WaitNode)cur : null;
+
+ cur = next;
}
}
}
-
+ /**
+ * @param waiter Head of waiters queue to unblock.
+ */
private void unblockAll(Object waiter) {
while (waiter != null) {
if (waiter instanceof Thread) {
@@ -367,6 +408,10 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
return isDone(state);
}
+ /**
+ * @param state State to check.
+ * @return {@code True} if future is done.
+ */
private boolean isDone(Object state) {
return state == null ||
!(state == State.INIT
@@ -448,6 +493,8 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
return false;
if (compareAndSetState(oldState, val)) {
+ endTime = U.currentTimeMillis();
+
unblockAll(oldState);
return true;
@@ -455,6 +502,11 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
}
}
+ /**
+ * @param oldState Old state to check.
+ * @param newState New state to set.
+ * @return {@code True} if state has been changed (CASed).
+ */
private boolean compareAndSetState(Object oldState, Object newState) {
return state == oldState && stateUpd.compareAndSet(this, oldState, newState);
}
@@ -521,13 +573,4 @@ public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
return "ChainFuture [orig=" + fut + ", doneCb=" + doneCb + ']';
}
}
-
- static final class WaitNode {
- final Object waiter;
- volatile Object next;
-
- WaitNode(Object waiter) {
- this.waiter = waiter;
- }
- }
}