You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/12/29 15:00:57 UTC

[2/2] ignite git commit: ignite-1.5 debug

ignite-1.5 debug


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ad658eb2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ad658eb2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ad658eb2

Branch: refs/heads/ignite-1537
Commit: ad658eb2d58c83a71e41d70eebf1aee0b4aba2ff
Parents: 474e394
Author: sboikov <sb...@gridgain.com>
Authored: Tue Dec 29 17:00:49 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Dec 29 17:00:49 2015 +0300

----------------------------------------------------------------------
 .../GridDistributedTxFinishRequest.java         |   4 +-
 .../distributed/dht/GridDhtTxFinishRequest.java |  27 +-
 .../near/GridNearTxFinishFuture.java            | 384 +++++++++++++------
 3 files changed, 295 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ad658eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
index 34b3112..692c70c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
@@ -20,8 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed;
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
 import java.util.Collection;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
 import org.apache.ignite.lang.IgniteUuid;
@@ -85,6 +83,8 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage {
      * @param invalidate Invalidate flag.
      * @param sys System transaction flag.
      * @param plc IO policy.
+     * @param syncCommit Sync commit flag.
+     * @param syncRollback Sync rollback flag.
      * @param baseVer Base version.
      * @param committedVers Committed versions.
      * @param rolledbackVers Rolled back versions.

http://git-wip-us.apache.org/repos/asf/ignite/blob/ad658eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index 65f1cb4..74325e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -20,9 +20,9 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.UUID;
 import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -44,6 +44,9 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Bit mask of the "wait remote transactions" flag in the flags byte. */
+    public static final int WAIT_REMOTE_TX_FLAG_MASK = 0x01;
+
     /** Near node ID. */
     private UUID nearNodeId;
 
@@ -64,7 +67,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
     @GridDirectCollection(GridCacheVersion.class)
     private Collection<GridCacheVersion> pendingVers;
 
-    /** Check comitted flag. */
+    /** Check committed flag. */
     private boolean checkCommitted;
 
     /** Partition update counter. */
@@ -81,6 +84,9 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
     /** Task name hash. */
     private int taskNameHash;
 
+    /** Bit flags, see {@link #WAIT_REMOTE_TX_FLAG_MASK}. */
+    private byte flags;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -100,6 +106,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
      * @param commit Commit flag.
      * @param invalidate Invalidate flag.
      * @param sys System flag.
+     * @param plc IO policy.
      * @param sysInvalidate System invalidation flag.
      * @param syncCommit Synchronous commit flag.
      * @param syncRollback Synchronous rollback flag.
@@ -180,6 +187,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
      * @param commit Commit flag.
      * @param invalidate Invalidate flag.
      * @param sys System flag.
+     * @param plc IO policy.
      * @param sysInvalidate System invalidation flag.
      * @param syncCommit Synchronous commit flag.
      * @param syncRollback Synchronous rollback flag.
@@ -302,20 +310,17 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
     }
 
     /**
-     * Gets versions of not acquired locks with version less then one of transaction being committed.
-     *
-     * @return Versions of locks for entries participating in transaction that have not been acquired yet
-     *      have version less then one of transaction being committed.
+     * @return Check committed flag.
      */
-    public Collection<GridCacheVersion> pendingVersions() {
-        return pendingVers == null ? Collections.<GridCacheVersion>emptyList() : pendingVers;
+    public boolean checkCommitted() {
+        return checkCommitted;
     }
 
     /**
-     * @return Check committed flag.
+     * @return {@code True} if the {@link #WAIT_REMOTE_TX_FLAG_MASK} bit is set in flags.
      */
-    public boolean checkCommitted() {
-        return checkCommitted;
+    public boolean waitRemoteTxs() {
+        return (flags & WAIT_REMOTE_TX_FLAG_MASK) != 0;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/ad658eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 00f5141..f225d6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -19,6 +19,9 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
@@ -65,6 +68,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     public static final IgniteProductVersion FINISH_NEAR_ONE_PHASE_SINCE = IgniteProductVersion.fromString("1.4.0");
 
     /** */
+    public static final IgniteProductVersion WAIT_REMOTE_TXS_SINCE = IgniteProductVersion.fromString("1.5.0");
+
+    /** */
     private static final long serialVersionUID = 0L;
 
     /** Logger reference. */
@@ -123,22 +129,22 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
     @Override public boolean onNodeLeft(UUID nodeId) {
-        for (IgniteInternalFuture<?> fut : futures())
-            if (isMini(fut)) {
-                MiniFuture f = (MiniFuture)fut;
+        boolean found = false;
 
-                if (f.node().id().equals(nodeId)) {
-                    // Remove previous mapping.
-                    mappings.remove(nodeId);
+        ClusterTopologyCheckedException err =
+            new ClusterTopologyCheckedException("Remote node left grid: " + nodeId);
 
-                    f.onResult(new ClusterTopologyCheckedException("Remote node left grid (will fail): " + nodeId));
+        for (IgniteInternalFuture<?> fut : futures())
+            if (isMini(fut)) {
+                MinFuture f = (MinFuture)fut;
 
-                    return true;
-                }
+                if (f.onNodeLeft(nodeId, err))
+                    found = true;
             }
 
-        return false;
+        return found;
     }
 
     /** {@inheritDoc} */
@@ -160,13 +166,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     public void onResult(UUID nodeId, GridNearTxFinishResponse res) {
         if (!isDone())
             for (IgniteInternalFuture<IgniteInternalTx> fut : futures()) {
-                if (isMini(fut)) {
-                    MiniFuture f = (MiniFuture)fut;
+                if (fut.getClass() == FinishMiniFuture.class) {
+                    FinishMiniFuture f = (FinishMiniFuture)fut;
 
                     if (f.futureId().equals(res.miniId())) {
                         assert f.node().id().equals(nodeId);
 
-                        f.onResult(res);
+                        f.onNearFinishResponse(res);
                     }
                 }
             }
@@ -179,15 +185,21 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     public void onResult(UUID nodeId, GridDhtTxFinishResponse res) {
         if (!isDone())
             for (IgniteInternalFuture<IgniteInternalTx> fut : futures()) {
-                if (isMini(fut)) {
-                    MiniFuture f = (MiniFuture)fut;
+                if (fut.getClass() == CheckBackupMiniFuture.class) {
+                    CheckBackupMiniFuture f = (CheckBackupMiniFuture)fut;
 
                     if (f.futureId().equals(res.miniId())) {
                         assert f.node().id().equals(nodeId);
 
-                        f.onResult(res);
+                        f.onDhtFinishResponse(res);
                     }
                 }
+                else if (fut.getClass() == CheckRemoteTxMiniFuture.class) {
+                    CheckRemoteTxMiniFuture f = (CheckRemoteTxMiniFuture)fut;
+
+                    if (f.futureId().equals(res.miniId()))
+                        f.onDhtFinishResponse(nodeId);
+                }
             }
     }
 
@@ -290,11 +302,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     }
 
     /**
-     * @param f Future.
+     * @param fut Future.
      * @return {@code True} if mini-future.
      */
-    private boolean isMini(IgniteInternalFuture<?> f) {
-        return f.getClass().equals(MiniFuture.class);
+    private boolean isMini(IgniteInternalFuture<?> fut) {
+        return fut.getClass() == FinishMiniFuture.class ||
+            fut.getClass() == CheckBackupMiniFuture.class ||
+            fut.getClass() == CheckRemoteTxMiniFuture.class;
     }
 
     /**
@@ -394,7 +408,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
                 ClusterNode backup = cctx.discovery().node(backupId);
 
-                final MiniFuture mini = new MiniFuture(backup, mapping);
+                final CheckBackupMiniFuture mini = new CheckBackupMiniFuture(backup, mapping);
 
                 add(mini);
 
@@ -445,32 +459,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                     }
                 }
                 else {
-                    GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest(
-                        cctx.localNodeId(),
-                        futureId(),
-                        mini.futureId(),
-                        tx.topologyVersion(),
-                        tx.xidVersion(),
-                        tx.commitVersion(),
-                        tx.threadId(),
-                        tx.isolation(),
-                        true,
-                        false,
-                        tx.system(),
-                        tx.ioPolicy(),
-                        false,
-                        tx.syncCommit(),
-                        tx.syncRollback(),
-                        null,
-                        null,
-                        null,
-                        null,
-                        0,
-                        null,
-                        0,
-                        tx.activeCachesDeploymentEnabled());
-
-                    finishReq.checkCommitted(true);
+                    GridDhtTxFinishRequest finishReq = checkCommittedRequest(mini.futureId());
 
                     try {
                         if (FINISH_NEAR_ONE_PHASE_SINCE.compareTo(backup.version()) <= 0)
@@ -482,10 +471,10 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                         }
                     }
                     catch (ClusterTopologyCheckedException e) {
-                        mini.onResult(e);
+                        mini.onNodeLeft(backupId, e);
                     }
                     catch (IgniteCheckedException e) {
-                        mini.onResult(e);
+                        mini.onDone(e);
                     }
                 }
             }
@@ -603,7 +592,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                 add(fut);
         }
         else {
-            MiniFuture fut = new MiniFuture(m);
+            FinishMiniFuture fut = new FinishMiniFuture(m);
 
             req.miniId(fut.futureId());
 
@@ -623,11 +612,11 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                 // Remove previous mapping.
                 mappings.remove(m.node().id());
 
-                fut.onResult(e);
+                fut.onNodeLeft(n.id(), e);
             }
             catch (IgniteCheckedException e) {
                 // Fail the whole thing.
-                fut.onResult(e);
+                fut.onDone(e);
             }
         }
     }
@@ -637,10 +626,24 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
         Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
             @SuppressWarnings("unchecked")
             @Override public String apply(IgniteInternalFuture<?> f) {
-                if (isMini(f)) {
-                    MiniFuture m = (MiniFuture)f;
+                if (f.getClass().equals(FinishMiniFuture.class)) {
+                    FinishMiniFuture fut = (FinishMiniFuture)f;
 
-                    return "[node=" + m.node().id() + ", loc=" + m.node().isLocal() + ", done=" + f.isDone() + "]";
+                    return "FinishFuture[node=" + fut.node().id() +
+                        ", loc=" + fut.node().isLocal() +
+                        ", done=" + fut.isDone() + "]";
+                }
+                else if (f.getClass().equals(CheckBackupMiniFuture.class)) {
+                    CheckBackupMiniFuture fut = (CheckBackupMiniFuture)f;
+
+                    return "CheckBackupFuture[node=" + fut.node().id() +
+                        ", loc=" + fut.node().isLocal() +
+                        ", done=" + f.isDone() + "]";
+                }
+                else if (f.getClass().equals(CheckRemoteTxMiniFuture.class)) {
+                    CheckRemoteTxMiniFuture fut = (CheckRemoteTxMiniFuture)f;
+
+                    return "CheckRemoteTxMiniFuture[nodes=" + fut.nodes() + ", done=" + f.isDone() + "]";
                 }
                 else
                     return "[loc=true, done=" + f.isDone() + "]";
@@ -653,108 +656,219 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     }
 
     /**
+     * @param miniId Mini future ID.
+     * @return Finish request.
+     */
+    private GridDhtTxFinishRequest checkCommittedRequest(IgniteUuid miniId) {
+        GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest(
+            cctx.localNodeId(),
+            futureId(),
+            miniId,
+            tx.topologyVersion(),
+            tx.xidVersion(),
+            tx.commitVersion(),
+            tx.threadId(),
+            tx.isolation(),
+            true,
+            false,
+            tx.system(),
+            tx.ioPolicy(),
+            false,
+            tx.syncCommit(),
+            tx.syncRollback(),
+            null,
+            null,
+            null,
+            null,
+            0,
+            null,
+            0,
+            tx.activeCachesDeploymentEnabled());
+
+        finishReq.checkCommitted(true);
+
+        return finishReq;
+    }
+
+    /**
+     * Base class for mini-futures of this future; each instance gets its own random ID.
+     */
+    private abstract class MinFuture extends GridFutureAdapter<IgniteInternalTx> {
+        /** Mini future ID. */
+        private final IgniteUuid futId = IgniteUuid.randomUuid();
+
+        /**
+         * @param nodeId Node ID.
+         * @param err Error.
+         * @return {@code True} if future processed node failure.
+         */
+        abstract boolean onNodeLeft(UUID nodeId, ClusterTopologyCheckedException err);
+
+        /**
+         * @return Future ID.
+         */
+        final IgniteUuid futureId() {
+            return futId;
+        }
+    }
+
+    /**
      * Mini-future for get operations. Mini-futures are only waiting on a single
      * node as opposed to multiple nodes.
      */
-    private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> {
+    private class FinishMiniFuture extends MinFuture {
         /** */
         private static final long serialVersionUID = 0L;
 
-        /** */
-        private final IgniteUuid futId = IgniteUuid.randomUuid();
-
         /** Keys. */
         @GridToStringInclude
         private GridDistributedTxMapping m;
 
-        /** Backup check flag. */
-        private ClusterNode backup;
-
         /**
          * @param m Mapping.
          */
-        MiniFuture(GridDistributedTxMapping m) {
+        FinishMiniFuture(GridDistributedTxMapping m) {
             this.m = m;
         }
 
         /**
-         * @param backup Backup to check.
-         * @param m Mapping associated with the backup.
+         * @return Node of the mapping.
          */
-        MiniFuture(ClusterNode backup, GridDistributedTxMapping m) {
-            this.backup = backup;
-            this.m = m;
+        ClusterNode node() {
+            return m.node();
         }
 
         /**
-         * @return Future ID.
+         * @return Keys.
          */
-        IgniteUuid futureId() {
-            return futId;
+        public GridDistributedTxMapping mapping() {
+            return m;
         }
 
         /**
-         * @return Node ID.
+         * @param e Node failure.
          */
-        public ClusterNode node() {
-            assert m != null || backup != null;
+        boolean onNodeLeft(UUID nodeId, ClusterTopologyCheckedException e) {
+            if (nodeId.equals(m.node().id())) {
+                if (log.isDebugEnabled())
+                    log.debug("Remote node left grid while sending or waiting for reply: " + this);
+
+                // Remove previous mapping.
+                mappings.remove(nodeId);
+
+                if (isSync()) {
+                    Map<UUID, Collection<UUID>> txNodes = tx.transactionNodes();
+
+                    assert txNodes != null : tx;
+
+                    Collection<UUID> backups = txNodes.get(nodeId);
+
+                    if (!F.isEmpty(backups)) {
+                        final CheckRemoteTxMiniFuture mini = new CheckRemoteTxMiniFuture(new HashSet<>(backups));
+
+                        add(mini);
+
+                        GridDhtTxFinishRequest req = checkCommittedRequest(mini.futureId());
 
-            return backup != null ? backup : m.node();
+                        for (UUID backupId : backups) {
+                            ClusterNode backup = cctx.discovery().node(backupId);
+
+                            if (backup != null && WAIT_REMOTE_TXS_SINCE.compareTo(backup.version()) <= 0) {
+                                if (backup.isLocal()) {
+                                    IgniteInternalFuture<?> fut = cctx.tm().remoteTxFinishFuture(tx.nearXidVersion());
+
+                                    fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                                        @Override public void apply(IgniteInternalFuture<?> fut) {
+                                            mini.onDhtFinishResponse(cctx.localNodeId());
+                                        }
+                                    });
+                                }
+                                else {
+                                    try {
+                                        cctx.io().send(backup, req, tx.ioPolicy());
+                                    }
+                                    catch (ClusterTopologyCheckedException e0) {
+                                        mini.onNodeLeft(backupId, e0);
+                                    }
+                                    catch (IgniteCheckedException e0) {
+                                        mini.onDone(e0);
+                                    }
+                                }
+                            }
+                            else
+                                mini.onDhtFinishResponse(backupId);
+                        }
+                    }
+                }
+
+                onDone(tx);
+
+                return true;
+            }
+
+            return false;
         }
 
         /**
-         * @return Keys.
+         * @param res Result callback.
          */
-        public GridDistributedTxMapping mapping() {
-            return m;
+        void onNearFinishResponse(GridNearTxFinishResponse res) {
+            if (res.error() != null)
+                onDone(res.error());
+            else
+                onDone(tx);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(FinishMiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
         }
+    }
+
+    /**
+     * Mini-future that waits for a single backup node to answer a check-committed request.
+     */
+    private class CheckBackupMiniFuture extends MinFuture {
+        /** Keys. */
+        @GridToStringInclude
+        private GridDistributedTxMapping m;
+
+        /** Backup node to check. */
+        private ClusterNode backup;
 
         /**
-         * @param e Error.
+         * @param backup Backup to check.
+         * @param m Mapping associated with the backup.
          */
-        void onResult(Throwable e) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
-
-            // Fail.
-            onDone(e);
+        CheckBackupMiniFuture(ClusterNode backup, GridDistributedTxMapping m) {
+            this.backup = backup;
+            this.m = m;
         }
 
         /**
-         * @param e Node failure.
+         * @return Backup node.
          */
-        void onResult(ClusterTopologyCheckedException e) {
-            if (log.isDebugEnabled())
-                log.debug("Remote node left grid while sending or waiting for reply (will fail): " + this);
+        public ClusterNode node() {
+            return backup;
+        }
 
-            if (backup != null) {
+        /** {@inheritDoc} */
+        @Override boolean onNodeLeft(UUID nodeId, ClusterTopologyCheckedException e) {
+            if (nodeId.equals(backup.id())) {
                 readyNearMappingFromBackup(m);
 
                 onDone(e);
-            }
-            else
-                // Complete future with tx.
-                onDone(tx);
-        }
 
-        /**
-         * @param res Result callback.
-         */
-        void onResult(GridNearTxFinishResponse res) {
-            assert backup == null;
+                return true;
+            }
 
-            if (res.error() != null)
-                onDone(res.error());
-            else
-                onDone(tx);
+            return false;
         }
 
         /**
          * @param res Response.
          */
-        void onResult(GridDhtTxFinishResponse res) {
-            assert backup != null;
-
+        void onDhtFinishResponse(GridDhtTxFinishResponse res) {
             readyNearMappingFromBackup(m);
 
             Throwable err = res.checkCommittedError();
@@ -774,9 +888,65 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                 onDone(tx);
         }
 
+    }
+
+    /**
+     * Mini-future that completes with the transaction once the awaited node set becomes empty.
+     */
+    private class CheckRemoteTxMiniFuture extends MinFuture {
+        /** IDs of nodes still awaited; an ID is removed on response or when the node leaves. */
+        private Set<UUID> nodes;
+
+        /**
+         * @param nodes Backup nodes.
+         */
+        public CheckRemoteTxMiniFuture(Set<UUID> nodes) {
+            this.nodes = nodes;
+        }
+
+        Set<UUID> nodes() {
+            synchronized (this) {
+                return new HashSet<>(nodes);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override boolean onNodeLeft(UUID nodeId, ClusterTopologyCheckedException err) {
+            boolean done;
+
+            boolean ret;
+
+            synchronized (this) {
+                ret = nodes.remove(nodeId);
+
+                done = nodes.isEmpty();
+            }
+
+            if (done)
+                onDone(tx);
+
+            return ret;
+        }
+
+        /**
+         * @param nodeId Node ID.
+         */
+        void onDhtFinishResponse(UUID nodeId) {
+            boolean done;
+
+            synchronized (this) {
+                nodes.remove(nodeId);
+
+                done = nodes.isEmpty();
+            }
+
+            if (done)
+                onDone(tx);
+        }
+
         /** {@inheritDoc} */
         @Override public String toString() {
-            return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
+            return S.toString(CheckRemoteTxMiniFuture.class, this);
         }
     }
 }