You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/04/13 12:42:56 UTC

[1/2] ignite git commit: ignite-4681 Apply new future adapter

Repository: ignite
Updated Branches:
  refs/heads/master dd4a5c423 -> e922dda6e


http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
index 4b8915c..7ad7c31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
@@ -85,9 +85,6 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
     private Iterator<R> iter;
 
     /** */
-    protected final Object mux = new Object();
-
-    /** */
     private IgniteUuid timeoutId = IgniteUuid.randomUuid();
 
     /** */
@@ -215,7 +212,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
         Collection<R> res = null;
 
         while (res == null) {
-            synchronized (mux) {
+            synchronized (this) {
                 res = queue.poll();
             }
 
@@ -228,10 +225,10 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
                     if (waitTime <= 0)
                         break;
 
-                    synchronized (mux) {
+                    synchronized (this) {
                         try {
                             if (queue.isEmpty() && !isDone())
-                                mux.wait(waitTime);
+                                wait(waitTime);
                         }
                         catch (InterruptedException e) {
                             Thread.currentThread().interrupt();
@@ -273,7 +270,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
         while (it == null || !it.hasNext()) {
             Collection<R> c;
 
-            synchronized (mux) {
+            synchronized (this) {
                 it = iter;
 
                 if (it != null && it.hasNext())
@@ -301,10 +298,10 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
                     break;
                 }
 
-                synchronized (mux) {
+                synchronized (this) {
                     try {
                         if (queue.isEmpty() && !isDone())
-                            mux.wait(waitTime);
+                            wait(waitTime);
                     }
                     catch (InterruptedException e) {
                         Thread.currentThread().interrupt();
@@ -332,7 +329,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
      */
     @SuppressWarnings({"unchecked"})
     protected void enqueue(Collection<?> col) {
-        assert Thread.holdsLock(mux);
+        assert Thread.holdsLock(this);
 
         queue.add((Collection<R>)col);
 
@@ -351,7 +348,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
 
         Collection<Object> dedupCol = new ArrayList<>(col.size());
 
-        synchronized (mux) {
+        synchronized (this) {
             for (Object o : col)
                 if (!(o instanceof Map.Entry) || keys.add(((Map.Entry<K, V>)o).getKey()))
                     dedupCol.add(o);
@@ -377,7 +374,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
 
         try {
             if (err != null)
-                synchronized (mux) {
+                synchronized (this) {
                     enqueue(Collections.emptyList());
 
                     onDone(nodeId != null ?
@@ -387,7 +384,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
 
                     onPage(nodeId, true);
 
-                    mux.notifyAll();
+                    notifyAll();
                 }
             else {
                 if (data == null)
@@ -397,7 +394,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
 
                 data = cctx.unwrapBinariesIfNeeded((Collection<Object>)data, qry.query().keepBinary());
 
-                synchronized (mux) {
+                synchronized (this) {
                     enqueue(data);
 
                     if (qry.query().keepAll())
@@ -409,7 +406,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
                         clear();
                     }
 
-                    mux.notifyAll();
+                    notifyAll();
                 }
             }
         }
@@ -426,14 +423,14 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
      * @param e Error.
      */
     private void onPageError(@Nullable UUID nodeId, Throwable e) {
-        synchronized (mux) {
+        synchronized (this) {
             enqueue(Collections.emptyList());
 
             onPage(nodeId, true);
 
             onDone(e);
 
-            mux.notifyAll();
+            notifyAll();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java
index 67d00ea..1cf7809 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java
@@ -214,9 +214,6 @@ public class TxDeadlockDetection {
         /** Timed out flag. */
         private volatile boolean timedOut;
 
-        /** Mutex. */
-        private final Object mux = new Object();
-
         /**
          * @param cctx Context.
          * @param txId Tx ID.
@@ -521,7 +518,7 @@ public class TxDeadlockDetection {
          * @param val Value.
          */
         private boolean compareAndSet(UUID exp, UUID val) {
-            synchronized (mux) {
+            synchronized (this) {
                 if (Objects.equals(curNodeId, exp)) {
                     curNodeId = val;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
index 2b2a78a..6012625 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
@@ -383,16 +383,6 @@ public class PlatformCompute extends PlatformAbstractTarget {
         }
 
         /** {@inheritDoc} */
-        @Override public long startTime() {
-            return fut.startTime();
-        }
-
-        /** {@inheritDoc} */
-        @Override public long duration() {
-            return fut.duration();
-        }
-
-        /** {@inheritDoc} */
         @Override public void listen(final IgniteInClosure lsnr) {
             fut.listen(new IgniteInClosure<IgniteInternalFuture>() {
                 private static final long serialVersionUID = 0L;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java
index 71eca65..c1334e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java
@@ -31,11 +31,11 @@ import org.apache.ignite.internal.processors.rest.request.GridRestCacheRequest;
 import org.apache.ignite.internal.processors.rest.request.GridRestRequest;
 import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.lang.GridTuple3;
+import org.apache.ignite.internal.util.lang.IgniteClosure2X;
 import org.apache.ignite.internal.util.nio.GridNioFuture;
 import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
 import org.apache.ignite.internal.util.nio.GridNioSession;
 import org.apache.ignite.internal.util.typedef.C2;
-import org.apache.ignite.internal.util.typedef.CIX1;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
@@ -178,11 +178,11 @@ public class GridTcpMemcachedNioListener extends GridNioServerListenerAdapter<Gr
             return null;
         }
 
-        IgniteInternalFuture<GridRestResponse> f = hnd.handleAsync(createRestRequest(req, cmd.get1()));
-
-        f.listen(new CIX1<IgniteInternalFuture<GridRestResponse>>() {
-            @Override public void applyx(IgniteInternalFuture<GridRestResponse> f) throws IgniteCheckedException {
-                GridRestResponse restRes = f.get();
+        return new GridEmbeddedFuture<>(new IgniteClosure2X<GridRestResponse, Exception, GridRestResponse>() {
+            @Override public GridRestResponse applyx(GridRestResponse restRes,
+                Exception ex) throws IgniteCheckedException {
+                if(ex != null)
+                    throw U.cast(ex);
 
                 // Handle 'Stat' command (special case because several packets are included in response).
                 if (cmd.get1() == CACHE_METRICS) {
@@ -237,7 +237,7 @@ public class GridTcpMemcachedNioListener extends GridNioServerListenerAdapter<Gr
                     else
                         res.status(FAILURE);
 
-                    if (cmd.get3())
+                    if (cmd.get3() == Boolean.TRUE)
                         res.key(req.key());
 
                     if (restRes.getSuccessStatus() == GridRestResponse.STATUS_SUCCESS && res.addData() &&
@@ -246,10 +246,10 @@ public class GridTcpMemcachedNioListener extends GridNioServerListenerAdapter<Gr
 
                     sendResponse(ses, res);
                 }
-            }
-        });
 
-        return f;
+                return restRes;
+            }
+        }, hnd.handleAsync(createRestRequest(req, cmd.get1())));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
index 96f3797..3e08cd9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
@@ -57,9 +57,6 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
     private static final AtomicIntegerFieldUpdater<GridCompoundFuture> LSNR_CALLS_UPD =
         AtomicIntegerFieldUpdater.newUpdater(GridCompoundFuture.class, "lsnrCalls");
 
-    /** Sync object */
-    protected final Object sync = new Object();
-
     /** Possible values: null (no future), IgniteInternalFuture instance (single future) or List of futures  */
     private volatile Object futs;
 
@@ -146,7 +143,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
             throw e;
         }
 
-        LSNR_CALLS_UPD.incrementAndGet(GridCompoundFuture.this);
+        LSNR_CALLS_UPD.incrementAndGet(this);
 
         checkComplete();
     }
@@ -169,16 +166,14 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
      * @return Collection of futures.
      */
     @SuppressWarnings("unchecked")
-    public final Collection<IgniteInternalFuture<T>> futures() {
-        synchronized (sync) {
-            if (futs == null)
-                return Collections.emptyList();
+    public synchronized final Collection<IgniteInternalFuture<T>> futures() {
+        if (futs == null)
+            return Collections.emptyList();
 
-            if (futs instanceof IgniteInternalFuture)
-                return Collections.singletonList((IgniteInternalFuture<T>)futs);
+        if (futs instanceof IgniteInternalFuture)
+            return Collections.singletonList((IgniteInternalFuture<T>)futs);
 
-            return new ArrayList<>((Collection<IgniteInternalFuture<T>>)futs);
-        }
+        return new ArrayList<>((Collection<IgniteInternalFuture<T>>)futs);
     }
 
     /**
@@ -200,7 +195,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
     protected final boolean hasPending() {
-        synchronized (sync) {
+        synchronized (this) {
             int size = futuresCountNoLock();
 
             // Avoid iterator creation and collection copy.
@@ -224,7 +219,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
     public final void add(IgniteInternalFuture<T> fut) {
         assert fut != null;
 
-        synchronized (sync) {
+        synchronized (this) {
             if (futs == null)
                 futs = fut;
             else if (futs instanceof IgniteInternalFuture) {
@@ -254,10 +249,8 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
     /**
      * Clear futures.
      */
-    protected final void clear() {
-        synchronized (sync) {
-            futs = null;
-        }
+    protected synchronized final void clear() {
+        futs = null;
     }
 
     /**
@@ -307,7 +300,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
      */
     @SuppressWarnings("unchecked")
     protected final IgniteInternalFuture<T> future(int idx) {
-        assert Thread.holdsLock(sync);
+        assert Thread.holdsLock(this);
         assert futs != null && idx >= 0 && idx < futuresCountNoLock();
 
         if (futs instanceof IgniteInternalFuture) {
@@ -324,7 +317,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
      */
     @SuppressWarnings("unchecked")
     protected final int futuresCountNoLock() {
-        assert Thread.holdsLock(sync);
+        assert Thread.holdsLock(this);
 
         if (futs == null)
             return 0;
@@ -338,19 +331,15 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
     /**
      * @return Futures size.
      */
-    private int futuresCount() {
-        synchronized (sync) {
-            return futuresCountNoLock();
-        }
+    private synchronized int futuresCount() {
+        return futuresCountNoLock();
     }
 
     /**
      * @return {@code True} if has at least one future.
      */
-    protected final boolean hasFutures() {
-        synchronized (sync) {
-            return futs != null;
-        }
+    protected synchronized final boolean hasFutures() {
+        return futs != null;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
index dc63adc..b149035 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
@@ -43,9 +43,6 @@ public class GridFinishedFuture<T> implements IgniteInternalFuture<T> {
     /** Complete value. */
     private final Object res;
 
-    /** Start time. */
-    private final long startTime = U.currentTimeMillis();
-
     /**
      * Creates finished future with complete value.
      */
@@ -84,16 +81,6 @@ public class GridFinishedFuture<T> implements IgniteInternalFuture<T> {
     }
 
     /** {@inheritDoc} */
-    @Override public long startTime() {
-        return startTime;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long duration() {
-        return 0;
-    }
-
-    /** {@inheritDoc} */
     @Override public boolean cancel() {
         return false;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index 723dff7..323babd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -17,10 +17,10 @@
 
 package org.apache.ignite.internal.util.future;
 
-import java.util.Arrays;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.locks.LockSupport;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
@@ -28,7 +28,6 @@ import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -39,79 +38,100 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Future adapter.
  */
-public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements IgniteInternalFuture<R> {
-    /** */
-    private static final long serialVersionUID = 0L;
+public class GridFutureAdapter<R> implements IgniteInternalFuture<R> {
+    /** Done state representation. */
+    private static final String DONE = "DONE";
 
     /** Initial state. */
-    private static final int INIT = 0;
+    private static final Node INIT = new Node(null);
 
     /** Cancelled state. */
-    private static final int CANCELLED = 1;
-
-    /** Done state. */
-    private static final int DONE = 2;
-
-    /** */
-    private static final byte ERR = 1;
+    private static final Object CANCELLED = new Object();
 
     /** */
-    private static final byte RES = 2;
+    private static final AtomicReferenceFieldUpdater<GridFutureAdapter, Object> stateUpdater =
+        AtomicReferenceFieldUpdater.newUpdater(GridFutureAdapter.class, Object.class, "state");
 
-    /** */
-    private byte resFlag;
+    /*
+     * https://bugs.openjdk.java.net/browse/JDK-8074773
+     */
+    static {
+        @SuppressWarnings("unused")
+        Class<?> ensureLoaded = LockSupport.class;
+    }
 
-    /** Result. */
-    @GridToStringInclude(sensitive = true)
-    private Object res;
+    /**
+     * Stack node.
+     */
+    private static final class Node {
+        /** */
+        private final Object val;
 
-    /** Future start time. */
-    private final long startTime = U.currentTimeMillis();
+        /** */
+        private volatile Node next;
 
-    /** Future end time. */
-    private volatile long endTime;
+        /**
+         * @param val Node value.
+         */
+        Node(Object val) {
+            this.val = val;
+        }
+    }
 
     /** */
-    private boolean ignoreInterrupts;
+    private static final class ErrorWrapper {
+        /** */
+        private final Throwable error;
 
-    /** */
-    @GridToStringExclude
-    private IgniteInClosure<? super IgniteInternalFuture<R>> lsnr;
+        /**
+         * @param error Error.
+         */
+        ErrorWrapper(Throwable error) {
+            this.error = error;
+        }
 
-    /** {@inheritDoc} */
-    @Override public long startTime() {
-        return startTime;
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return String.valueOf(error);
+        }
     }
 
-    /** {@inheritDoc} */
-    @Override public long duration() {
-        long endTime = this.endTime;
-
-        return endTime == 0 ? U.currentTimeMillis() - startTime : endTime - startTime;
-    }
+    /** */
+    private boolean ignoreInterrupts;
 
-    /**
-     * @param ignoreInterrupts Ignore interrupts flag.
-     */
-    public void ignoreInterrupts(boolean ignoreInterrupts) {
-        this.ignoreInterrupts = ignoreInterrupts;
-    }
+    /** */
+    @GridToStringExclude
+    private volatile Object state = INIT;
 
     /**
-     * @return Future end time.
+     * Determines whether the future will ignore interrupts.
      */
-    public long endTime() {
-        return endTime;
+    public void ignoreInterrupts() {
+        ignoreInterrupts = true;
     }
 
     /** {@inheritDoc} */
     @Override public Throwable error() {
-        return (resFlag == ERR) ? (Throwable)res : null;
+        Object state0 = state;
+
+        if (state0 != null && state0.getClass() == ErrorWrapper.class)
+            return ((ErrorWrapper)state0).error;
+
+        return null;
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
     @Override public R result() {
-        return resFlag == RES ? (R)res : null;
+        Object state0 = state;
+
+        if(state0 == null ||                           // It is DONE state
+           (state0.getClass() != Node.class &&         // It is not INIT state
+            state0.getClass() != ErrorWrapper.class && // It is not FAILED
+            state0 != CANCELLED))                      // It is not CANCELLED
+            return (R)state0;
+
+        return null;
     }
 
     /** {@inheritDoc} */
@@ -135,14 +155,7 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
         A.ensure(timeout >= 0, "timeout cannot be negative: " + timeout);
         A.notNull(unit, "unit");
 
-        try {
-            return get0(unit.toNanos(timeout));
-        }
-        catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-
-            throw new IgniteInterruptedCheckedException("Got interrupted while waiting for future to complete.", e);
-        }
+        return get0(unit.toNanos(timeout));
     }
 
     /**
@@ -153,79 +166,190 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
      * @throws IgniteCheckedException If failed.
      */
     private R get0(boolean ignoreInterrupts) throws IgniteCheckedException {
+        if (isDone() || !registerWaiter(Thread.currentThread()))
+            return resolve();
+
+        boolean interrupted = false;
+
         try {
-            if (endTime == 0) {
-                if (ignoreInterrupts)
-                    acquireShared(0);
-                else
-                    acquireSharedInterruptibly(0);
-            }
+            while (true) {
+                LockSupport.park();
 
-            if (getState() == CANCELLED)
-                throw new IgniteFutureCancelledCheckedException("Future was cancelled: " + this);
+                if (Thread.interrupted()) {
+                    interrupted = true;
 
-            assert resFlag != 0;
+                    if (!ignoreInterrupts) {
+                        unregisterWaiter(Thread.currentThread());
 
-            if (resFlag == ERR)
-                throw U.cast((Throwable)res);
+                        throw new IgniteInterruptedCheckedException("Got interrupted while waiting for future to complete.");
+                    }
+                }
 
-            return (R)res;
+                if (isDone())
+                    return resolve();
+            }
         }
-        catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-
-            throw new IgniteInterruptedCheckedException(e);
+        finally {
+            if (interrupted)
+                Thread.currentThread().interrupt();
         }
     }
 
     /**
      * @param nanosTimeout Timeout (nanoseconds).
      * @return Result.
-     * @throws InterruptedException If interrupted.
      * @throws IgniteFutureTimeoutCheckedException If timeout reached before computation completed.
      * @throws IgniteCheckedException If error occurred.
      */
-    @Nullable protected R get0(long nanosTimeout) throws InterruptedException, IgniteCheckedException {
-        if (endTime == 0 && !tryAcquireSharedNanos(0, nanosTimeout))
-            throw new IgniteFutureTimeoutCheckedException("Timeout was reached before computation completed.");
+    @Nullable private R get0(long nanosTimeout) throws IgniteCheckedException {
+        if (isDone() || !registerWaiter(Thread.currentThread()))
+            return resolve();
+
+        long deadlineNanos = System.nanoTime() + nanosTimeout;
+
+        boolean interrupted = false;
+
+        try {
+            long nanosTimeout0 = nanosTimeout;
+
+            while (nanosTimeout0 > 0) {
+                LockSupport.parkNanos(nanosTimeout0);
+
+                nanosTimeout0 = deadlineNanos - System.nanoTime();
+
+                if (Thread.interrupted()) {
+                    interrupted = true;
+
+                    if (!ignoreInterrupts) {
+                        unregisterWaiter(Thread.currentThread());
+
+                        throw new IgniteInterruptedCheckedException("Got interrupted while waiting for future to complete.");
+                    }
+                }
+
+                if (isDone())
+                    return resolve();
+            }
+        }
+        finally {
+            if (interrupted)
+                Thread.currentThread().interrupt();
+        }
+
+        unregisterWaiter(Thread.currentThread());
 
-        if (getState() == CANCELLED)
+        throw new IgniteFutureTimeoutCheckedException("Timeout was reached before computation completed.");
+    }
+
+    /**
+     * Resolves the value to result or exception.
+     *
+     * @return Result.
+     * @throws IgniteCheckedException If resolved to exception.
+     */
+    @SuppressWarnings("unchecked")
+    private R resolve() throws IgniteCheckedException {
+        if(state == CANCELLED)
             throw new IgniteFutureCancelledCheckedException("Future was cancelled: " + this);
 
-        assert resFlag != 0;
+        if(state == null || state.getClass() != ErrorWrapper.class)
+            return (R)state;
+
+        throw U.cast(((ErrorWrapper)state).error);
+    }
+
+    /**
+     * @param waiter Waiter to register.
+     * @return {@code True} if was registered successfully.
+     */
+    private boolean registerWaiter(Object waiter) {
+        Node node = null;
+
+        while (true) {
+            final Object oldState = state;
+
+            if (isDone(oldState))
+                return false;
+
+            if(node == null)
+                node = new Node(waiter);
 
-        if (resFlag == ERR)
-            throw U.cast((Throwable)res);
+            if(oldState != INIT && oldState.getClass() == Node.class)
+                node.next = (Node)oldState;
 
-        return (R)res;
+            if (compareAndSetState(oldState, node))
+                return true;
+        }
     }
 
-    /** {@inheritDoc} */
-    @Override public void listen(IgniteInClosure<? super IgniteInternalFuture<R>> lsnr0) {
-        assert lsnr0 != null;
+    /**
+     * @param waiter Waiter to unregister.
+     */
+    private void unregisterWaiter(Thread waiter) {
+        Node prev = null;
+        Object cur = state;
 
-        boolean done = isDone();
+        while (cur != null) {
+            if(cur.getClass() != Node.class)
+                return;
+
+            Object curWaiter = ((Node)cur).val;
+            Node next = ((Node)cur).next;
 
-        if (!done) {
-            synchronized (this) {
-                done = isDone(); // Double check.
+            if (curWaiter == waiter) {
+                if (prev == null) {
+                    Object n = next == null ? INIT : next;
 
-                if (!done) {
-                    if (lsnr == null)
-                        lsnr = lsnr0;
-                    else if (lsnr instanceof ArrayListener)
-                        ((ArrayListener)lsnr).add(lsnr0);
-                    else
-                        lsnr = (IgniteInClosure)new ArrayListener<IgniteInternalFuture>(lsnr, lsnr0);
+                    cur = compareAndSetState(cur, n) ? null : state;
+                }
+                else {
+                    prev.next = next;
 
-                    return;
+                    cur = null;
                 }
             }
+            else {
+                prev = (Node)cur;
+
+                cur = next;
+            }
+        }
+    }
+
+    /**
+     * @param exp Expected state.
+     * @param newState New state.
+     * @return {@code True} if success
+     */
+    private boolean compareAndSetState(Object exp, Object newState) {
+        return stateUpdater.compareAndSet(this, exp, newState);
+    }
+
+    /**
+     * @param head Head of waiters stack.
+     */
+    @SuppressWarnings("unchecked")
+    private void unblockAll(Node head) {
+        while (head != null) {
+            unblock(head.val);
+            head = head.next;
         }
+    }
 
-        assert done;
+    /**
+     * @param waiter Waiter to unblock
+     */
+    private void unblock(Object waiter) {
+        if(waiter instanceof Thread)
+            LockSupport.unpark((Thread)waiter);
+        else
+            notifyListener((IgniteInClosure<? super IgniteInternalFuture<R>>)waiter);
+    }
 
-        notifyListener(lsnr0);
+    /** {@inheritDoc} */
+    @Override public void listen(IgniteInClosure<? super IgniteInternalFuture<R>> lsnr) {
+        if (!registerWaiter(lsnr))
+            notifyListener(lsnr);
     }
 
     /** {@inheritDoc} */
@@ -240,27 +364,14 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
     }
 
     /**
-     * Notifies all registered listeners.
+     * @return Logger instance.
      */
-    private void notifyListeners() {
-        IgniteInClosure<? super IgniteInternalFuture<R>> lsnr0;
-
-        synchronized (this) {
-            lsnr0 = lsnr;
-
-            if (lsnr0 == null)
-                return;
-
-            lsnr = null;
-        }
-
-        assert lsnr0 != null;
-
-        notifyListener(lsnr0);
+    @Nullable public IgniteLogger logger() {
+        return null;
     }
 
     /**
-     * Notifies single listener.
+     * Notifies listener.
      *
      * @param lsnr Listener.
      */
@@ -293,22 +404,29 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
 
     /** {@inheritDoc} */
     @Override public boolean isDone() {
-        // Don't check for "valid" here, as "done" flag can be read
-        // even in invalid state.
-        return endTime != 0;
+        return isDone(state);
+    }
+
+    /**
+     * @param state State to check.
+     * @return {@code True} if future is done.
+     */
+    private boolean isDone(Object state) {
+        return state == null || state.getClass() != Node.class;
     }
 
     /**
-     * @return Checks is future is completed with exception.
+     * @return {@code True} if future is completed with exception.
      */
     public boolean isFailed() {
-        // Must read endTime first.
-        return endTime != 0 && resFlag == ERR;
+        Object state0 = state;
+
+        return state0 != null && state0.getClass() == ErrorWrapper.class;
     }
 
     /** {@inheritDoc} */
     @Override public boolean isCancelled() {
-        return getState() == CANCELLED;
+        return state == CANCELLED;
     }
 
     /**
@@ -361,32 +479,22 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
      * @param cancel {@code True} if future is being cancelled.
      * @return {@code True} if result was set by this call.
      */
-    private boolean onDone(@Nullable R res, @Nullable Throwable err, boolean cancel) {
-        boolean notify = false;
+    protected boolean onDone(@Nullable R res, @Nullable Throwable err, boolean cancel) {
+        Object newState = cancel ? CANCELLED : err != null ? new ErrorWrapper(err) : res;
 
-        try {
-            if (compareAndSetState(INIT, cancel ? CANCELLED : DONE)) {
-                if (err != null) {
-                    resFlag = ERR;
-                    this.res = err;
-                }
-                else {
-                    resFlag = RES;
-                    this.res = res;
-                }
+        while (true) {
+            final Object oldState = state;
 
-                notify = true;
+            if (isDone(oldState))
+                return false;
 
-                releaseShared(0);
+            if (compareAndSetState(oldState, newState)) {
+
+                if(oldState != INIT)
+                    unblockAll((Node)oldState);
 
                 return true;
             }
-
-            return false;
-        }
-        finally {
-            if (notify)
-                notifyListeners();
         }
     }
 
@@ -400,75 +508,26 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
     }
 
     /** {@inheritDoc} */
-    @Override protected final int tryAcquireShared(int ignore) {
-        return endTime != 0 ? 1 : -1;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected final boolean tryReleaseShared(int ignore) {
-        endTime = U.currentTimeMillis();
-
-        // Always signal after setting final done status.
-        return true;
-    }
-
-    /**
-     * @return String representation of state.
-     */
-    private String state() {
-        int s = getState();
-
-        return s == INIT ? "INIT" : s == CANCELLED ? "CANCELLED" : "DONE";
-    }
+    @SuppressWarnings("StringEquality")
+    @Override public String toString() {
+        Object state0 = state;
 
-    /**
-     * @return Logger instance.
-     */
-    @Nullable public IgniteLogger logger() {
-        return null;
-    }
+        String stateStr = stateStr(state0);
+        String resStr = stateStr == DONE ? String.valueOf(state0) : null;
 
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridFutureAdapter.class, this, "state", state());
+        return S.toString(
+            GridFutureAdapter.class, this,
+            "state", stateStr, false,
+            "res", resStr, true,
+            "hash", System.identityHashCode(this), false);
     }
 
     /**
-     *
+     * @param s State.
+     * @return State string representation.
      */
-    private static class ArrayListener<R> implements IgniteInClosure<IgniteInternalFuture<R>> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private IgniteInClosure<? super IgniteInternalFuture<R>>[] arr;
-
-        /**
-         * @param lsnrs Listeners.
-         */
-        private ArrayListener(IgniteInClosure... lsnrs) {
-            this.arr = lsnrs;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void apply(IgniteInternalFuture<R> fut) {
-            for (int i = 0; i < arr.length; i++)
-                arr[i].apply(fut);
-        }
-
-        /**
-         * @param lsnr Listener.
-         */
-        void add(IgniteInClosure<? super IgniteInternalFuture<R>> lsnr) {
-            arr = Arrays.copyOf(arr, arr.length + 1);
-
-            arr[arr.length - 1] = lsnr;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(ArrayListener.class, this, "arrSize", arr.length);
-        }
+    private String stateStr(Object s) {
+        return s == CANCELLED ? "CANCELLED" : s != null && s.getClass() == Node.class ? "INIT" : DONE;
     }
 
     /**
@@ -476,22 +535,12 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
      */
     private static class ChainFuture<R, T> extends GridFutureAdapter<T> {
         /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
         private GridFutureAdapter<R> fut;
 
         /** */
         private IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb;
 
         /**
-         *
-         */
-        public ChainFuture() {
-            // No-op.
-        }
-
-        /**
          * @param fut Future.
          * @param doneCb Closure.
          * @param cbExec Optional executor to run callback.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java
index 7d74154..08fae96 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java
@@ -53,16 +53,6 @@ public class IgniteFutureImpl<V> implements IgniteFuture<V> {
     }
 
     /** {@inheritDoc} */
-    @Override public long startTime() {
-        return fut.startTime();
-    }
-
-    /** {@inheritDoc} */
-    @Override public long duration() {
-        return fut.duration();
-    }
-
-    /** {@inheritDoc} */
     @Override public boolean isCancelled() {
         return fut.isCancelled();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/lang/IgniteFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteFuture.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteFuture.java
index df1ab88..6519ec8 100644
--- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteFuture.java
@@ -95,21 +95,6 @@ public interface IgniteFuture<V> {
     public boolean isDone();
 
     /**
-     * Gets start time for this future.
-     *
-     * @return Start time for this future.
-     */
-    public long startTime();
-
-    /**
-     * Gets duration in milliseconds between start of the future and current time if future
-     * is not finished, or between start and finish of this future.
-     *
-     * @return Time in milliseconds this future has taken to execute.
-     */
-    public long duration();
-
-    /**
      * Registers listener closure to be asynchronously notified whenever future completes.
      *
      * @param lsnr Listener closure to register. If not provided - this method is no-op.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java
index dd710c4..7562fe5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java
@@ -17,17 +17,24 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.CacheRebalancingEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jsr166.ConcurrentHashMap8;
 
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
@@ -44,10 +51,13 @@ public class GridCacheOrderedPreloadingSelfTest extends GridCommonAbstractTest {
     private static final int GRID_CNT = 4;
 
     /** First cache name. */
-    public static final String FIRST_CACHE_NAME = "first";
+    private static final String FIRST_CACHE_NAME = "first";
 
     /** Second cache name. */
-    public static final String SECOND_CACHE_NAME = "second";
+    private static final String SECOND_CACHE_NAME = "second";
+
+    /** Grid name attribute. */
+    private static final String GRID_NAME_ATTR = "org.apache.ignite.ignite.name";
 
     /** First cache mode. */
     private CacheMode firstCacheMode;
@@ -55,6 +65,17 @@ public class GridCacheOrderedPreloadingSelfTest extends GridCommonAbstractTest {
     /** Second cache mode. */
     private CacheMode secondCacheMode;
 
+    /** Caches rebalance finish times. */
+    private ConcurrentHashMap8<Integer, ConcurrentHashMap8<String, Long>> times;
+
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTestsStarted();
+        times = new ConcurrentHashMap8<>();
+
+        for (int i = 0; i < GRID_CNT; i++)
+            times.put(i, new ConcurrentHashMap8<String, Long>());
+    }
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -69,6 +90,17 @@ public class GridCacheOrderedPreloadingSelfTest extends GridCommonAbstractTest {
 
         cfg.setDiscoverySpi(discoSpi);
 
+        Map<IgnitePredicate<? extends Event>, int[]> listeners = new HashMap<>();
+
+        listeners.put(new IgnitePredicate<CacheRebalancingEvent>() {
+            @Override public boolean apply(CacheRebalancingEvent event) {
+                times.get(gridIdx(event)).putIfAbsent(event.cacheName(), event.timestamp());
+                return true;
+            }
+        }, new int[]{EventType.EVT_CACHE_REBALANCE_STOPPED});
+
+        cfg.setLocalEventListeners(listeners);
+
         return cfg;
     }
 
@@ -85,7 +117,6 @@ public class GridCacheOrderedPreloadingSelfTest extends GridCommonAbstractTest {
         cfg.setCacheMode(cacheMode);
         cfg.setRebalanceOrder(preloadOrder);
         cfg.setRebalanceMode(ASYNC);
-
         return cfg;
     }
 
@@ -150,11 +181,20 @@ public class GridCacheOrderedPreloadingSelfTest extends GridCommonAbstractTest {
                 fut1.get();
                 fut2.get();
 
-                assertTrue("[i=" + i + ", fut1=" + fut1 + ", fut2=" + fut2 + ']', fut1.endTime() <= fut2.endTime());
+                long firstSyncTime = times.get(i).get(FIRST_CACHE_NAME);
+                long secondSyncTime = times.get(i).get(SECOND_CACHE_NAME);
+                assertTrue(
+                    FIRST_CACHE_NAME + " [syncTime=" + firstSyncTime + "], "
+                        + SECOND_CACHE_NAME + " [syncTime=" + secondSyncTime + "]",
+                    firstSyncTime <= secondSyncTime);
             }
         }
         finally {
             stopAllGrids();
         }
     }
+
+    private int gridIdx(Event event) {
+        return getTestIgniteInstanceIndex((String)event.node().attributes().get(GRID_NAME_ATTR));
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteFutureImplTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteFutureImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteFutureImplTest.java
index c1cc51e..1e5b9a8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteFutureImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteFutureImplTest.java
@@ -44,27 +44,13 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest {
 
         assertFalse(fut.isDone());
 
-        assertTrue(fut.startTime() > 0);
-
         U.sleep(100);
 
-        assertTrue(fut.duration() > 0);
-
         fut0.onDone("test");
 
         assertEquals("test", fut.get());
 
         assertTrue(fut.isDone());
-
-        assertTrue(fut.duration() > 0);
-
-        long dur0 = fut.duration();
-
-        U.sleep(100);
-
-        assertEquals(dur0, fut.duration());
-
-        assertEquals("test", fut.get());
     }
 
     /**
@@ -77,12 +63,8 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest {
 
         assertFalse(fut.isDone());
 
-        assertTrue(fut.startTime() > 0);
-
         U.sleep(100);
 
-        assertTrue(fut.duration() > 0);
-
         IgniteCheckedException err0 = new IgniteCheckedException("test error");
 
         fut0.onDone(err0);
@@ -99,14 +81,6 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest {
 
         assertTrue(fut.isDone());
 
-        assertTrue(fut.duration() > 0);
-
-        long dur0 = fut.duration();
-
-        U.sleep(100);
-
-        assertEquals(dur0, fut.duration());
-
         err = (IgniteException)GridTestUtils.assertThrows(log, new Callable<Void>() {
             @Override public Void call() throws Exception {
                 fut.get();
@@ -268,12 +242,8 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest {
 
         assertFalse(chained.isDone());
 
-        assertTrue(chained.startTime() > 0);
-
         U.sleep(100);
 
-        assertTrue(chained.duration() > 0);
-
         final AtomicInteger lsnrCnt = new AtomicInteger();
 
         chained.listen(new CI1<IgniteFuture<Integer>>() {
@@ -288,14 +258,6 @@ public class IgniteFutureImplTest extends GridCommonAbstractTest {
 
         assertTrue(chained.isDone());
 
-        assertTrue(chained.duration() > 0);
-
-        long dur0 = chained.duration();
-
-        U.sleep(100);
-
-        assertEquals(dur0, chained.duration());
-
         assertEquals(10, (int)chained.get());
 
         assertEquals(1, lsnrCnt.get());

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
index c8263ff..6c338c8 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
@@ -964,7 +964,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
                 if (err == null) {
                     if (log.isDebugEnabled())
                         log.debug("Initialized child process for external task execution [jobId=" + jobId +
-                            ", desc=" + desc + ", initTime=" + duration() + ']');
+                            ", desc=" + desc + ']');
                 }
                 else
                     U.error(log, "Failed to initialize child process for external task execution [jobId=" + jobId +

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java
----------------------------------------------------------------------
diff --git a/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java b/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java
index 71bbb84..bc3d40d 100644
--- a/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java
+++ b/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java
@@ -404,16 +404,6 @@ class ScheduleFutureImpl<R> implements SchedulerFuture<R> {
     }
 
     /** {@inheritDoc} */
-    @Override public long startTime() {
-        return stats.getCreateTime();
-    }
-
-    /** {@inheritDoc} */
-    @Override public long duration() {
-        return stats.getTotalExecutionTime() + stats.getTotalIdleTime();
-    }
-
-    /** {@inheritDoc} */
     @Override public String pattern() {
         return pat;
     }
@@ -776,16 +766,6 @@ class ScheduleFutureImpl<R> implements SchedulerFuture<R> {
         }
 
         /** {@inheritDoc} */
-        @Override public long startTime() {
-            return ref.startTime();
-        }
-
-        /** {@inheritDoc} */
-        @Override public long duration() {
-            return ref.duration();
-        }
-
-        /** {@inheritDoc} */
         @Override public String id() {
             return ref.id();
         }


[2/2] ignite git commit: ignite-4681 Apply new future adapter

Posted by yz...@apache.org.
ignite-4681 Apply new future adapter


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e922dda6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e922dda6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e922dda6

Branch: refs/heads/master
Commit: e922dda6e9230ff7715f83c7b81e5656e8e856a0
Parents: dd4a5c4
Author: Igor Seliverstov <gv...@gmail.com>
Authored: Thu Apr 13 15:41:54 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Thu Apr 13 15:42:50 2017 +0300

----------------------------------------------------------------------
 .../jmh/future/JmhFutureAdapterBenchmark.java   | 145 ++++++
 .../ignite/internal/IgniteInternalFuture.java   |  15 -
 .../cache/GridCacheCompoundFuture.java          |  63 +++
 .../cache/GridCacheCompoundIdentityFuture.java  |  63 +++
 .../processors/cache/GridCacheFuture.java       |  15 +
 .../cache/GridCacheFutureAdapter.java           |  61 +++
 .../distributed/GridCacheTxRecoveryFuture.java  |   9 +-
 .../dht/CacheDistributedGetFutureAdapter.java   |   4 +-
 .../distributed/dht/GridDhtLockFuture.java      |  33 +-
 .../distributed/dht/GridDhtTxFinishFuture.java  |   4 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |  16 +-
 .../dht/GridPartitionedSingleGetFuture.java     |   4 +-
 .../GridDhtAtomicAbstractUpdateFuture.java      |   4 +-
 .../GridNearAtomicAbstractUpdateFuture.java     |   8 +-
 .../GridNearAtomicSingleUpdateFuture.java       |  24 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  24 +-
 .../colocated/GridDhtColocatedLockFuture.java   |  23 +-
 .../GridDhtPartitionsExchangeFuture.java        |  35 +-
 .../distributed/near/GridNearLockFuture.java    |  20 +-
 ...arOptimisticSerializableTxPrepareFuture.java |   2 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |   9 +-
 .../GridNearPessimisticTxPrepareFuture.java     |   5 +-
 .../near/GridNearTxFinishFuture.java            |  13 +-
 .../cache/distributed/near/GridNearTxLocal.java |   2 +-
 .../near/GridNearTxPrepareFutureAdapter.java    |   4 +-
 .../cache/local/GridLocalLockFuture.java        |   6 +-
 .../query/GridCacheDistributedQueryFuture.java  |  18 +-
 .../query/GridCacheQueryFutureAdapter.java      |  31 +-
 .../cache/transactions/TxDeadlockDetection.java |   5 +-
 .../platform/compute/PlatformCompute.java       |  10 -
 .../tcp/GridTcpMemcachedNioListener.java        |  20 +-
 .../util/future/GridCompoundFuture.java         |  45 +-
 .../util/future/GridFinishedFuture.java         |  13 -
 .../internal/util/future/GridFutureAdapter.java | 479 ++++++++++---------
 .../internal/util/future/IgniteFutureImpl.java  |  10 -
 .../org/apache/ignite/lang/IgniteFuture.java    |  15 -
 .../GridCacheOrderedPreloadingSelfTest.java     |  48 +-
 .../util/future/IgniteFutureImplTest.java       |  38 --
 .../external/HadoopExternalTaskExecutor.java    |   2 +-
 .../processors/schedule/ScheduleFutureImpl.java |  20 -
 40 files changed, 810 insertions(+), 555 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/future/JmhFutureAdapterBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/future/JmhFutureAdapterBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/future/JmhFutureAdapterBenchmark.java
new file mode 100644
index 0000000..ef3643a
--- /dev/null
+++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/future/JmhFutureAdapterBenchmark.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmarks.jmh.future;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.benchmarks.jmh.JmhAbstractBenchmark;
+import org.apache.ignite.internal.benchmarks.jmh.runner.JmhIdeBenchmarkRunner;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+
+/**
+ *
+ */
+public class JmhFutureAdapterBenchmark extends JmhAbstractBenchmark {
+    /** */
+    private static final IgniteInClosure<IgniteInternalFuture<Long>> LSNR = new IgniteInClosure<IgniteInternalFuture<Long>>() {
+        /** {@inheritDoc} */
+        @Override public void apply(IgniteInternalFuture<Long> fut) {
+            // No-op
+        }
+    };
+
+    /** */
+    private static final Long RES = 0L;
+
+    /**
+     *
+     */
+    @State(Scope.Thread)
+    public static class CompleteState {
+        /** */
+        private final BlockingQueue<GridFutureAdapter<Long>> queue = new ArrayBlockingQueue<>(10);
+
+        /** */
+        private final Thread compleete = new Thread() {
+
+            /** {@inheritDoc} */
+            @Override public void run() {
+                while (!Thread.interrupted()) {
+                    GridFutureAdapter<Long> fut = queue.poll();
+                    if (fut != null)
+                        fut.onDone(RES);
+                }
+            }
+        };
+
+        /**
+         *
+         */
+        @Setup public void setup() {
+            compleete.start();
+        }
+
+        /**
+         * @throws InterruptedException If failed.
+         */
+        @TearDown public void destroy() throws InterruptedException {
+            compleete.interrupt();
+            compleete.join();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Benchmark
+    public void testSimpleGet() throws Exception {
+        GridFutureAdapter<Long> fut = new GridFutureAdapter<>();
+        fut.onDone(RES);
+        fut.get();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Benchmark
+    public void testSimpleGetWithListener() throws Exception {
+        GridFutureAdapter<Long> fut = new GridFutureAdapter<>();
+        fut.listen(LSNR);
+        fut.onDone(RES);
+        fut.get();
+    }
+
+    /**
+     * @param state Benchmark context.
+     * @throws Exception If failed.
+     */
+    @Benchmark
+    @Threads(4)
+    public void completeFutureGet(CompleteState state) throws Exception {
+        GridFutureAdapter<Long> fut = new GridFutureAdapter<>();
+        state.queue.put(fut);
+        fut.get();
+    }
+
+    /**
+     * Run benchmarks.
+     *
+     * @param args Arguments.
+     * @throws Exception If failed.
+     */
+    public static void main(String[] args) throws Exception {
+        run(8);
+    }
+
+    /**
+     * Run benchmark.
+     *
+     * @param threads Amount of threads.
+     * @throws Exception If failed.
+     */
+    private static void run(int threads) throws Exception {
+        JmhIdeBenchmarkRunner.create()
+            .forks(1)
+            .threads(threads)
+            .warmupIterations(30)
+            .measurementIterations(30)
+            .benchmarks(JmhFutureAdapterBenchmark.class.getSimpleName())
+            .jvmArguments("-Xms4g", "-Xmx4g")
+            .run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
index 789556d..76f8c71 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
@@ -103,21 +103,6 @@ public interface IgniteInternalFuture<R> {
     public boolean isCancelled();
 
     /**
-     * Gets start time for this future.
-     *
-     * @return Start time for this future.
-     */
-    public long startTime();
-
-    /**
-     * Gets duration in milliseconds between start of the future and current time if future
-     * is not finished, or between start and finish of this future.
-     *
-     * @return Time in milliseconds this future has taken to execute.
-     */
-    public long duration();
-
-    /**
      * Registers listener closure to be asynchronously notified whenever future completes.
      *
      * @param lsnr Listener closure to register. If not provided - this method is no-op.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundFuture.java
new file mode 100644
index 0000000..9869d4a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundFuture.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteReducer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public abstract class GridCacheCompoundFuture<T, R> extends GridCompoundFuture<T, R> implements GridCacheFuture<R> {
+    /** Future start time. */
+    private final long startTime = U.currentTimeMillis();
+
+    /** Future end time. */
+    private volatile long endTime;
+
+    /**
+     * @param rdc Reducer.
+     */
+    protected GridCacheCompoundFuture(@Nullable IgniteReducer<T, R> rdc) {
+        super(rdc);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long startTime() {
+        return startTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long duration() {
+        long endTime = this.endTime;
+
+        return (endTime == 0 ? U.currentTimeMillis() : endTime) - startTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean onDone(@Nullable R res, @Nullable Throwable err, boolean cancel) {
+        if(super.onDone(res, err, cancel)){
+            endTime = U.currentTimeMillis();
+            return true;
+        }
+
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundIdentityFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundIdentityFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundIdentityFuture.java
new file mode 100644
index 0000000..8fd619a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundIdentityFuture.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteReducer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public abstract class GridCacheCompoundIdentityFuture<T> extends GridCompoundIdentityFuture<T> implements GridCacheFuture<T> {
+    /** Future start time. */
+    private final long startTime = U.currentTimeMillis();
+
+    /** Future end time. */
+    private volatile long endTime;
+
+    /**
+     * @param rdc Reducer.
+     */
+    protected GridCacheCompoundIdentityFuture(@Nullable IgniteReducer<T, T> rdc) {
+        super(rdc);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long startTime() {
+        return startTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long duration() {
+        long endTime = this.endTime;
+
+        return (endTime == 0 ? U.currentTimeMillis() : endTime) - startTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean onDone(@Nullable T res, @Nullable Throwable err, boolean cancel) {
+        if(super.onDone(res, err, cancel)){
+            endTime = U.currentTimeMillis();
+            return true;
+        }
+
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java
index 8bf8d40..90a219a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java
@@ -26,6 +26,21 @@ import org.apache.ignite.lang.IgniteUuid;
  */
 public interface GridCacheFuture<R> extends IgniteInternalFuture<R> {
     /**
+     * Gets start time for this future.
+     *
+     * @return Start time for this future.
+     */
+    public long startTime();
+
+    /**
+     * Gets duration in milliseconds between start of the future and current time if future
+     * is not finished, or between start and finish of this future.
+     *
+     * @return Time in milliseconds this future has taken to execute.
+     */
+    public long duration();
+
+    /**
      * @return Unique identifier for this future.
      */
     public IgniteUuid futureId();

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFutureAdapter.java
new file mode 100644
index 0000000..babd707
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFutureAdapter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public abstract class GridCacheFutureAdapter<R> extends GridFutureAdapter<R> implements GridCacheFuture<R> {
+    /** Future start time. */
+    private final long startTime = U.currentTimeMillis();
+
+    /** Future end time. */
+    private volatile long endTime;
+
+    /**
+     * Default constructor.
+     */
+    public GridCacheFutureAdapter() {
+    }
+
+    /** {@inheritDoc} */
+    @Override public long startTime() {
+        return startTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long duration() {
+        long endTime = this.endTime;
+
+        return endTime == 0 ? U.currentTimeMillis() - startTime : endTime - startTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean onDone(@Nullable R res, @Nullable Throwable err, boolean cancel) {
+        if(super.onDone(res, err, cancel)){
+            endTime = U.currentTimeMillis();
+            return true;
+        }
+
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
index e27f777..1c97de2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@ -28,11 +28,11 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.util.GridLeanMap;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.C1;
@@ -49,7 +49,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARED;
 /**
  * Future verifying that all remote transactions related to transaction were prepared or committed.
  */
-public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolean> implements GridCacheFuture<Boolean> {
+public class GridCacheTxRecoveryFuture extends GridCacheCompoundIdentityFuture<Boolean> {
     /** */         
     private static final long serialVersionUID = 0L;
     
@@ -426,7 +426,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
     @SuppressWarnings("ForLoopReplaceableByForEach")
     private MiniFuture miniFuture(IgniteUuid miniId) {
         // We iterate directly over the futs collection here to avoid copy.
-        synchronized (sync) {
+        synchronized (this) {
             int size = futuresCountNoLock();
 
             // Avoid iterator creation.
@@ -547,9 +547,6 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
      *
      */
     private class MiniFuture extends GridFutureAdapter<Boolean> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
         /** Mini future ID. */
         private final IgniteUuid futId = IgniteUuid.randomUuid();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
index 4381dfd..259b096 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -25,11 +25,11 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.lang.IgniteUuid;
@@ -42,7 +42,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 /**
  *
  */
-public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoundIdentityFuture<Map<K, V>>
+public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCacheCompoundIdentityFuture<Map<K, V>>
     implements GridCacheFuture<Map<K, V>>, CacheGetFuture {
     /** Default max remap count value. */
     public static final int DFLT_MAX_REMAP_CNT = 3;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index a0270b0..1a7c2c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CacheLockCandidates;
 import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
@@ -54,7 +55,6 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.dr.GridDrType;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -77,7 +77,7 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD;
 /**
  * Cache lock future.
  */
-public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
+public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boolean>
     implements GridCacheMvccFuture<Boolean>, GridDhtFuture<Boolean>, GridCacheMappedVersion {
     /** */
     private static final long serialVersionUID = 0L;
@@ -298,10 +298,8 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
     /**
      * @return Entries.
      */
-    public Collection<GridDhtCacheEntry> entriesCopy() {
-        synchronized (sync) {
-            return new ArrayList<>(entries());
-        }
+    public synchronized Collection<GridDhtCacheEntry> entriesCopy() {
+        return new ArrayList<>(entries());
     }
 
     /**
@@ -395,7 +393,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
             return null;
         }
 
-        synchronized (sync) {
+        synchronized (this) {
             entries.add(c == null || c.reentry() ? null : entry);
 
             if (c != null && !c.reentry())
@@ -529,7 +527,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
     @SuppressWarnings("ForLoopReplaceableByForEach")
     private MiniFuture miniFuture(IgniteUuid miniId) {
         // We iterate directly over the futs collection here to avoid copy.
-        synchronized (sync) {
+        synchronized (this) {
             int size = futuresCountNoLock();
 
             // Avoid iterator creation.
@@ -599,7 +597,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
      * @param t Error.
      */
     public void onError(Throwable t) {
-        synchronized (sync) {
+        synchronized (this) {
             if (err != null)
                 return;
 
@@ -646,7 +644,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
             log.debug("Received onOwnerChanged() callback [entry=" + entry + ", owner=" + owner + "]");
 
         if (owner != null && owner.version().equals(lockVer)) {
-            synchronized (sync) {
+            synchronized (this) {
                 if (!pendingLocks.remove(entry.key()))
                     return false;
             }
@@ -663,10 +661,8 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
     /**
      * @return {@code True} if locks have been acquired.
      */
-    private boolean checkLocks() {
-        synchronized (sync) {
-            return pendingLocks.isEmpty();
-        }
+    private synchronized boolean checkLocks() {
+        return pendingLocks.isEmpty();
     }
 
     /** {@inheritDoc} */
@@ -697,7 +693,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
         if (isDone() || (err == null && success && !checkLocks()))
             return false;
 
-        synchronized (sync) {
+        synchronized (this) {
             if (this.err == null)
                 this.err = err;
         }
@@ -776,7 +772,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
      * @param entries Entries.
      */
     private void map(Iterable<GridDhtCacheEntry> entries) {
-        synchronized (sync) {
+        synchronized (this) {
             if (mapped)
                 return;
 
@@ -1120,7 +1116,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
             if (log.isDebugEnabled())
                 log.debug("Timed out waiting for lock response: " + this);
 
-            synchronized (sync) {
+            synchronized (GridDhtLockFuture.this) {
                 timedOut = true;
 
                 // Stop locks and responses processing.
@@ -1146,9 +1142,6 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
      */
     private class MiniFuture extends GridFutureAdapter<Boolean> {
         /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
         private final IgniteUuid futId = IgniteUuid.randomUuid();
 
         /** Node. */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 17e9047..23d7657 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -28,12 +28,12 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -50,7 +50,7 @@ import static org.apache.ignite.transactions.TransactionState.COMMITTING;
 /**
  *
  */
-public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFuture<IgniteInternalTx>
+public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentityFuture<IgniteInternalTx>
     implements GridCacheFuture<IgniteInternalTx> {
     /** */
     private static final long serialVersionUID = 0L;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 93ea30d..964d423 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheInvokeEntry;
 import org.apache.ignite.internal.processors.cache.CacheLockCandidates;
 import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheCompoundFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
@@ -97,7 +98,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARED;
  *
  */
 @SuppressWarnings("unchecked")
-public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInternalTx, GridNearTxPrepareResponse>
+public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<IgniteInternalTx, GridNearTxPrepareResponse>
     implements GridCacheMvccFuture<GridNearTxPrepareResponse> {
     /** */
     private static final long serialVersionUID = 0L;
@@ -279,7 +280,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
         boolean rmv;
 
-        synchronized (sync) {
+        synchronized (this) {
             rmv = lockKeys.remove(entry.txKey());
         }
 
@@ -310,7 +311,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         if (!locksReady)
             return false;
 
-        synchronized (sync) {
+        synchronized (this) {
             return lockKeys.isEmpty();
         }
     }
@@ -564,7 +565,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
     @SuppressWarnings("ForLoopReplaceableByForEach")
     private MiniFuture miniFuture(int miniId) {
         // We iterate directly over the futs collection here to avoid copy.
-        synchronized (sync) {
+        synchronized (this) {
             int size = futuresCountNoLock();
 
             // Avoid iterator creation.
@@ -623,7 +624,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
             }
 
             if (tx.optimistic() && txEntry.explicitVersion() == null) {
-                synchronized (sync) {
+                synchronized (this) {
                     lockKeys.add(txEntry.txKey());
                 }
             }
@@ -1597,9 +1598,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
      */
     private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> {
         /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
         private final int futId;
 
         /** Node ID. */
@@ -1811,7 +1809,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
         /** {@inheritDoc} */
         @Override public void onTimeout() {
-            synchronized (sync) {
+            synchronized (GridDhtTxPrepareFuture.this) {
                 clear();
 
                 lockKeys.clear();

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 47f4066..3af691c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -45,7 +46,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetR
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CIX1;
@@ -61,7 +61,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 /**
  *
  */
-public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> implements GridCacheFuture<Object>,
+public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Object> implements GridCacheFuture<Object>,
     CacheGetFuture {
     /** */
     private static final long serialVersionUID = 0L;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 0940acb..039cb99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -36,12 +36,12 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
@@ -58,7 +58,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC
 /**
  * DHT atomic cache backup update future.
  */
-public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapter<Void>
+public abstract class GridDhtAtomicAbstractUpdateFuture extends GridCacheFutureAdapter<Void>
     implements GridCacheAtomicFuture<Void> {
     /** */
     private static final long serialVersionUID = 0L;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index a2adb05..122e17c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
 import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
@@ -58,7 +59,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC
 /**
  * Base for near atomic update futures.
  */
-public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapter<Object>
+public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFutureAdapter<Object>
     implements GridCacheAtomicFuture<Object> {
     /** Logger reference. */
     private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
@@ -114,9 +115,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
     /** Near cache flag. */
     protected final boolean nearEnabled;
 
-    /** Mutex to synchronize state updates. */
-    protected final Object mux = new Object();
-
     /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */
     protected boolean topLocked;
 
@@ -138,7 +136,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
 
     /** Future ID. */
     @GridToStringInclude
-    protected long futId;
+    protected volatile long futId;
 
     /** Operation result. */
     protected GridCacheReturn opRes;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index e4ba457..11c3336 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -126,9 +126,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
     /** {@inheritDoc} */
     @Override public long id() {
-        synchronized (mux) {
-            return futId;
-        }
+        return futId;
     }
 
     /** {@inheritDoc} */
@@ -141,7 +139,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
         boolean rcvAll = false;
 
-        synchronized (mux) {
+        synchronized (this) {
             if (reqState == null)
                 return false;
 
@@ -215,7 +213,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         CachePartialUpdateCheckedException err0;
         AffinityTopologyVersion remapTopVer0;
 
-        synchronized (mux) {
+        synchronized (this) {
             if (futId == 0 || futId != res.futureId())
                 return;
 
@@ -257,7 +255,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         GridCacheReturn opRes0 = null;
         CachePartialUpdateCheckedException err0 = null;
 
-        synchronized (mux) {
+        synchronized (this) {
             if (futId == 0 || futId != res.futureId())
                 return;
 
@@ -331,7 +329,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
      * @return Non-null topology version if update should be remapped.
      */
     private AffinityTopologyVersion onAllReceived() {
-        assert Thread.holdsLock(mux);
+        assert Thread.holdsLock(this);
         assert futId > 0;
 
         AffinityTopologyVersion remapTopVer0 = null;
@@ -488,7 +486,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         try {
             reqState0 = mapSingleUpdate(topVer, futId);
 
-            synchronized (mux) {
+            synchronized (this) {
                 assert this.futId == 0 : this;
                 assert this.topVer == AffinityTopologyVersion.ZERO : this;
 
@@ -537,7 +535,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
         GridNearAtomicCheckUpdateRequest checkReq = null;
 
-        synchronized (mux) {
+        synchronized (this) {
             if (this.futId == 0 || this.futId != futId)
                 return;
 
@@ -568,7 +566,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     private Long onFutureDone() {
         Long id0;
 
-        synchronized (mux) {
+        synchronized (this) {
             id0 = futId;
 
             futId = 0;
@@ -734,9 +732,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     }
 
     /** {@inheritDoc} */
-    public String toString() {
-        synchronized (mux) {
-            return S.toString(GridNearAtomicSingleUpdateFuture.class, this, super.toString());
-        }
+    public synchronized String toString() {
+        return S.toString(GridNearAtomicSingleUpdateFuture.class, this, super.toString());
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 84deefc..6198de4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -151,9 +151,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
     /** {@inheritDoc} */
     @Override public long id() {
-        synchronized (mux) {
-            return futId;
-        }
+        return futId;
     }
 
     /** {@inheritDoc} */
@@ -166,7 +164,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
         List<GridNearAtomicCheckUpdateRequest> checkReqs = null;
 
-        synchronized (mux) {
+        synchronized (this) {
             if (futId == 0)
                 return false;
 
@@ -299,7 +297,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         CachePartialUpdateCheckedException err0;
         AffinityTopologyVersion remapTopVer0;
 
-        synchronized (mux) {
+        synchronized (this) {
             if (futId == 0 || futId != res.futureId())
                 return;
 
@@ -372,7 +370,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
         boolean rcvAll;
 
-        synchronized (mux) {
+        synchronized (this) {
             if (futId == 0 || futId != res.futureId())
                 return;
 
@@ -534,7 +532,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
      * @return Non null topology version if update should be remapped.
      */
     @Nullable private AffinityTopologyVersion onAllReceived() {
-        assert Thread.holdsLock(mux);
+        assert Thread.holdsLock(this);
         assert futId > 0;
 
         AffinityTopologyVersion remapTopVer0 = null;
@@ -801,7 +799,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                 }
             }
 
-            synchronized (mux) {
+            synchronized (this) {
                 assert this.futId == 0 : this;
                 assert this.topVer == AffinityTopologyVersion.ZERO : this;
 
@@ -866,7 +864,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
         boolean rcvAll = false;
 
-        synchronized (mux) {
+        synchronized (this) {
             if (this.futId == 0 || this.futId != futId)
                 return;
 
@@ -938,7 +936,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     private Long onFutureDone() {
         Long id0;
 
-        synchronized (mux) {
+        synchronized (this) {
             id0 = futId;
 
             futId = 0;
@@ -1181,9 +1179,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     }
 
     /** {@inheritDoc} */
-    public String toString() {
-        synchronized (mux) {
-            return S.toString(GridNearAtomicUpdateFuture.class, this, super.toString());
-        }
+    public synchronized String toString() {
+        return S.toString(GridNearAtomicUpdateFuture.class, this, super.toString());
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 79c15fb..8512298 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
@@ -56,7 +57,6 @@ import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
 import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -82,7 +82,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
 /**
  * Colocated cache lock future.
  */
-public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture<Boolean>
+public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityFuture<Boolean>
     implements GridCacheMvccFuture<Boolean> {
     /** */
     private static final long serialVersionUID = 0L;
@@ -203,7 +203,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
         this.skipStore = skipStore;
         this.keepBinary = keepBinary;
 
-        ignoreInterrupts(true);
+        ignoreInterrupts();
 
         threadId = tx == null ? Thread.currentThread().getId() : tx.threadId();
 
@@ -452,13 +452,11 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
     /**
      * @return Keys for which locks requested from remote nodes but response isn't received.
      */
-    public Set<IgniteTxKey> requestedKeys() {
-        synchronized (sync) {
-            if (timeoutObj != null && timeoutObj.requestedKeys != null)
-                return timeoutObj.requestedKeys;
+    public synchronized Set<IgniteTxKey> requestedKeys() {
+        if (timeoutObj != null && timeoutObj.requestedKeys != null)
+            return timeoutObj.requestedKeys;
 
-            return requestedKeys0();
-        }
+        return requestedKeys0();
     }
 
     /**
@@ -490,7 +488,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
     @SuppressWarnings({"ForLoopReplaceableByForEach", "IfMayBeConditional"})
     private MiniFuture miniFuture(int miniId) {
         // We iterate directly over the futs collection here to avoid copy.
-        synchronized (sync) {
+        synchronized (this) {
             int size = futuresCountNoLock();
 
             // Avoid iterator creation.
@@ -1341,7 +1339,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
                 log.debug("Timed out waiting for lock response: " + this);
 
             if (inTx() && cctx.tm().deadlockDetectionEnabled()) {
-                synchronized (sync) {
+                synchronized (GridDhtColocatedLockFuture.this) {
                     requestedKeys = requestedKeys0();
 
                     clear(); // Stop response processing.
@@ -1390,9 +1388,6 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
      */
     private class MiniFuture extends GridFutureAdapter<Boolean> {
         /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
         private final int futId;
 
         /** Node ID. */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index f41da2b..55aca2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -171,9 +171,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     @GridToStringInclude
     private volatile IgniteInternalFuture<?> partReleaseFut;
 
-    /** */
-    private final Object mux = new Object();
-
     /** Logger. */
     private IgniteLogger log;
 
@@ -1087,7 +1084,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         if (super.onDone(res, err) && realExchange) {
             if (log.isDebugEnabled())
                 log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this +
-                    "duration=" + duration() + ", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']');
+                    ", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']');
 
             initFut.onDone(err == null);
 
@@ -1201,7 +1198,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         boolean allReceived = false;
         boolean updateSingleMap = false;
 
-        synchronized (mux) {
+        synchronized (this) {
             assert crd != null;
 
             if (crd.isLocal()) {
@@ -1222,13 +1219,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 updatePartitionSingleMap(msg);
             }
             finally {
-                synchronized (mux) {
+                synchronized (this) {
                     assert pendingSingleUpdates > 0;
 
                     pendingSingleUpdates--;
 
                     if (pendingSingleUpdates == 0)
-                        mux.notifyAll();
+                        notifyAll();
                 }
             }
         }
@@ -1243,15 +1240,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     /**
      *
      */
-    private void awaitSingleMapUpdates() {
-        synchronized (mux) {
-            try {
-                while (pendingSingleUpdates > 0)
-                    U.wait(mux);
-            }
-            catch (IgniteInterruptedCheckedException e) {
-                U.warn(log, "Failed to wait for partition map updates, thread was interrupted: " + e);
-            }
+    private synchronized void awaitSingleMapUpdates() {
+        try {
+            while (pendingSingleUpdates > 0)
+                U.wait(this);
+        }
+        catch (IgniteInterruptedCheckedException e) {
+            U.warn(log, "Failed to wait for partition map updates, thread was interrupted: " + e);
         }
     }
 
@@ -1316,7 +1311,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             else {
                 List<ClusterNode> nodes;
 
-                synchronized (mux) {
+                synchronized (this) {
                     srvNodes.remove(cctx.localNode());
 
                     nodes = new ArrayList<>(srvNodes);
@@ -1423,7 +1418,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         assert msg.exchangeId().equals(exchId) : msg;
         assert msg.lastVersion() != null : msg;
 
-        synchronized (mux) {
+        synchronized (this) {
             if (crd == null)
                 return;
 
@@ -1605,7 +1600,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
                         discoCache.updateAlives(node);
 
-                        synchronized (mux) {
+                        synchronized (this) {
                             if (!srvNodes.remove(node))
                                 return;
 
@@ -1755,7 +1750,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         Set<UUID> remaining;
         List<ClusterNode> srvNodes;
 
-        synchronized (mux) {
+        synchronized (this) {
             remaining = new HashSet<>(this.remaining);
             srvNodes = this.srvNodes != null ? new ArrayList<>(this.srvNodes) : null;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 1948df0..8de01c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
@@ -54,7 +55,6 @@ import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
 import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -80,7 +80,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
 /**
  * Cache lock future.
  */
-public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean>
+public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Boolean>
     implements GridCacheMvccFuture<Boolean> {
     /** */
     private static final long serialVersionUID = 0L;
@@ -209,7 +209,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
         this.skipStore = skipStore;
         this.keepBinary = keepBinary;
 
-        ignoreInterrupts(true);
+        ignoreInterrupts();
 
         threadId = tx == null ? Thread.currentThread().getId() : tx.threadId();
 
@@ -499,13 +499,11 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
     /**
      * @return Keys for which locks requested from remote nodes but response isn't received.
      */
-    public Set<IgniteTxKey> requestedKeys() {
-        synchronized (sync) {
-            if (timeoutObj != null && timeoutObj.requestedKeys != null)
-                return timeoutObj.requestedKeys;
+    public synchronized Set<IgniteTxKey> requestedKeys() {
+        if (timeoutObj != null && timeoutObj.requestedKeys != null)
+            return timeoutObj.requestedKeys;
 
-            return requestedKeys0();
-        }
+        return requestedKeys0();
     }
 
     /**
@@ -537,7 +535,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
     @SuppressWarnings({"ForLoopReplaceableByForEach", "IfMayBeConditional"})
     private MiniFuture miniFuture(int miniId) {
         // We iterate directly over the futs collection here to avoid copy.
-        synchronized (sync) {
+        synchronized (this) {
             int size = futuresCountNoLock();
 
             // Avoid iterator creation.
@@ -1440,7 +1438,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
             timedOut = true;
 
             if (inTx() && cctx.tm().deadlockDetectionEnabled()) {
-                synchronized (sync) {
+                synchronized (GridNearLockFuture.this) {
                     requestedKeys = requestedKeys0();
 
                     clear(); // Stop response processing.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 80508dc..cbd9d23 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -227,7 +227,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
     @SuppressWarnings("ForLoopReplaceableByForEach")
     private MiniFuture miniFuture(int miniId) {
         // We iterate directly over the futs collection here to avoid copy.
-        synchronized (sync) {
+        synchronized (GridNearOptimisticSerializableTxPrepareFuture.this) {
             int size = futuresCountNoLock();
 
             // Avoid iterator creation.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 6189b38..81179ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -206,7 +206,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
     public Set<IgniteTxKey> requestedKeys() {
-        synchronized (sync) {
+        synchronized (this) {
             int size = futuresCountNoLock();
 
             for (int i = 0; i < size; i++) {
@@ -239,7 +239,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
     @SuppressWarnings("ForLoopReplaceableByForEach")
     private MiniFuture miniFuture(int miniId) {
         // We iterate directly over the futs collection here to avoid copy.
-        synchronized (sync) {
+        synchronized (this) {
             int size = futuresCountNoLock();
 
             // Avoid iterator creation.
@@ -694,7 +694,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
             if (keyLockFut != null)
                 keys = new HashSet<>(keyLockFut.lockKeys);
             else {
-                synchronized (sync) {
+                synchronized (this) {
                     int size = futuresCountNoLock();
 
                     for (int i = 0; i < size; i++) {
@@ -765,9 +765,6 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
      *
      */
     private static class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
         /** Receive result flag updater. */
         private static final AtomicIntegerFieldUpdater<MiniFuture> RCV_RES_UPD =
             AtomicIntegerFieldUpdater.newUpdater(MiniFuture.class, "rcvRes");

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 4a443a9..cb15bca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -132,7 +132,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
     @SuppressWarnings("ForLoopReplaceableByForEach")
     private MiniFuture miniFuture(int miniId) {
         // We iterate directly over the futs collection here to avoid copy.
-        synchronized (sync) {
+        synchronized (this) {
             int size = futuresCountNoLock();
 
             // Avoid iterator creation.
@@ -365,9 +365,6 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
      */
     private class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
         /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
         private final int futId;
 
         /** */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 1b0566b..37be0fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheFuture;
@@ -46,7 +47,6 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.C1;
@@ -66,7 +66,7 @@ import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
 /**
  *
  */
-public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFuture<IgniteInternalTx>
+public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentityFuture<IgniteInternalTx>
     implements GridCacheFuture<IgniteInternalTx> {
     /** */
     private static final long serialVersionUID = 0L;
@@ -114,7 +114,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
         this.tx = tx;
         this.commit = commit;
 
-        ignoreInterrupts(true);
+        ignoreInterrupts();
 
         mappings = tx.mappings();
 
@@ -189,7 +189,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
         if (!isDone()) {
             FinishMiniFuture finishFut = null;
 
-            synchronized (sync) {
+            synchronized (this) {
                 int size = futuresCountNoLock();
 
                 for (int i = 0; i < size; i++) {
@@ -878,9 +878,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
      *
      */
     private class FinishMiniFuture extends MinFuture {
-        /** */
-        private static final long serialVersionUID = 0L;
-
         /** Keys. */
         @GridToStringInclude
         private GridDistributedTxMapping m;
@@ -926,7 +923,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                         if (!F.isEmpty(backups)) {
                             final CheckRemoteTxMiniFuture mini;
 
-                            synchronized (sync) {
+                            synchronized (GridNearTxFinishFuture.this) {
                                 int futId = Integer.MIN_VALUE + futuresCountNoLock();
 
                                 mini = new CheckRemoteTxMiniFuture(futId, new HashSet<>(backups));

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 8be87d4..5baec99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -3946,7 +3946,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
     private <T> IgniteInternalFuture<T> nonInterruptable(IgniteInternalFuture<T> fut) {
         // Safety.
         if (fut instanceof GridFutureAdapter)
-            ((GridFutureAdapter)fut).ignoreInterrupts(true);
+            ((GridFutureAdapter)fut).ignoreInterrupts();
 
         return fut;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index f9a6353..7f1f5a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.cache.GridCacheCompoundFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
@@ -35,7 +36,6 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -49,7 +49,7 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOO
  * Common code for tx prepare in optimistic and pessimistic modes.
  */
 public abstract class GridNearTxPrepareFutureAdapter extends
-    GridCompoundFuture<GridNearTxPrepareResponse, IgniteInternalTx> implements GridCacheMvccFuture<IgniteInternalTx> {
+    GridCacheCompoundFuture<GridNearTxPrepareResponse, IgniteInternalTx> implements GridCacheMvccFuture<IgniteInternalTx> {
     /** Logger reference. */
     protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
index 8e224c8..d8e95b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -41,7 +42,6 @@ import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
 import org.apache.ignite.transactions.TransactionDeadlockException;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -53,7 +53,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Cache lock future.
  */
-public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean>
+public final class GridLocalLockFuture<K, V> extends GridCacheFutureAdapter<Boolean>
     implements GridCacheMvccFuture<Boolean> {
     /** */
     private static final long serialVersionUID = 0L;
@@ -135,7 +135,7 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean>
         this.filter = filter;
         this.tx = tx;
 
-        ignoreInterrupts(true);
+        ignoreInterrupts();
 
         threadId = tx == null ? Thread.currentThread().getId() : tx.threadId();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
index 6110e0c..4c8e34f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
@@ -71,7 +71,7 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
 
         assert mgr != null;
 
-        synchronized (mux) {
+        synchronized (this) {
             for (ClusterNode node : nodes)
                 subgrid.add(node.id());
         }
@@ -87,7 +87,7 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
             Collection<ClusterNode> allNodes = cctx.discovery().allNodes();
             Collection<ClusterNode> nodes;
 
-            synchronized (mux) {
+            synchronized (this) {
                 nodes = F.retain(allNodes, true,
                     new P1<ClusterNode>() {
                         @Override public boolean apply(ClusterNode node) {
@@ -139,7 +139,7 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
     @Override protected void onNodeLeft(UUID nodeId) {
         boolean callOnPage;
 
-        synchronized (mux) {
+        synchronized (this) {
             callOnPage = !loc && subgrid.contains(nodeId);
         }
 
@@ -166,7 +166,7 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
 
     /** {@inheritDoc} */
     @Override protected boolean onPage(UUID nodeId, boolean last) {
-        assert Thread.holdsLock(mux);
+        assert Thread.holdsLock(this);
 
         if (!loc) {
             rcvd.add(nodeId);
@@ -192,11 +192,11 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
     /** {@inheritDoc} */
     @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
     @Override protected void loadPage() {
-        assert !Thread.holdsLock(mux);
+        assert !Thread.holdsLock(this);
 
         Collection<ClusterNode> nodes = null;
 
-        synchronized (mux) {
+        synchronized (this) {
             if (!isDone() && rcvd.containsAll(subgrid)) {
                 rcvd.clear();
 
@@ -211,13 +211,13 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
     /** {@inheritDoc} */
     @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
     @Override protected void loadAllPages() throws IgniteInterruptedCheckedException {
-        assert !Thread.holdsLock(mux);
+        assert !Thread.holdsLock(this);
 
         U.await(firstPageLatch);
 
         Collection<ClusterNode> nodes = null;
 
-        synchronized (mux) {
+        synchronized (this) {
             if (!isDone() && !subgrid.isEmpty())
                 nodes = nodes();
         }
@@ -230,7 +230,7 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
      * @return Nodes to send requests to.
      */
     private Collection<ClusterNode> nodes() {
-        assert Thread.holdsLock(mux);
+        assert Thread.holdsLock(this);
 
         Collection<ClusterNode> nodes = new ArrayList<>(subgrid.size());