You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/04/13 12:42:56 UTC
[1/2] ignite git commit: ignite-4681 Apply new future adapter
Repository: ignite
Updated Branches:
refs/heads/master dd4a5c423 -> e922dda6e
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
index 4b8915c..7ad7c31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
@@ -85,9 +85,6 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
private Iterator<R> iter;
/** */
- protected final Object mux = new Object();
-
- /** */
private IgniteUuid timeoutId = IgniteUuid.randomUuid();
/** */
@@ -215,7 +212,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
Collection<R> res = null;
while (res == null) {
- synchronized (mux) {
+ synchronized (this) {
res = queue.poll();
}
@@ -228,10 +225,10 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
if (waitTime <= 0)
break;
- synchronized (mux) {
+ synchronized (this) {
try {
if (queue.isEmpty() && !isDone())
- mux.wait(waitTime);
+ wait(waitTime);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -273,7 +270,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
while (it == null || !it.hasNext()) {
Collection<R> c;
- synchronized (mux) {
+ synchronized (this) {
it = iter;
if (it != null && it.hasNext())
@@ -301,10 +298,10 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
break;
}
- synchronized (mux) {
+ synchronized (this) {
try {
if (queue.isEmpty() && !isDone())
- mux.wait(waitTime);
+ wait(waitTime);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -332,7 +329,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
*/
@SuppressWarnings({"unchecked"})
protected void enqueue(Collection<?> col) {
- assert Thread.holdsLock(mux);
+ assert Thread.holdsLock(this);
queue.add((Collection<R>)col);
@@ -351,7 +348,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
Collection<Object> dedupCol = new ArrayList<>(col.size());
- synchronized (mux) {
+ synchronized (this) {
for (Object o : col)
if (!(o instanceof Map.Entry) || keys.add(((Map.Entry<K, V>)o).getKey()))
dedupCol.add(o);
@@ -377,7 +374,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
try {
if (err != null)
- synchronized (mux) {
+ synchronized (this) {
enqueue(Collections.emptyList());
onDone(nodeId != null ?
@@ -387,7 +384,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
onPage(nodeId, true);
- mux.notifyAll();
+ notifyAll();
}
else {
if (data == null)
@@ -397,7 +394,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
data = cctx.unwrapBinariesIfNeeded((Collection<Object>)data, qry.query().keepBinary());
- synchronized (mux) {
+ synchronized (this) {
enqueue(data);
if (qry.query().keepAll())
@@ -409,7 +406,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
clear();
}
- mux.notifyAll();
+ notifyAll();
}
}
}
@@ -426,14 +423,14 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
* @param e Error.
*/
private void onPageError(@Nullable UUID nodeId, Throwable e) {
- synchronized (mux) {
+ synchronized (this) {
enqueue(Collections.emptyList());
onPage(nodeId, true);
onDone(e);
- mux.notifyAll();
+ notifyAll();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java
index 67d00ea..1cf7809 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java
@@ -214,9 +214,6 @@ public class TxDeadlockDetection {
/** Timed out flag. */
private volatile boolean timedOut;
- /** Mutex. */
- private final Object mux = new Object();
-
/**
* @param cctx Context.
* @param txId Tx ID.
@@ -521,7 +518,7 @@ public class TxDeadlockDetection {
* @param val Value.
*/
private boolean compareAndSet(UUID exp, UUID val) {
- synchronized (mux) {
+ synchronized (this) {
if (Objects.equals(curNodeId, exp)) {
curNodeId = val;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
index 2b2a78a..6012625 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
@@ -383,16 +383,6 @@ public class PlatformCompute extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override public long startTime() {
- return fut.startTime();
- }
-
- /** {@inheritDoc} */
- @Override public long duration() {
- return fut.duration();
- }
-
- /** {@inheritDoc} */
@Override public void listen(final IgniteInClosure lsnr) {
fut.listen(new IgniteInClosure<IgniteInternalFuture>() {
private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java
index 71eca65..c1334e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java
@@ -31,11 +31,11 @@ import org.apache.ignite.internal.processors.rest.request.GridRestCacheRequest;
import org.apache.ignite.internal.processors.rest.request.GridRestRequest;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.lang.GridTuple3;
+import org.apache.ignite.internal.util.lang.IgniteClosure2X;
import org.apache.ignite.internal.util.nio.GridNioFuture;
import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.typedef.C2;
-import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
@@ -178,11 +178,11 @@ public class GridTcpMemcachedNioListener extends GridNioServerListenerAdapter<Gr
return null;
}
- IgniteInternalFuture<GridRestResponse> f = hnd.handleAsync(createRestRequest(req, cmd.get1()));
-
- f.listen(new CIX1<IgniteInternalFuture<GridRestResponse>>() {
- @Override public void applyx(IgniteInternalFuture<GridRestResponse> f) throws IgniteCheckedException {
- GridRestResponse restRes = f.get();
+ return new GridEmbeddedFuture<>(new IgniteClosure2X<GridRestResponse, Exception, GridRestResponse>() {
+ @Override public GridRestResponse applyx(GridRestResponse restRes,
+ Exception ex) throws IgniteCheckedException {
+ if(ex != null)
+ throw U.cast(ex);
// Handle 'Stat' command (special case because several packets are included in response).
if (cmd.get1() == CACHE_METRICS) {
@@ -237,7 +237,7 @@ public class GridTcpMemcachedNioListener extends GridNioServerListenerAdapter<Gr
else
res.status(FAILURE);
- if (cmd.get3())
+ if (cmd.get3() == Boolean.TRUE)
res.key(req.key());
if (restRes.getSuccessStatus() == GridRestResponse.STATUS_SUCCESS && res.addData() &&
@@ -246,10 +246,10 @@ public class GridTcpMemcachedNioListener extends GridNioServerListenerAdapter<Gr
sendResponse(ses, res);
}
- }
- });
- return f;
+ return restRes;
+ }
+ }, hnd.handleAsync(createRestRequest(req, cmd.get1())));
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
index 96f3797..3e08cd9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
@@ -57,9 +57,6 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
private static final AtomicIntegerFieldUpdater<GridCompoundFuture> LSNR_CALLS_UPD =
AtomicIntegerFieldUpdater.newUpdater(GridCompoundFuture.class, "lsnrCalls");
- /** Sync object */
- protected final Object sync = new Object();
-
/** Possible values: null (no future), IgniteInternalFuture instance (single future) or List of futures */
private volatile Object futs;
@@ -146,7 +143,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
throw e;
}
- LSNR_CALLS_UPD.incrementAndGet(GridCompoundFuture.this);
+ LSNR_CALLS_UPD.incrementAndGet(this);
checkComplete();
}
@@ -169,16 +166,14 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
* @return Collection of futures.
*/
@SuppressWarnings("unchecked")
- public final Collection<IgniteInternalFuture<T>> futures() {
- synchronized (sync) {
- if (futs == null)
- return Collections.emptyList();
+ public synchronized final Collection<IgniteInternalFuture<T>> futures() {
+ if (futs == null)
+ return Collections.emptyList();
- if (futs instanceof IgniteInternalFuture)
- return Collections.singletonList((IgniteInternalFuture<T>)futs);
+ if (futs instanceof IgniteInternalFuture)
+ return Collections.singletonList((IgniteInternalFuture<T>)futs);
- return new ArrayList<>((Collection<IgniteInternalFuture<T>>)futs);
- }
+ return new ArrayList<>((Collection<IgniteInternalFuture<T>>)futs);
}
/**
@@ -200,7 +195,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
protected final boolean hasPending() {
- synchronized (sync) {
+ synchronized (this) {
int size = futuresCountNoLock();
// Avoid iterator creation and collection copy.
@@ -224,7 +219,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
public final void add(IgniteInternalFuture<T> fut) {
assert fut != null;
- synchronized (sync) {
+ synchronized (this) {
if (futs == null)
futs = fut;
else if (futs instanceof IgniteInternalFuture) {
@@ -254,10 +249,8 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
/**
* Clear futures.
*/
- protected final void clear() {
- synchronized (sync) {
- futs = null;
- }
+ protected synchronized final void clear() {
+ futs = null;
}
/**
@@ -307,7 +300,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
*/
@SuppressWarnings("unchecked")
protected final IgniteInternalFuture<T> future(int idx) {
- assert Thread.holdsLock(sync);
+ assert Thread.holdsLock(this);
assert futs != null && idx >= 0 && idx < futuresCountNoLock();
if (futs instanceof IgniteInternalFuture) {
@@ -324,7 +317,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
*/
@SuppressWarnings("unchecked")
protected final int futuresCountNoLock() {
- assert Thread.holdsLock(sync);
+ assert Thread.holdsLock(this);
if (futs == null)
return 0;
@@ -338,19 +331,15 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
/**
* @return Futures size.
*/
- private int futuresCount() {
- synchronized (sync) {
- return futuresCountNoLock();
- }
+ private synchronized int futuresCount() {
+ return futuresCountNoLock();
}
/**
* @return {@code True} if has at least one future.
*/
- protected final boolean hasFutures() {
- synchronized (sync) {
- return futs != null;
- }
+ protected synchronized final boolean hasFutures() {
+ return futs != null;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
index dc63adc..b149035 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
@@ -43,9 +43,6 @@ public class GridFinishedFuture<T> implements IgniteInternalFuture<T> {
/** Complete value. */
private final Object res;
- /** Start time. */
- private final long startTime = U.currentTimeMillis();
-
/**
* Creates finished future with complete value.
*/
@@ -84,16 +81,6 @@ public class GridFinishedFuture<T> implements IgniteInternalFuture<T> {
}
/** {@inheritDoc} */
- @Override public long startTime() {
- return startTime;
- }
-
- /** {@inheritDoc} */
- @Override public long duration() {
- return 0;
- }
-
- /** {@inheritDoc} */
@Override public boolean cancel() {
return false;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index 723dff7..323babd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -17,10 +17,10 @@
package org.apache.ignite.internal.util.future;
-import java.util.Arrays;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.locks.LockSupport;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
@@ -28,7 +28,6 @@ import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -39,79 +38,100 @@ import org.jetbrains.annotations.Nullable;
/**
* Future adapter.
*/
-public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements IgniteInternalFuture<R> {
- /** */
- private static final long serialVersionUID = 0L;
+public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
+ /** Done state representation. */
+ private static final String DONE = "DONE";
/** Initial state. */
- private static final int INIT = 0;
+ private static final Node INIT = new Node(null);
/** Cancelled state. */
- private static final int CANCELLED = 1;
-
- /** Done state. */
- private static final int DONE = 2;
-
- /** */
- private static final byte ERR = 1;
+ private static final Object CANCELLED = new Object();
/** */
- private static final byte RES = 2;
+ private static final AtomicReferenceFieldUpdater<GridFutureAdapter, Object> stateUpdater =
+ AtomicReferenceFieldUpdater.newUpdater(GridFutureAdapter.class, Object.class, "state");
- /** */
- private byte resFlag;
+ /*
+ * Eagerly load LockSupport as a workaround for https://bugs.openjdk.java.net/browse/JDK-8074773
+ */
+ static {
+ @SuppressWarnings("unused")
+ Class<?> ensureLoaded = LockSupport.class;
+ }
- /** Result. */
- @GridToStringInclude(sensitive = true)
- private Object res;
+ /**
+ * Stack node.
+ */
+ private static final class Node {
+ /** */
+ private final Object val;
- /** Future start time. */
- private final long startTime = U.currentTimeMillis();
+ /** */
+ private volatile Node next;
- /** Future end time. */
- private volatile long endTime;
+ /**
+ * @param val Node value.
+ */
+ Node(Object val) {
+ this.val = val;
+ }
+ }
/** */
- private boolean ignoreInterrupts;
+ private static final class ErrorWrapper {
+ /** */
+ private final Throwable error;
- /** */
- @GridToStringExclude
- private IgniteInClosure<? super IgniteInternalFuture<R>> lsnr;
+ /**
+ * @param error Error.
+ */
+ ErrorWrapper(Throwable error) {
+ this.error = error;
+ }
- /** {@inheritDoc} */
- @Override public long startTime() {
- return startTime;
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return String.valueOf(error);
+ }
}
- /** {@inheritDoc} */
- @Override public long duration() {
- long endTime = this.endTime;
-
- return endTime == 0 ? U.currentTimeMillis() - startTime : endTime - startTime;
- }
+ /** */
+ private boolean ignoreInterrupts;
- /**
- * @param ignoreInterrupts Ignore interrupts flag.
- */
- public void ignoreInterrupts(boolean ignoreInterrupts) {
- this.ignoreInterrupts = ignoreInterrupts;
- }
+ /** */
+ @GridToStringExclude
+ private volatile Object state = INIT;
/**
- * @return Future end time.
+ * Switches this future into the mode in which interrupts are ignored (sets the flag to {@code true}).
*/
- public long endTime() {
- return endTime;
+ public void ignoreInterrupts() {
+ ignoreInterrupts = true;
}
/** {@inheritDoc} */
@Override public Throwable error() {
- return (resFlag == ERR) ? (Throwable)res : null;
+ Object state0 = state;
+
+ if (state0 != null && state0.getClass() == ErrorWrapper.class)
+ return ((ErrorWrapper)state0).error;
+
+ return null;
}
/** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
@Override public R result() {
- return resFlag == RES ? (R)res : null;
+ Object state0 = state;
+
+ if(state0 == null || // It is DONE state
+ (state0.getClass() != Node.class && // It is not INIT state
+ state0.getClass() != ErrorWrapper.class && // It is not FAILED
+ state0 != CANCELLED)) // It is not CANCELLED
+ return (R)state0;
+
+ return null;
}
/** {@inheritDoc} */
@@ -135,14 +155,7 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
A.ensure(timeout >= 0, "timeout cannot be negative: " + timeout);
A.notNull(unit, "unit");
- try {
- return get0(unit.toNanos(timeout));
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new IgniteInterruptedCheckedException("Got interrupted while waiting for future to complete.", e);
- }
+ return get0(unit.toNanos(timeout));
}
/**
@@ -153,79 +166,190 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
* @throws IgniteCheckedException If failed.
*/
private R get0(boolean ignoreInterrupts) throws IgniteCheckedException {
+ if (isDone() || !registerWaiter(Thread.currentThread()))
+ return resolve();
+
+ boolean interrupted = false;
+
try {
- if (endTime == 0) {
- if (ignoreInterrupts)
- acquireShared(0);
- else
- acquireSharedInterruptibly(0);
- }
+ while (true) {
+ LockSupport.park();
- if (getState() == CANCELLED)
- throw new IgniteFutureCancelledCheckedException("Future was cancelled: " + this);
+ if (Thread.interrupted()) {
+ interrupted = true;
- assert resFlag != 0;
+ if (!ignoreInterrupts) {
+ unregisterWaiter(Thread.currentThread());
- if (resFlag == ERR)
- throw U.cast((Throwable)res);
+ throw new IgniteInterruptedCheckedException("Got interrupted while waiting for future to complete.");
+ }
+ }
- return (R)res;
+ if (isDone())
+ return resolve();
+ }
}
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new IgniteInterruptedCheckedException(e);
+ finally {
+ if (interrupted)
+ Thread.currentThread().interrupt();
}
}
/**
* @param nanosTimeout Timeout (nanoseconds).
* @return Result.
- * @throws InterruptedException If interrupted.
* @throws IgniteFutureTimeoutCheckedException If timeout reached before computation completed.
* @throws IgniteCheckedException If error occurred.
*/
- @Nullable protected R get0(long nanosTimeout) throws InterruptedException, IgniteCheckedException {
- if (endTime == 0 && !tryAcquireSharedNanos(0, nanosTimeout))
- throw new IgniteFutureTimeoutCheckedException("Timeout was reached before computation completed.");
+ @Nullable private R get0(long nanosTimeout) throws IgniteCheckedException {
+ if (isDone() || !registerWaiter(Thread.currentThread()))
+ return resolve();
+
+ long deadlineNanos = System.nanoTime() + nanosTimeout;
+
+ boolean interrupted = false;
+
+ try {
+ long nanosTimeout0 = nanosTimeout;
+
+ while (nanosTimeout0 > 0) {
+ LockSupport.parkNanos(nanosTimeout0);
+
+ nanosTimeout0 = deadlineNanos - System.nanoTime();
+
+ if (Thread.interrupted()) {
+ interrupted = true;
+
+ if (!ignoreInterrupts) {
+ unregisterWaiter(Thread.currentThread());
+
+ throw new IgniteInterruptedCheckedException("Got interrupted while waiting for future to complete.");
+ }
+ }
+
+ if (isDone())
+ return resolve();
+ }
+ }
+ finally {
+ if (interrupted)
+ Thread.currentThread().interrupt();
+ }
+
+ unregisterWaiter(Thread.currentThread());
- if (getState() == CANCELLED)
+ throw new IgniteFutureTimeoutCheckedException("Timeout was reached before computation completed.");
+ }
+
+ /**
+ * Resolves the value to result or exception.
+ *
+ * @return Result.
+ * @throws IgniteCheckedException If resolved to exception.
+ */
+ @SuppressWarnings("unchecked")
+ private R resolve() throws IgniteCheckedException {
+ if(state == CANCELLED)
throw new IgniteFutureCancelledCheckedException("Future was cancelled: " + this);
- assert resFlag != 0;
+ if(state == null || state.getClass() != ErrorWrapper.class)
+ return (R)state;
+
+ throw U.cast(((ErrorWrapper)state).error);
+ }
+
+ /**
+ * @param waiter Waiter to register.
+ * @return {@code True} if was registered successfully.
+ */
+ private boolean registerWaiter(Object waiter) {
+ Node node = null;
+
+ while (true) {
+ final Object oldState = state;
+
+ if (isDone(oldState))
+ return false;
+
+ if(node == null)
+ node = new Node(waiter);
- if (resFlag == ERR)
- throw U.cast((Throwable)res);
+ if(oldState != INIT && oldState.getClass() == Node.class)
+ node.next = (Node)oldState;
- return (R)res;
+ if (compareAndSetState(oldState, node))
+ return true;
+ }
}
- /** {@inheritDoc} */
- @Override public void listen(IgniteInClosure<? super IgniteInternalFuture<R>> lsnr0) {
- assert lsnr0 != null;
+ /**
+ * @param waiter Waiter to unregister.
+ */
+ private void unregisterWaiter(Thread waiter) {
+ Node prev = null;
+ Object cur = state;
- boolean done = isDone();
+ while (cur != null) {
+ if(cur.getClass() != Node.class)
+ return;
+
+ Object curWaiter = ((Node)cur).val;
+ Node next = ((Node)cur).next;
- if (!done) {
- synchronized (this) {
- done = isDone(); // Double check.
+ if (curWaiter == waiter) {
+ if (prev == null) {
+ Object n = next == null ? INIT : next;
- if (!done) {
- if (lsnr == null)
- lsnr = lsnr0;
- else if (lsnr instanceof ArrayListener)
- ((ArrayListener)lsnr).add(lsnr0);
- else
- lsnr = (IgniteInClosure)new ArrayListener<IgniteInternalFuture>(lsnr, lsnr0);
+ cur = compareAndSetState(cur, n) ? null : state;
+ }
+ else {
+ prev.next = next;
- return;
+ cur = null;
}
}
+ else {
+ prev = (Node)cur;
+
+ cur = next;
+ }
+ }
+ }
+
+ /**
+ * @param exp Expected state.
+ * @param newState New state.
+ * @return {@code True} if the state was equal to {@code exp} and has been replaced with {@code newState}.
+ */
+ private boolean compareAndSetState(Object exp, Object newState) {
+ return stateUpdater.compareAndSet(this, exp, newState);
+ }
+
+ /**
+ * @param head Head of waiters stack.
+ */
+ @SuppressWarnings("unchecked")
+ private void unblockAll(Node head) {
+ while (head != null) {
+ unblock(head.val);
+ head = head.next;
}
+ }
- assert done;
+ /**
+ * @param waiter Waiter to unblock: a {@link Thread} is unparked, any other waiter is notified as a listener.
+ */
+ private void unblock(Object waiter) {
+ if(waiter instanceof Thread)
+ LockSupport.unpark((Thread)waiter);
+ else
+ notifyListener((IgniteInClosure<? super IgniteInternalFuture<R>>)waiter);
+ }
- notifyListener(lsnr0);
+ /** {@inheritDoc} */
+ @Override public void listen(IgniteInClosure<? super IgniteInternalFuture<R>> lsnr) {
+ if (!registerWaiter(lsnr))
+ notifyListener(lsnr);
}
/** {@inheritDoc} */
@@ -240,27 +364,14 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
}
/**
- * Notifies all registered listeners.
+ * @return Logger instance.
*/
- private void notifyListeners() {
- IgniteInClosure<? super IgniteInternalFuture<R>> lsnr0;
-
- synchronized (this) {
- lsnr0 = lsnr;
-
- if (lsnr0 == null)
- return;
-
- lsnr = null;
- }
-
- assert lsnr0 != null;
-
- notifyListener(lsnr0);
+ @Nullable public IgniteLogger logger() {
+ return null;
}
/**
- * Notifies single listener.
+ * Notifies listener.
*
* @param lsnr Listener.
*/
@@ -293,22 +404,29 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
/** {@inheritDoc} */
@Override public boolean isDone() {
- // Don't check for "valid" here, as "done" flag can be read
- // even in invalid state.
- return endTime != 0;
+ return isDone(state);
+ }
+
+ /**
+ * @param state State to check.
+ * @return {@code True} if future is done.
+ */
+ private boolean isDone(Object state) {
+ return state == null || state.getClass() != Node.class;
}
/**
- * @return Checks is future is completed with exception.
+ * @return {@code True} if future is completed with exception.
*/
public boolean isFailed() {
- // Must read endTime first.
- return endTime != 0 && resFlag == ERR;
+ Object state0 = state;
+
+ return state0 != null && state0.getClass() == ErrorWrapper.class;
}
/** {@inheritDoc} */
@Override public boolean isCancelled() {
- return getState() == CANCELLED;
+ return state == CANCELLED;
}
/**
@@ -361,32 +479,22 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
* @param cancel {@code True} if future is being cancelled.
* @return {@code True} if result was set by this call.
*/
- private boolean onDone(@Nullable R res, @Nullable Throwable err, boolean cancel) {
- boolean notify = false;
+ protected boolean onDone(@Nullable R res, @Nullable Throwable err, boolean cancel) {
+ Object newState = cancel ? CANCELLED : err != null ? new ErrorWrapper(err) : res;
- try {
- if (compareAndSetState(INIT, cancel ? CANCELLED : DONE)) {
- if (err != null) {
- resFlag = ERR;
- this.res = err;
- }
- else {
- resFlag = RES;
- this.res = res;
- }
+ while (true) {
+ final Object oldState = state;
- notify = true;
+ if (isDone(oldState))
+ return false;
- releaseShared(0);
+ if (compareAndSetState(oldState, newState)) {
+
+ if(oldState != INIT)
+ unblockAll((Node)oldState);
return true;
}
-
- return false;
- }
- finally {
- if (notify)
- notifyListeners();
}
}
@@ -400,75 +508,26 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
}
/** {@inheritDoc} */
- @Override protected final int tryAcquireShared(int ignore) {
- return endTime != 0 ? 1 : -1;
- }
-
- /** {@inheritDoc} */
- @Override protected final boolean tryReleaseShared(int ignore) {
- endTime = U.currentTimeMillis();
-
- // Always signal after setting final done status.
- return true;
- }
-
- /**
- * @return String representation of state.
- */
- private String state() {
- int s = getState();
-
- return s == INIT ? "INIT" : s == CANCELLED ? "CANCELLED" : "DONE";
- }
+ @SuppressWarnings("StringEquality")
+ @Override public String toString() {
+ Object state0 = state;
- /**
- * @return Logger instance.
- */
- @Nullable public IgniteLogger logger() {
- return null;
- }
+ String stateStr = stateStr(state0);
+ String resStr = stateStr == DONE ? String.valueOf(state0) : null;
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridFutureAdapter.class, this, "state", state());
+ return S.toString(
+ GridFutureAdapter.class, this,
+ "state", stateStr, false,
+ "res", resStr, true,
+ "hash", System.identityHashCode(this), false);
}
/**
- *
+ * @param s State.
+ * @return State string representation.
*/
- private static class ArrayListener<R> implements IgniteInClosure<IgniteInternalFuture<R>> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private IgniteInClosure<? super IgniteInternalFuture<R>>[] arr;
-
- /**
- * @param lsnrs Listeners.
- */
- private ArrayListener(IgniteInClosure... lsnrs) {
- this.arr = lsnrs;
- }
-
- /** {@inheritDoc} */
- @Override public void apply(IgniteInternalFuture<R> fut) {
- for (int i = 0; i < arr.length; i++)
- arr[i].apply(fut);
- }
-
- /**
- * @param lsnr Listener.
- */
- void add(IgniteInClosure<? super IgniteInternalFuture<R>> lsnr) {
- arr = Arrays.copyOf(arr, arr.length + 1);
-
- arr[arr.length - 1] = lsnr;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(ArrayListener.class, this, "arrSize", arr.length);
- }
+ private String stateStr(Object s) {
+ return s == CANCELLED ? "CANCELLED" : s != null && s.getClass() == Node.class ? "INIT" : DONE;
}
/**
@@ -476,22 +535,12 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
*/
private static class ChainFuture<R, T> extends GridFutureAdapter<T> {
/** */
- private static final long serialVersionUID = 0L;
-
- /** */
private GridFutureAdapter<R> fut;
/** */
private IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb;
/**
- *
- */
- public ChainFuture() {
- // No-op.
- }
-
- /**
* @param fut Future.
* @param doneCb Closure.
* @param cbExec Optional executor to run callback.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java
index 7d74154..08fae96 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java
@@ -53,16 +53,6 @@ public class IgniteFutureImpl<V> implements IgniteFuture<V> {
}
/** {@inheritDoc} */
- @Override public long startTime() {
- return fut.startTime();
- }
-
- /** {@inheritDoc} */
- @Override public long duration() {
- return fut.duration();
- }
-
- /** {@inheritDoc} */
@Override public boolean isCancelled() {
return fut.isCancelled();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/lang/IgniteFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteFuture.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteFuture.java
index df1ab88..6519ec8 100644
--- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteFuture.java
@@ -95,21 +95,6 @@ public interface IgniteFuture<V> {
public boolean isDone();
/**
- * Gets start time for this future.
- *
- * @return Start time for this future.
- */
- public long startTime();
-
- /**
- * Gets duration in milliseconds between start of the future and current time if future
- * is not finished, or between start and finish of this future.
- *
- * @return Time in milliseconds this future has taken to execute.
- */
- public long duration();
-
- /**
* Registers listener closure to be asynchronously notified whenever future completes.
*
* @param lsnr Listener closure to register. If not provided - this method is no-op.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java
index dd710c4..7562fe5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java
@@ -17,17 +17,24 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.CacheRebalancingEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jsr166.ConcurrentHashMap8;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
@@ -44,10 +51,13 @@ public class GridCacheOrderedPreloadingSelfTest extends GridCommonAbstractTest {
private static final int GRID_CNT = 4;
/** First cache name. */
- public static final String FIRST_CACHE_NAME = "first";
+ private static final String FIRST_CACHE_NAME = "first";
/** Second cache name. */
- public static final String SECOND_CACHE_NAME = "second";
+ private static final String SECOND_CACHE_NAME = "second";
+
+ /** Grid name attribute. */
+ private static final String GRID_NAME_ATTR = "org.apache.ignite.ignite.name";
/** First cache mode. */
private CacheMode firstCacheMode;
@@ -55,6 +65,17 @@ public class GridCacheOrderedPreloadingSelfTest extends GridCommonAbstractTest {
/** Second cache mode. */
private CacheMode secondCacheMode;
+ /** Caches rebalance finish times. */
+ private ConcurrentHashMap8<Integer, ConcurrentHashMap8<String, Long>> times;
+
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest(); // Chain to the matching per-test hook; beforeTestsStarted() is the per-class hook.
+ times = new ConcurrentHashMap8<>();
+
+ for (int i = 0; i < GRID_CNT; i++)
+ times.put(i, new ConcurrentHashMap8<String, Long>()); // One cacheName -> finish-time map per grid index.
+ }
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -69,6 +90,17 @@ public class GridCacheOrderedPreloadingSelfTest extends GridCommonAbstractTest {
cfg.setDiscoverySpi(discoSpi);
+ Map<IgnitePredicate<? extends Event>, int[]> listeners = new HashMap<>();
+
+ listeners.put(new IgnitePredicate<CacheRebalancingEvent>() {
+ @Override public boolean apply(CacheRebalancingEvent event) {
+ times.get(gridIdx(event)).putIfAbsent(event.cacheName(), event.timestamp());
+ return true;
+ }
+ }, new int[]{EventType.EVT_CACHE_REBALANCE_STOPPED});
+
+ cfg.setLocalEventListeners(listeners);
+
return cfg;
}
@@ -85,7 +117,6 @@ public class GridCacheOrderedPreloadingSelfTest extends GridCommonAbstractTest {
cfg.setCacheMode(cacheMode);
cfg.setRebalanceOrder(preloadOrder);
cfg.setRebalanceMode(ASYNC);
-
return cfg;
}
@@ -150,11 +181,20 @@ public class GridCacheOrderedPreloadingSelfTest extends GridCommonAbstractTest {
fut1.get();
fut2.get();
- assertTrue("[i=" + i + ", fut1=" + fut1 + ", fut2=" + fut2 + ']', fut1.endTime() <= fut2.endTime());
+ long firstSyncTime = times.get(i).get(FIRST_CACHE_NAME);
+ long secondSyncTime = times.get(i).get(SECOND_CACHE_NAME);
+ assertTrue(
+ FIRST_CACHE_NAME + " [syncTime=" + firstSyncTime + "], "
+ + SECOND_CACHE_NAME + " [syncTime=" + secondSyncTime + "]",
+ firstSyncTime <= secondSyncTime);
}
}
finally {
stopAllGrids();
}
}
+
+ private int gridIdx(Event event) { // Grid index derived from the GRID_NAME_ATTR attribute of the event's node.
+ return getTestIgniteInstanceIndex((String)event.node().attributes().get(GRID_NAME_ATTR));
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteFutureImplTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteFutureImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteFutureImplTest.java
index c1cc51e..1e5b9a8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteFutureImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteFutureImplTest.java
@@ -44,27 +44,13 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest {
assertFalse(fut.isDone());
- assertTrue(fut.startTime() > 0);
-
U.sleep(100);
- assertTrue(fut.duration() > 0);
-
fut0.onDone("test");
assertEquals("test", fut.get());
assertTrue(fut.isDone());
-
- assertTrue(fut.duration() > 0);
-
- long dur0 = fut.duration();
-
- U.sleep(100);
-
- assertEquals(dur0, fut.duration());
-
- assertEquals("test", fut.get());
}
/**
@@ -77,12 +63,8 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest {
assertFalse(fut.isDone());
- assertTrue(fut.startTime() > 0);
-
U.sleep(100);
- assertTrue(fut.duration() > 0);
-
IgniteCheckedException err0 = new IgniteCheckedException("test error");
fut0.onDone(err0);
@@ -99,14 +81,6 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest {
assertTrue(fut.isDone());
- assertTrue(fut.duration() > 0);
-
- long dur0 = fut.duration();
-
- U.sleep(100);
-
- assertEquals(dur0, fut.duration());
-
err = (IgniteException)GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
fut.get();
@@ -268,12 +242,8 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest {
assertFalse(chained.isDone());
- assertTrue(chained.startTime() > 0);
-
U.sleep(100);
- assertTrue(chained.duration() > 0);
-
final AtomicInteger lsnrCnt = new AtomicInteger();
chained.listen(new CI1<IgniteFuture<Integer>>() {
@@ -288,14 +258,6 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest {
assertTrue(chained.isDone());
- assertTrue(chained.duration() > 0);
-
- long dur0 = chained.duration();
-
- U.sleep(100);
-
- assertEquals(dur0, chained.duration());
-
assertEquals(10, (int)chained.get());
assertEquals(1, lsnrCnt.get());
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
index c8263ff..6c338c8 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
@@ -964,7 +964,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
if (err == null) {
if (log.isDebugEnabled())
log.debug("Initialized child process for external task execution [jobId=" + jobId +
- ", desc=" + desc + ", initTime=" + duration() + ']');
+ ", desc=" + desc + ']');
}
else
U.error(log, "Failed to initialize child process for external task execution [jobId=" + jobId +
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java
----------------------------------------------------------------------
diff --git a/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java b/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java
index 71bbb84..bc3d40d 100644
--- a/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java
+++ b/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java
@@ -404,16 +404,6 @@ class ScheduleFutureImpl<R> implements SchedulerFuture<R> {
}
/** {@inheritDoc} */
- @Override public long startTime() {
- return stats.getCreateTime();
- }
-
- /** {@inheritDoc} */
- @Override public long duration() {
- return stats.getTotalExecutionTime() + stats.getTotalIdleTime();
- }
-
- /** {@inheritDoc} */
@Override public String pattern() {
return pat;
}
@@ -776,16 +766,6 @@ class ScheduleFutureImpl<R> implements SchedulerFuture<R> {
}
/** {@inheritDoc} */
- @Override public long startTime() {
- return ref.startTime();
- }
-
- /** {@inheritDoc} */
- @Override public long duration() {
- return ref.duration();
- }
-
- /** {@inheritDoc} */
@Override public String id() {
return ref.id();
}
[2/2] ignite git commit: ignite-4681 Apply new future adapter
Posted by yz...@apache.org.
ignite-4681 Apply new future adapter
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e922dda6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e922dda6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e922dda6
Branch: refs/heads/master
Commit: e922dda6e9230ff7715f83c7b81e5656e8e856a0
Parents: dd4a5c4
Author: Igor Seliverstov <gv...@gmail.com>
Authored: Thu Apr 13 15:41:54 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Thu Apr 13 15:42:50 2017 +0300
----------------------------------------------------------------------
.../jmh/future/JmhFutureAdapterBenchmark.java | 145 ++++++
.../ignite/internal/IgniteInternalFuture.java | 15 -
.../cache/GridCacheCompoundFuture.java | 63 +++
.../cache/GridCacheCompoundIdentityFuture.java | 63 +++
.../processors/cache/GridCacheFuture.java | 15 +
.../cache/GridCacheFutureAdapter.java | 61 +++
.../distributed/GridCacheTxRecoveryFuture.java | 9 +-
.../dht/CacheDistributedGetFutureAdapter.java | 4 +-
.../distributed/dht/GridDhtLockFuture.java | 33 +-
.../distributed/dht/GridDhtTxFinishFuture.java | 4 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 16 +-
.../dht/GridPartitionedSingleGetFuture.java | 4 +-
.../GridDhtAtomicAbstractUpdateFuture.java | 4 +-
.../GridNearAtomicAbstractUpdateFuture.java | 8 +-
.../GridNearAtomicSingleUpdateFuture.java | 24 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 24 +-
.../colocated/GridDhtColocatedLockFuture.java | 23 +-
.../GridDhtPartitionsExchangeFuture.java | 35 +-
.../distributed/near/GridNearLockFuture.java | 20 +-
...arOptimisticSerializableTxPrepareFuture.java | 2 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 9 +-
.../GridNearPessimisticTxPrepareFuture.java | 5 +-
.../near/GridNearTxFinishFuture.java | 13 +-
.../cache/distributed/near/GridNearTxLocal.java | 2 +-
.../near/GridNearTxPrepareFutureAdapter.java | 4 +-
.../cache/local/GridLocalLockFuture.java | 6 +-
.../query/GridCacheDistributedQueryFuture.java | 18 +-
.../query/GridCacheQueryFutureAdapter.java | 31 +-
.../cache/transactions/TxDeadlockDetection.java | 5 +-
.../platform/compute/PlatformCompute.java | 10 -
.../tcp/GridTcpMemcachedNioListener.java | 20 +-
.../util/future/GridCompoundFuture.java | 45 +-
.../util/future/GridFinishedFuture.java | 13 -
.../internal/util/future/GridFutureAdapter.java | 479 ++++++++++---------
.../internal/util/future/IgniteFutureImpl.java | 10 -
.../org/apache/ignite/lang/IgniteFuture.java | 15 -
.../GridCacheOrderedPreloadingSelfTest.java | 48 +-
.../util/future/IgniteFutureImplTest.java | 38 --
.../external/HadoopExternalTaskExecutor.java | 2 +-
.../processors/schedule/ScheduleFutureImpl.java | 20 -
40 files changed, 810 insertions(+), 555 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/future/JmhFutureAdapterBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/future/JmhFutureAdapterBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/future/JmhFutureAdapterBenchmark.java
new file mode 100644
index 0000000..ef3643a
--- /dev/null
+++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/future/JmhFutureAdapterBenchmark.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmarks.jmh.future;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.benchmarks.jmh.JmhAbstractBenchmark;
+import org.apache.ignite.internal.benchmarks.jmh.runner.JmhIdeBenchmarkRunner;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+
+/**
+ * JMH benchmark that completes {@link GridFutureAdapter} instances and reads their result via {@code get()}.
+ */
+public class JmhFutureAdapterBenchmark extends JmhAbstractBenchmark {
+ /** No-op listener registered on the future in {@link #testSimpleGetWithListener()}. */
+ private static final IgniteInClosure<IgniteInternalFuture<Long>> LSNR = new IgniteInClosure<IgniteInternalFuture<Long>>() {
+ /** {@inheritDoc} */
+ @Override public void apply(IgniteInternalFuture<Long> fut) {
+ // No-op
+ }
+ };
+
+ /** Result value every benchmarked future is completed with. */
+ private static final Long RES = 0L;
+
+ /**
+ * Thread-scoped benchmark state: a bounded queue of futures plus a thread that completes them.
+ */
+ @State(Scope.Thread)
+ public static class CompleteState {
+ /** Futures handed over for completion; the queue holds at most 10 entries. */
+ private final BlockingQueue<GridFutureAdapter<Long>> queue = new ArrayBlockingQueue<>(10);
+
+ /** Thread that polls {@link #queue} until interrupted and completes each polled future with {@code RES}. */
+ private final Thread compleete = new Thread() {
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ while (!Thread.interrupted()) {
+ GridFutureAdapter<Long> fut = queue.poll();
+ if (fut != null)
+ fut.onDone(RES);
+ }
+ }
+ };
+
+ /**
+ * Starts the completing thread.
+ */
+ @Setup public void setup() {
+ compleete.start();
+ }
+
+ /**
+ * @throws InterruptedException If interrupted while waiting for the completing thread to exit.
+ */
+ @TearDown public void destroy() throws InterruptedException {
+ compleete.interrupt();
+ compleete.join();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Benchmark
+ public void testSimpleGet() throws Exception {
+ GridFutureAdapter<Long> fut = new GridFutureAdapter<>();
+ fut.onDone(RES);
+ fut.get();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Benchmark
+ public void testSimpleGetWithListener() throws Exception {
+ GridFutureAdapter<Long> fut = new GridFutureAdapter<>();
+ fut.listen(LSNR);
+ fut.onDone(RES);
+ fut.get();
+ }
+
+ /**
+ * @param state Thread-scoped state; the new future is put on its queue and completed by its thread.
+ * @throws Exception If failed.
+ */
+ @Benchmark
+ @Threads(4)
+ public void completeFutureGet(CompleteState state) throws Exception {
+ GridFutureAdapter<Long> fut = new GridFutureAdapter<>();
+ state.queue.put(fut);
+ fut.get();
+ }
+
+ /**
+ * Runs the benchmarks with 8 threads.
+ *
+ * @param args Arguments.
+ * @throws Exception If failed.
+ */
+ public static void main(String[] args) throws Exception {
+ run(8);
+ }
+
+ /**
+ * Runs this benchmark class in one fork with 30 warmup and 30 measurement iterations.
+ *
+ * @param threads Amount of threads.
+ * @throws Exception If failed.
+ */
+ private static void run(int threads) throws Exception {
+ JmhIdeBenchmarkRunner.create()
+ .forks(1)
+ .threads(threads)
+ .warmupIterations(30)
+ .measurementIterations(30)
+ .benchmarks(JmhFutureAdapterBenchmark.class.getSimpleName())
+ .jvmArguments("-Xms4g", "-Xmx4g")
+ .run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
index 789556d..76f8c71 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
@@ -103,21 +103,6 @@ public interface IgniteInternalFuture<R> {
public boolean isCancelled();
/**
- * Gets start time for this future.
- *
- * @return Start time for this future.
- */
- public long startTime();
-
- /**
- * Gets duration in milliseconds between start of the future and current time if future
- * is not finished, or between start and finish of this future.
- *
- * @return Time in milliseconds this future has taken to execute.
- */
- public long duration();
-
- /**
* Registers listener closure to be asynchronously notified whenever future completes.
*
* @param lsnr Listener closure to register. If not provided - this method is no-op.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundFuture.java
new file mode 100644
index 0000000..9869d4a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundFuture.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteReducer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Compound cache future that records its creation time and the time it was completed.
+ */
+public abstract class GridCacheCompoundFuture<T, R> extends GridCompoundFuture<T, R> implements GridCacheFuture<R> {
+ /** Future start time. */
+ private final long startTime = U.currentTimeMillis();
+
+ /** Future end time; stays {@code 0} until {@code super.onDone(...)} returns {@code true}. */
+ private volatile long endTime;
+
+ /**
+ * @param rdc Reducer.
+ */
+ protected GridCacheCompoundFuture(@Nullable IgniteReducer<T, R> rdc) {
+ super(rdc);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long startTime() {
+ return startTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long duration() {
+ long endTime = this.endTime;
+
+ return (endTime == 0 ? U.currentTimeMillis() : endTime) - startTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean onDone(@Nullable R res, @Nullable Throwable err, boolean cancel) {
+ if(super.onDone(res, err, cancel)){
+ endTime = U.currentTimeMillis();
+ return true;
+ }
+
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundIdentityFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundIdentityFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundIdentityFuture.java
new file mode 100644
index 0000000..8fd619a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundIdentityFuture.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteReducer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Compound identity cache future that records its creation time and the time it was completed.
+ */
+public abstract class GridCacheCompoundIdentityFuture<T> extends GridCompoundIdentityFuture<T> implements GridCacheFuture<T> {
+ /** Future start time. */
+ private final long startTime = U.currentTimeMillis();
+
+ /** Future end time; stays {@code 0} until {@code super.onDone(...)} returns {@code true}. */
+ private volatile long endTime;
+
+ /**
+ * @param rdc Reducer.
+ */
+ protected GridCacheCompoundIdentityFuture(@Nullable IgniteReducer<T, T> rdc) {
+ super(rdc);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long startTime() {
+ return startTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long duration() {
+ long endTime = this.endTime;
+
+ return (endTime == 0 ? U.currentTimeMillis() : endTime) - startTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean onDone(@Nullable T res, @Nullable Throwable err, boolean cancel) {
+ if(super.onDone(res, err, cancel)){
+ endTime = U.currentTimeMillis();
+ return true;
+ }
+
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java
index 8bf8d40..90a219a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java
@@ -26,6 +26,21 @@ import org.apache.ignite.lang.IgniteUuid;
*/
public interface GridCacheFuture<R> extends IgniteInternalFuture<R> {
/**
+ * Gets start time for this future.
+ *
+ * @return Start time for this future.
+ */
+ public long startTime();
+
+ /**
+ * Gets duration in milliseconds between start of the future and current time if future
+ * is not finished, or between start and finish of this future.
+ *
+ * @return Time in milliseconds this future has taken to execute.
+ */
+ public long duration();
+
+ /**
* @return Unique identifier for this future.
*/
public IgniteUuid futureId();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFutureAdapter.java
new file mode 100644
index 0000000..babd707
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFutureAdapter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Cache future adapter that records its creation time and the time it was completed.
+ */
+public abstract class GridCacheFutureAdapter<R> extends GridFutureAdapter<R> implements GridCacheFuture<R> {
+ /** Future start time. */
+ private final long startTime = U.currentTimeMillis();
+
+ /** Future end time; stays {@code 0} until {@code super.onDone(...)} returns {@code true}. */
+ private volatile long endTime;
+
+ /**
+ * Default constructor.
+ */
+ public GridCacheFutureAdapter() {
+ }
+
+ /** {@inheritDoc} */
+ @Override public long startTime() {
+ return startTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long duration() {
+ long endTime = this.endTime;
+
+ return endTime == 0 ? U.currentTimeMillis() - startTime : endTime - startTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean onDone(@Nullable R res, @Nullable Throwable err, boolean cancel) {
+ if(super.onDone(res, err, cancel)){
+ endTime = U.currentTimeMillis();
+ return true;
+ }
+
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
index e27f777..1c97de2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@ -28,11 +28,11 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
import org.apache.ignite.internal.processors.cache.GridCacheFuture;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.util.GridLeanMap;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
@@ -49,7 +49,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARED;
/**
* Future verifying that all remote transactions related to transaction were prepared or committed.
*/
-public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolean> implements GridCacheFuture<Boolean> {
+public class GridCacheTxRecoveryFuture extends GridCacheCompoundIdentityFuture<Boolean> {
/** */
private static final long serialVersionUID = 0L;
@@ -426,7 +426,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
@SuppressWarnings("ForLoopReplaceableByForEach")
private MiniFuture miniFuture(IgniteUuid miniId) {
// We iterate directly over the futs collection here to avoid copy.
- synchronized (sync) {
+ synchronized (this) {
int size = futuresCountNoLock();
// Avoid iterator creation.
@@ -547,9 +547,6 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
*
*/
private class MiniFuture extends GridFutureAdapter<Boolean> {
- /** */
- private static final long serialVersionUID = 0L;
-
/** Mini future ID. */
private final IgniteUuid futId = IgniteUuid.randomUuid();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
index 4381dfd..259b096 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -25,11 +25,11 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheFuture;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.lang.IgniteUuid;
@@ -42,7 +42,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
/**
*
*/
-public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoundIdentityFuture<Map<K, V>>
+public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCacheCompoundIdentityFuture<Map<K, V>>
implements GridCacheFuture<Map<K, V>>, CacheGetFuture {
/** Default max remap count value. */
public static final int DFLT_MAX_REMAP_CNT = 3;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index a0270b0..1a7c2c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheLockCandidates;
import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
@@ -54,7 +55,6 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -77,7 +77,7 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD;
/**
* Cache lock future.
*/
-public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
+public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boolean>
implements GridCacheMvccFuture<Boolean>, GridDhtFuture<Boolean>, GridCacheMappedVersion {
/** */
private static final long serialVersionUID = 0L;
@@ -298,10 +298,8 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
/**
* @return Entries.
*/
- public Collection<GridDhtCacheEntry> entriesCopy() {
- synchronized (sync) {
- return new ArrayList<>(entries());
- }
+ public synchronized Collection<GridDhtCacheEntry> entriesCopy() {
+ return new ArrayList<>(entries());
}
/**
@@ -395,7 +393,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
return null;
}
- synchronized (sync) {
+ synchronized (this) {
entries.add(c == null || c.reentry() ? null : entry);
if (c != null && !c.reentry())
@@ -529,7 +527,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
@SuppressWarnings("ForLoopReplaceableByForEach")
private MiniFuture miniFuture(IgniteUuid miniId) {
// We iterate directly over the futs collection here to avoid copy.
- synchronized (sync) {
+ synchronized (this) {
int size = futuresCountNoLock();
// Avoid iterator creation.
@@ -599,7 +597,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
* @param t Error.
*/
public void onError(Throwable t) {
- synchronized (sync) {
+ synchronized (this) {
if (err != null)
return;
@@ -646,7 +644,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
log.debug("Received onOwnerChanged() callback [entry=" + entry + ", owner=" + owner + "]");
if (owner != null && owner.version().equals(lockVer)) {
- synchronized (sync) {
+ synchronized (this) {
if (!pendingLocks.remove(entry.key()))
return false;
}
@@ -663,10 +661,8 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
/**
* @return {@code True} if locks have been acquired.
*/
- private boolean checkLocks() {
- synchronized (sync) {
- return pendingLocks.isEmpty();
- }
+ private synchronized boolean checkLocks() {
+ return pendingLocks.isEmpty();
}
/** {@inheritDoc} */
@@ -697,7 +693,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
if (isDone() || (err == null && success && !checkLocks()))
return false;
- synchronized (sync) {
+ synchronized (this) {
if (this.err == null)
this.err = err;
}
@@ -776,7 +772,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
* @param entries Entries.
*/
private void map(Iterable<GridDhtCacheEntry> entries) {
- synchronized (sync) {
+ synchronized (this) {
if (mapped)
return;
@@ -1120,7 +1116,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
if (log.isDebugEnabled())
log.debug("Timed out waiting for lock response: " + this);
- synchronized (sync) {
+ synchronized (GridDhtLockFuture.this) {
timedOut = true;
// Stop locks and responses processing.
@@ -1146,9 +1142,6 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
*/
private class MiniFuture extends GridFutureAdapter<Boolean> {
/** */
- private static final long serialVersionUID = 0L;
-
- /** */
private final IgniteUuid futId = IgniteUuid.randomUuid();
/** Node. */
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 17e9047..23d7657 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -28,12 +28,12 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
import org.apache.ignite.internal.processors.cache.GridCacheFuture;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -50,7 +50,7 @@ import static org.apache.ignite.transactions.TransactionState.COMMITTING;
/**
*
*/
-public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFuture<IgniteInternalTx>
+public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentityFuture<IgniteInternalTx>
implements GridCacheFuture<IgniteInternalTx> {
/** */
private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 93ea30d..964d423 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheInvokeEntry;
import org.apache.ignite.internal.processors.cache.CacheLockCandidates;
import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheCompoundFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
@@ -97,7 +98,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARED;
*
*/
@SuppressWarnings("unchecked")
-public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInternalTx, GridNearTxPrepareResponse>
+public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<IgniteInternalTx, GridNearTxPrepareResponse>
implements GridCacheMvccFuture<GridNearTxPrepareResponse> {
/** */
private static final long serialVersionUID = 0L;
@@ -279,7 +280,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
boolean rmv;
- synchronized (sync) {
+ synchronized (this) {
rmv = lockKeys.remove(entry.txKey());
}
@@ -310,7 +311,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (!locksReady)
return false;
- synchronized (sync) {
+ synchronized (this) {
return lockKeys.isEmpty();
}
}
@@ -564,7 +565,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
@SuppressWarnings("ForLoopReplaceableByForEach")
private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
- synchronized (sync) {
+ synchronized (this) {
int size = futuresCountNoLock();
// Avoid iterator creation.
@@ -623,7 +624,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
}
if (tx.optimistic() && txEntry.explicitVersion() == null) {
- synchronized (sync) {
+ synchronized (this) {
lockKeys.add(txEntry.txKey());
}
}
@@ -1597,9 +1598,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
*/
private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> {
/** */
- private static final long serialVersionUID = 0L;
-
- /** */
private final int futId;
/** Node ID. */
@@ -1811,7 +1809,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
/** {@inheritDoc} */
@Override public void onTimeout() {
- synchronized (sync) {
+ synchronized (GridDhtTxPrepareFuture.this) {
clear();
lockKeys.clear();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 47f4066..3af691c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -45,7 +46,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetR
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CIX1;
@@ -61,7 +61,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
/**
*
*/
-public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> implements GridCacheFuture<Object>,
+public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Object> implements GridCacheFuture<Object>,
CacheGetFuture {
/** */
private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 0940acb..039cb99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -36,12 +36,12 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
@@ -58,7 +58,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC
/**
* DHT atomic cache backup update future.
*/
-public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapter<Void>
+public abstract class GridDhtAtomicAbstractUpdateFuture extends GridCacheFutureAdapter<Void>
implements GridCacheAtomicFuture<Void> {
/** */
private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index a2adb05..122e17c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
@@ -58,7 +59,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC
/**
* Base for near atomic update futures.
*/
-public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapter<Object>
+public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFutureAdapter<Object>
implements GridCacheAtomicFuture<Object> {
/** Logger reference. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
@@ -114,9 +115,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
/** Near cache flag. */
protected final boolean nearEnabled;
- /** Mutex to synchronize state updates. */
- protected final Object mux = new Object();
-
/** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */
protected boolean topLocked;
@@ -138,7 +136,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
/** Future ID. */
@GridToStringInclude
- protected long futId;
+ protected volatile long futId;
/** Operation result. */
protected GridCacheReturn opRes;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index e4ba457..11c3336 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -126,9 +126,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
/** {@inheritDoc} */
@Override public long id() {
- synchronized (mux) {
- return futId;
- }
+ return futId;
}
/** {@inheritDoc} */
@@ -141,7 +139,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
boolean rcvAll = false;
- synchronized (mux) {
+ synchronized (this) {
if (reqState == null)
return false;
@@ -215,7 +213,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
CachePartialUpdateCheckedException err0;
AffinityTopologyVersion remapTopVer0;
- synchronized (mux) {
+ synchronized (this) {
if (futId == 0 || futId != res.futureId())
return;
@@ -257,7 +255,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
GridCacheReturn opRes0 = null;
CachePartialUpdateCheckedException err0 = null;
- synchronized (mux) {
+ synchronized (this) {
if (futId == 0 || futId != res.futureId())
return;
@@ -331,7 +329,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
* @return Non-null topology version if update should be remapped.
*/
private AffinityTopologyVersion onAllReceived() {
- assert Thread.holdsLock(mux);
+ assert Thread.holdsLock(this);
assert futId > 0;
AffinityTopologyVersion remapTopVer0 = null;
@@ -488,7 +486,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
try {
reqState0 = mapSingleUpdate(topVer, futId);
- synchronized (mux) {
+ synchronized (this) {
assert this.futId == 0 : this;
assert this.topVer == AffinityTopologyVersion.ZERO : this;
@@ -537,7 +535,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
GridNearAtomicCheckUpdateRequest checkReq = null;
- synchronized (mux) {
+ synchronized (this) {
if (this.futId == 0 || this.futId != futId)
return;
@@ -568,7 +566,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
private Long onFutureDone() {
Long id0;
- synchronized (mux) {
+ synchronized (this) {
id0 = futId;
futId = 0;
@@ -734,9 +732,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
}
/** {@inheritDoc} */
- public String toString() {
- synchronized (mux) {
- return S.toString(GridNearAtomicSingleUpdateFuture.class, this, super.toString());
- }
+ public synchronized String toString() {
+ return S.toString(GridNearAtomicSingleUpdateFuture.class, this, super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 84deefc..6198de4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -151,9 +151,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
/** {@inheritDoc} */
@Override public long id() {
- synchronized (mux) {
- return futId;
- }
+ return futId;
}
/** {@inheritDoc} */
@@ -166,7 +164,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
List<GridNearAtomicCheckUpdateRequest> checkReqs = null;
- synchronized (mux) {
+ synchronized (this) {
if (futId == 0)
return false;
@@ -299,7 +297,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
CachePartialUpdateCheckedException err0;
AffinityTopologyVersion remapTopVer0;
- synchronized (mux) {
+ synchronized (this) {
if (futId == 0 || futId != res.futureId())
return;
@@ -372,7 +370,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
boolean rcvAll;
- synchronized (mux) {
+ synchronized (this) {
if (futId == 0 || futId != res.futureId())
return;
@@ -534,7 +532,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
* @return Non null topology version if update should be remapped.
*/
@Nullable private AffinityTopologyVersion onAllReceived() {
- assert Thread.holdsLock(mux);
+ assert Thread.holdsLock(this);
assert futId > 0;
AffinityTopologyVersion remapTopVer0 = null;
@@ -801,7 +799,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
}
- synchronized (mux) {
+ synchronized (this) {
assert this.futId == 0 : this;
assert this.topVer == AffinityTopologyVersion.ZERO : this;
@@ -866,7 +864,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
boolean rcvAll = false;
- synchronized (mux) {
+ synchronized (this) {
if (this.futId == 0 || this.futId != futId)
return;
@@ -938,7 +936,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
private Long onFutureDone() {
Long id0;
- synchronized (mux) {
+ synchronized (this) {
id0 = futId;
futId = 0;
@@ -1181,9 +1179,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
/** {@inheritDoc} */
- public String toString() {
- synchronized (mux) {
- return S.toString(GridNearAtomicUpdateFuture.class, this, super.toString());
- }
+ public synchronized String toString() {
+ return S.toString(GridNearAtomicUpdateFuture.class, this, super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 79c15fb..8512298 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
@@ -56,7 +57,6 @@ import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -82,7 +82,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
/**
* Colocated cache lock future.
*/
-public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture<Boolean>
+public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityFuture<Boolean>
implements GridCacheMvccFuture<Boolean> {
/** */
private static final long serialVersionUID = 0L;
@@ -203,7 +203,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
this.skipStore = skipStore;
this.keepBinary = keepBinary;
- ignoreInterrupts(true);
+ ignoreInterrupts();
threadId = tx == null ? Thread.currentThread().getId() : tx.threadId();
@@ -452,13 +452,11 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
/**
* @return Keys for which locks requested from remote nodes but response isn't received.
*/
- public Set<IgniteTxKey> requestedKeys() {
- synchronized (sync) {
- if (timeoutObj != null && timeoutObj.requestedKeys != null)
- return timeoutObj.requestedKeys;
+ public synchronized Set<IgniteTxKey> requestedKeys() {
+ if (timeoutObj != null && timeoutObj.requestedKeys != null)
+ return timeoutObj.requestedKeys;
- return requestedKeys0();
- }
+ return requestedKeys0();
}
/**
@@ -490,7 +488,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
@SuppressWarnings({"ForLoopReplaceableByForEach", "IfMayBeConditional"})
private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
- synchronized (sync) {
+ synchronized (this) {
int size = futuresCountNoLock();
// Avoid iterator creation.
@@ -1341,7 +1339,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
log.debug("Timed out waiting for lock response: " + this);
if (inTx() && cctx.tm().deadlockDetectionEnabled()) {
- synchronized (sync) {
+ synchronized (GridDhtColocatedLockFuture.this) {
requestedKeys = requestedKeys0();
clear(); // Stop response processing.
@@ -1390,9 +1388,6 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
*/
private class MiniFuture extends GridFutureAdapter<Boolean> {
/** */
- private static final long serialVersionUID = 0L;
-
- /** */
private final int futId;
/** Node ID. */
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index f41da2b..55aca2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -171,9 +171,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
@GridToStringInclude
private volatile IgniteInternalFuture<?> partReleaseFut;
- /** */
- private final Object mux = new Object();
-
/** Logger. */
private IgniteLogger log;
@@ -1087,7 +1084,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (super.onDone(res, err) && realExchange) {
if (log.isDebugEnabled())
log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this +
- "duration=" + duration() + ", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']');
+ ", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']');
initFut.onDone(err == null);
@@ -1201,7 +1198,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
boolean allReceived = false;
boolean updateSingleMap = false;
- synchronized (mux) {
+ synchronized (this) {
assert crd != null;
if (crd.isLocal()) {
@@ -1222,13 +1219,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
updatePartitionSingleMap(msg);
}
finally {
- synchronized (mux) {
+ synchronized (this) {
assert pendingSingleUpdates > 0;
pendingSingleUpdates--;
if (pendingSingleUpdates == 0)
- mux.notifyAll();
+ notifyAll();
}
}
}
@@ -1243,15 +1240,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/**
*
*/
- private void awaitSingleMapUpdates() {
- synchronized (mux) {
- try {
- while (pendingSingleUpdates > 0)
- U.wait(mux);
- }
- catch (IgniteInterruptedCheckedException e) {
- U.warn(log, "Failed to wait for partition map updates, thread was interrupted: " + e);
- }
+ private synchronized void awaitSingleMapUpdates() {
+ try {
+ while (pendingSingleUpdates > 0)
+ U.wait(this);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ U.warn(log, "Failed to wait for partition map updates, thread was interrupted: " + e);
}
}
@@ -1316,7 +1311,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
else {
List<ClusterNode> nodes;
- synchronized (mux) {
+ synchronized (this) {
srvNodes.remove(cctx.localNode());
nodes = new ArrayList<>(srvNodes);
@@ -1423,7 +1418,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
assert msg.exchangeId().equals(exchId) : msg;
assert msg.lastVersion() != null : msg;
- synchronized (mux) {
+ synchronized (this) {
if (crd == null)
return;
@@ -1605,7 +1600,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
discoCache.updateAlives(node);
- synchronized (mux) {
+ synchronized (this) {
if (!srvNodes.remove(node))
return;
@@ -1755,7 +1750,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
Set<UUID> remaining;
List<ClusterNode> srvNodes;
- synchronized (mux) {
+ synchronized (this) {
remaining = new HashSet<>(this.remaining);
srvNodes = this.srvNodes != null ? new ArrayList<>(this.srvNodes) : null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 1948df0..8de01c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
@@ -54,7 +55,6 @@ import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -80,7 +80,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
/**
* Cache lock future.
*/
-public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean>
+public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Boolean>
implements GridCacheMvccFuture<Boolean> {
/** */
private static final long serialVersionUID = 0L;
@@ -209,7 +209,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
this.skipStore = skipStore;
this.keepBinary = keepBinary;
- ignoreInterrupts(true);
+ ignoreInterrupts();
threadId = tx == null ? Thread.currentThread().getId() : tx.threadId();
@@ -499,13 +499,11 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
/**
* @return Keys for which locks requested from remote nodes but response isn't received.
*/
- public Set<IgniteTxKey> requestedKeys() {
- synchronized (sync) {
- if (timeoutObj != null && timeoutObj.requestedKeys != null)
- return timeoutObj.requestedKeys;
+ public synchronized Set<IgniteTxKey> requestedKeys() {
+ if (timeoutObj != null && timeoutObj.requestedKeys != null)
+ return timeoutObj.requestedKeys;
- return requestedKeys0();
- }
+ return requestedKeys0();
}
/**
@@ -537,7 +535,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
@SuppressWarnings({"ForLoopReplaceableByForEach", "IfMayBeConditional"})
private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
- synchronized (sync) {
+ synchronized (this) {
int size = futuresCountNoLock();
// Avoid iterator creation.
@@ -1440,7 +1438,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
timedOut = true;
if (inTx() && cctx.tm().deadlockDetectionEnabled()) {
- synchronized (sync) {
+ synchronized (GridNearLockFuture.this) {
requestedKeys = requestedKeys0();
clear(); // Stop response processing.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 80508dc..cbd9d23 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -227,7 +227,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
@SuppressWarnings("ForLoopReplaceableByForEach")
private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
- synchronized (sync) {
+ synchronized (GridNearOptimisticSerializableTxPrepareFuture.this) {
int size = futuresCountNoLock();
// Avoid iterator creation.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 6189b38..81179ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -206,7 +206,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
public Set<IgniteTxKey> requestedKeys() {
- synchronized (sync) {
+ synchronized (this) {
int size = futuresCountNoLock();
for (int i = 0; i < size; i++) {
@@ -239,7 +239,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
@SuppressWarnings("ForLoopReplaceableByForEach")
private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
- synchronized (sync) {
+ synchronized (this) {
int size = futuresCountNoLock();
// Avoid iterator creation.
@@ -694,7 +694,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
if (keyLockFut != null)
keys = new HashSet<>(keyLockFut.lockKeys);
else {
- synchronized (sync) {
+ synchronized (this) {
int size = futuresCountNoLock();
for (int i = 0; i < size; i++) {
@@ -765,9 +765,6 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
*
*/
private static class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
- /** */
- private static final long serialVersionUID = 0L;
-
/** Receive result flag updater. */
private static final AtomicIntegerFieldUpdater<MiniFuture> RCV_RES_UPD =
AtomicIntegerFieldUpdater.newUpdater(MiniFuture.class, "rcvRes");
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 4a443a9..cb15bca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -132,7 +132,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
@SuppressWarnings("ForLoopReplaceableByForEach")
private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
- synchronized (sync) {
+ synchronized (this) {
int size = futuresCountNoLock();
// Avoid iterator creation.
@@ -365,9 +365,6 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
*/
private class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
/** */
- private static final long serialVersionUID = 0L;
-
- /** */
private final int futId;
/** */
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 1b0566b..37be0fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheFuture;
@@ -46,7 +47,6 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
@@ -66,7 +66,7 @@ import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
/**
*
*/
-public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFuture<IgniteInternalTx>
+public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentityFuture<IgniteInternalTx>
implements GridCacheFuture<IgniteInternalTx> {
/** */
private static final long serialVersionUID = 0L;
@@ -114,7 +114,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
this.tx = tx;
this.commit = commit;
- ignoreInterrupts(true);
+ ignoreInterrupts();
mappings = tx.mappings();
@@ -189,7 +189,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (!isDone()) {
FinishMiniFuture finishFut = null;
- synchronized (sync) {
+ synchronized (this) {
int size = futuresCountNoLock();
for (int i = 0; i < size; i++) {
@@ -878,9 +878,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
*
*/
private class FinishMiniFuture extends MinFuture {
- /** */
- private static final long serialVersionUID = 0L;
-
/** Keys. */
@GridToStringInclude
private GridDistributedTxMapping m;
@@ -926,7 +923,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (!F.isEmpty(backups)) {
final CheckRemoteTxMiniFuture mini;
- synchronized (sync) {
+ synchronized (GridNearTxFinishFuture.this) {
int futId = Integer.MIN_VALUE + futuresCountNoLock();
mini = new CheckRemoteTxMiniFuture(futId, new HashSet<>(backups));
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 8be87d4..5baec99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -3946,7 +3946,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
private <T> IgniteInternalFuture<T> nonInterruptable(IgniteInternalFuture<T> fut) {
// Safety.
if (fut instanceof GridFutureAdapter)
- ((GridFutureAdapter)fut).ignoreInterrupts(true);
+ ((GridFutureAdapter)fut).ignoreInterrupts();
return fut;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index f9a6353..7f1f5a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.cache.GridCacheCompoundFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
@@ -35,7 +36,6 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -49,7 +49,7 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOO
* Common code for tx prepare in optimistic and pessimistic modes.
*/
public abstract class GridNearTxPrepareFutureAdapter extends
- GridCompoundFuture<GridNearTxPrepareResponse, IgniteInternalTx> implements GridCacheMvccFuture<IgniteInternalTx> {
+ GridCacheCompoundFuture<GridNearTxPrepareResponse, IgniteInternalTx> implements GridCacheMvccFuture<IgniteInternalTx> {
/** Logger reference. */
protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
index 8e224c8..d8e95b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -41,7 +42,6 @@ import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.transactions.TransactionDeadlockException;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -53,7 +53,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Cache lock future.
*/
-public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean>
+public final class GridLocalLockFuture<K, V> extends GridCacheFutureAdapter<Boolean>
implements GridCacheMvccFuture<Boolean> {
/** */
private static final long serialVersionUID = 0L;
@@ -135,7 +135,7 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean>
this.filter = filter;
this.tx = tx;
- ignoreInterrupts(true);
+ ignoreInterrupts();
threadId = tx == null ? Thread.currentThread().getId() : tx.threadId();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
index 6110e0c..4c8e34f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
@@ -71,7 +71,7 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
assert mgr != null;
- synchronized (mux) {
+ synchronized (this) {
for (ClusterNode node : nodes)
subgrid.add(node.id());
}
@@ -87,7 +87,7 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
Collection<ClusterNode> allNodes = cctx.discovery().allNodes();
Collection<ClusterNode> nodes;
- synchronized (mux) {
+ synchronized (this) {
nodes = F.retain(allNodes, true,
new P1<ClusterNode>() {
@Override public boolean apply(ClusterNode node) {
@@ -139,7 +139,7 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
@Override protected void onNodeLeft(UUID nodeId) {
boolean callOnPage;
- synchronized (mux) {
+ synchronized (this) {
callOnPage = !loc && subgrid.contains(nodeId);
}
@@ -166,7 +166,7 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
/** {@inheritDoc} */
@Override protected boolean onPage(UUID nodeId, boolean last) {
- assert Thread.holdsLock(mux);
+ assert Thread.holdsLock(this);
if (!loc) {
rcvd.add(nodeId);
@@ -192,11 +192,11 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
/** {@inheritDoc} */
@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
@Override protected void loadPage() {
- assert !Thread.holdsLock(mux);
+ assert !Thread.holdsLock(this);
Collection<ClusterNode> nodes = null;
- synchronized (mux) {
+ synchronized (this) {
if (!isDone() && rcvd.containsAll(subgrid)) {
rcvd.clear();
@@ -211,13 +211,13 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
/** {@inheritDoc} */
@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
@Override protected void loadAllPages() throws IgniteInterruptedCheckedException {
- assert !Thread.holdsLock(mux);
+ assert !Thread.holdsLock(this);
U.await(firstPageLatch);
Collection<ClusterNode> nodes = null;
- synchronized (mux) {
+ synchronized (this) {
if (!isDone() && !subgrid.isEmpty())
nodes = nodes();
}
@@ -230,7 +230,7 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
* @return Nodes to send requests to.
*/
private Collection<ClusterNode> nodes() {
- assert Thread.holdsLock(mux);
+ assert Thread.holdsLock(this);
Collection<ClusterNode> nodes = new ArrayList<>(subgrid.size());