You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2015/05/06 13:31:30 UTC

[01/19] incubator-ignite git commit: # ignite-157-2

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-855 733f30eee -> 0ccde8d4e


# ignite-157-2


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d14a0fb4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d14a0fb4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d14a0fb4

Branch: refs/heads/ignite-855
Commit: d14a0fb40eca2c6259f2be66bcffbc1c53f988b8
Parents: 8062947
Author: sboikov <sb...@gridgain.com>
Authored: Mon Apr 27 17:03:48 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Apr 27 17:03:48 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAbstractFailoverSelfTest.java      | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d14a0fb4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
index 4cb7365..5389ef9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
@@ -177,11 +177,11 @@ public abstract class GridCacheAbstractFailoverSelfTest extends GridCacheAbstrac
                 info("Run topology change.");
 
                 try {
+                    String name = "new-node-" + Thread.currentThread().getName();
+
                     for (int i = 0; i < TOP_CHANGE_CNT && err.get() == null; i++) {
                         info("Topology change " + i);
 
-                        String name = UUID.randomUUID().toString();
-
                         try {
                             final Ignite g = startGrid(name);
 


[12/19] incubator-ignite git commit: # ignite-157-2 renamings

Posted by yz...@apache.org.
# ignite-157-2 renamings


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f5f95fb8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f5f95fb8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f5f95fb8

Branch: refs/heads/ignite-855
Commit: f5f95fb8c952996f4479852b1ca2e086d3d57621
Parents: b141abf
Author: sboikov <sb...@gridgain.com>
Authored: Wed May 6 09:56:30 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed May 6 09:56:30 2015 +0300

----------------------------------------------------------------------
 .../ignite/codegen/MessageCodeGenerator.java    |   4 +-
 .../communication/GridIoMessageFactory.java     |   4 +-
 ...ridCacheOptimisticCheckPreparedTxFuture.java | 508 -------------------
 ...idCacheOptimisticCheckPreparedTxRequest.java | 261 ----------
 ...dCacheOptimisticCheckPreparedTxResponse.java | 179 -------
 .../distributed/GridCacheTxRecoveryFuture.java  | 506 ++++++++++++++++++
 .../distributed/GridCacheTxRecoveryRequest.java | 261 ++++++++++
 .../GridCacheTxRecoveryResponse.java            | 182 +++++++
 .../cache/transactions/IgniteTxHandler.java     |  30 +-
 .../cache/transactions/IgniteTxManager.java     |  98 +---
 .../resources/META-INF/classnames.properties    |   6 +-
 11 files changed, 976 insertions(+), 1063 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index e37b4f3..0540148 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -165,8 +165,8 @@ public class MessageCodeGenerator {
 //        gen.generateAndWrite(GridDhtTxFinishRequest.class);
 //        gen.generateAndWrite(GridDhtTxFinishResponse.class);
 //
-//        gen.generateAndWrite(GridCacheOptimisticCheckPreparedTxRequest.class);
-//        gen.generateAndWrite(GridCacheOptimisticCheckPreparedTxResponse.class);
+//        gen.generateAndWrite(GridCacheTxRecoveryRequest.class);
+//        gen.generateAndWrite(GridCacheTxRecoveryResponse.class);
 
 //        gen.generateAndWrite(GridQueryCancelRequest.class);
 //        gen.generateAndWrite(GridQueryFailResponse.class);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index a395747..7fe8da8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -166,12 +166,12 @@ public class GridIoMessageFactory implements MessageFactory {
                 break;
 
             case 16:
-                msg = new GridCacheOptimisticCheckPreparedTxRequest();
+                msg = new GridCacheTxRecoveryRequest();
 
                 break;
 
             case 17:
-                msg = new GridCacheOptimisticCheckPreparedTxResponse();
+                msg = new GridCacheTxRecoveryResponse();
 
                 break;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
deleted file mode 100644
index bd3e1cc..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
+++ /dev/null
@@ -1,508 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.cluster.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Future verifying that all remote transactions related to some
- * optimistic transaction were prepared.
- */
-public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompoundIdentityFuture<Boolean>
-    implements GridCacheFuture<Boolean> {
-    /** */         
-    private static final long serialVersionUID = 0L;
-    
-    /** Logger reference. */
-    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
-
-    /** Logger. */
-    private static IgniteLogger log;
-
-    /** Trackable flag. */
-    private boolean trackable = true;
-
-    /** Context. */
-    private final GridCacheSharedContext<K, V> cctx;
-
-    /** Future ID. */
-    private final IgniteUuid futId = IgniteUuid.randomUuid();
-
-    /** Transaction. */
-    private final IgniteInternalTx tx;
-
-    /** All involved nodes. */
-    private final Map<UUID, ClusterNode> nodes;
-
-    /** ID of failed node started transaction. */
-    private final UUID failedNodeId;
-
-    /** Transaction nodes mapping. */
-    private final Map<UUID, Collection<UUID>> txNodes;
-
-    /** */
-    private final boolean nearTxCheck;
-
-    /**
-     * @param cctx Context.
-     * @param tx Transaction.
-     * @param failedNodeId ID of failed node started transaction.
-     * @param txNodes Transaction mapping.
-     */
-    @SuppressWarnings("ConstantConditions")
-    public GridCacheOptimisticCheckPreparedTxFuture(GridCacheSharedContext<K, V> cctx,
-        IgniteInternalTx tx,
-        UUID failedNodeId,
-        Map<UUID, Collection<UUID>> txNodes)
-    {
-        super(cctx.kernalContext(), CU.boolReducer());
-
-        this.cctx = cctx;
-        this.tx = tx;
-        this.txNodes = txNodes;
-        this.failedNodeId = failedNodeId;
-
-        if (log == null)
-            log = U.logger(cctx.kernalContext(), logRef, GridCacheOptimisticCheckPreparedTxFuture.class);
-
-        nodes = new GridLeanMap<>();
-
-        UUID locNodeId = cctx.localNodeId();
-
-        for (Map.Entry<UUID, Collection<UUID>> e : tx.transactionNodes().entrySet()) {
-            if (!locNodeId.equals(e.getKey()) && !failedNodeId.equals(e.getKey()) && !nodes.containsKey(e.getKey())) {
-                ClusterNode node = cctx.discovery().node(e.getKey());
-
-                if (node != null)
-                    nodes.put(node.id(), node);
-                else if (log.isDebugEnabled())
-                    log.debug("Transaction node left (will ignore) " + e.getKey());
-            }
-
-            for (UUID nodeId : e.getValue()) {
-                if (!locNodeId.equals(nodeId) && !failedNodeId.equals(nodeId) && !nodes.containsKey(nodeId)) {
-                    ClusterNode node = cctx.discovery().node(nodeId);
-
-                    if (node != null)
-                        nodes.put(node.id(), node);
-                    else if (log.isDebugEnabled())
-                        log.debug("Transaction node left (will ignore) " + e.getKey());
-                }
-            }
-        }
-
-        UUID nearNodeId = tx.eventNodeId();
-
-        nearTxCheck = !failedNodeId.equals(nearNodeId) && cctx.discovery().alive(nearNodeId);
-    }
-
-    /**
-     * Initializes future.
-     */
-    @SuppressWarnings("ConstantConditions")
-    public void prepare() {
-        if (nearTxCheck) {
-            UUID nearNodeId = tx.eventNodeId();
-
-            if (cctx.localNodeId().equals(nearNodeId)) {
-                IgniteInternalFuture<Boolean> fut = cctx.tm().txCommitted(tx.nearXidVersion());
-
-                fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
-                    @Override public void apply(IgniteInternalFuture<Boolean> fut) {
-                        try {
-                            onDone(fut.get());
-                        }
-                        catch (IgniteCheckedException e) {
-                            onDone(e);
-                        }
-                    }
-                });
-            }
-            else {
-                MiniFuture fut = new MiniFuture(tx.eventNodeId());
-
-                add(fut);
-
-                GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest(
-                    tx,
-                    0,
-                    true,
-                    futureId(),
-                    fut.futureId());
-
-                try {
-                    cctx.io().send(nearNodeId, req, tx.ioPolicy());
-                }
-                catch (ClusterTopologyCheckedException e) {
-                    fut.onNodeLeft();
-                }
-                catch (IgniteCheckedException e) {
-                    fut.onError(e);
-                }
-
-                markInitialized();
-            }
-
-            return;
-        }
-
-        // First check transactions on local node.
-        int locTxNum = nodeTransactions(cctx.localNodeId());
-
-        if (locTxNum > 1) {
-            IgniteInternalFuture<Boolean> fut = cctx.tm().txsPreparedOrCommitted(tx.nearXidVersion(), locTxNum);
-
-            if (fut == null || fut.isDone()) {
-                boolean prepared;
-
-                try {
-                    prepared = fut == null ? true : fut.get();
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Check prepared transaction future failed: " + e, e);
-
-                    prepared = false;
-                }
-
-                if (!prepared) {
-                    onDone(false);
-
-                    markInitialized();
-
-                    return;
-                }
-            }
-            else {
-                fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
-                    @Override public void apply(IgniteInternalFuture<Boolean> fut) {
-                        boolean prepared;
-
-                        try {
-                            prepared = fut.get();
-                        }
-                        catch (IgniteCheckedException e) {
-                            U.error(log, "Check prepared transaction future failed: " + e, e);
-
-                            prepared = false;
-                        }
-
-                        if (!prepared) {
-                            onDone(false);
-
-                            markInitialized();
-                        }
-                        else
-                            proceedPrepare();
-                    }
-                });
-
-                return;
-            }
-        }
-
-        proceedPrepare();
-    }
-
-    /**
-     * Process prepare after local check.
-     */
-    private void proceedPrepare() {
-        for (Map.Entry<UUID, Collection<UUID>> entry : txNodes.entrySet()) {
-            UUID nodeId = entry.getKey();
-
-            // Skip left nodes and local node.
-            if (!nodes.containsKey(nodeId) && nodeId.equals(cctx.localNodeId()))
-                continue;
-
-            /*
-             * If primary node failed then send message to all backups, otherwise
-             * send message only to primary node.
-             */
-
-            if (nodeId.equals(failedNodeId)) {
-                for (UUID id : entry.getValue()) {
-                    // Skip backup node if it is local node or if it is also was mapped as primary.
-                    if (txNodes.containsKey(id) || id.equals(cctx.localNodeId()))
-                        continue;
-
-                    MiniFuture fut = new MiniFuture(id);
-
-                    add(fut);
-
-                    GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest(tx,
-                        nodeTransactions(id),
-                        false,
-                        futureId(),
-                        fut.futureId());
-
-                    try {
-                        cctx.io().send(id, req, tx.ioPolicy());
-                    }
-                    catch (ClusterTopologyCheckedException ignored) {
-                        fut.onNodeLeft();
-                    }
-                    catch (IgniteCheckedException e) {
-                        fut.onError(e);
-
-                        break;
-                    }
-                }
-            }
-            else {
-                MiniFuture fut = new MiniFuture(nodeId);
-
-                add(fut);
-
-                GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest(
-                    tx,
-                    nodeTransactions(nodeId),
-                    false,
-                    futureId(),
-                    fut.futureId());
-
-                try {
-                    cctx.io().send(nodeId, req, tx.ioPolicy());
-                }
-                catch (ClusterTopologyCheckedException ignored) {
-                    fut.onNodeLeft();
-                }
-                catch (IgniteCheckedException e) {
-                    fut.onError(e);
-
-                    break;
-                }
-            }
-        }
-
-        markInitialized();
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @return Number of transactions on node.
-     */
-    private int nodeTransactions(UUID nodeId) {
-        int cnt = txNodes.containsKey(nodeId) ? 1 : 0; // +1 if node is primary.
-
-        for (Collection<UUID> backups : txNodes.values()) {
-            for (UUID backup : backups) {
-                if (backup.equals(nodeId)) {
-                    cnt++; // +1 if node is backup.
-
-                    break;
-                }
-            }
-        }
-
-        return cnt;
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param res Response.
-     */
-    public void onResult(UUID nodeId, GridCacheOptimisticCheckPreparedTxResponse res) {
-        if (!isDone()) {
-            for (IgniteInternalFuture<Boolean> fut : pending()) {
-                if (isMini(fut)) {
-                    MiniFuture f = (MiniFuture)fut;
-
-                    if (f.futureId().equals(res.miniId())) {
-                        assert f.nodeId().equals(nodeId);
-
-                        f.onResult(res);
-
-                        break;
-                    }
-                }
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteUuid futureId() {
-        return futId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridCacheVersion version() {
-        return tx.xidVersion();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<? extends ClusterNode> nodes() {
-        return nodes.values();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean onNodeLeft(UUID nodeId) {
-        for (IgniteInternalFuture<?> fut : futures())
-            if (isMini(fut)) {
-                MiniFuture f = (MiniFuture)fut;
-
-                if (f.nodeId().equals(nodeId)) {
-                    f.onNodeLeft();
-
-                    return true;
-                }
-            }
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean trackable() {
-        return trackable;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void markNotTrackable() {
-        trackable = false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
-        if (super.onDone(res, err)) {
-            cctx.mvcc().removeFuture(this);
-
-            if (err == null) {
-                assert res != null;
-
-                cctx.tm().finishOptimisticTxOnRecovery(tx, res);
-            }
-            else {
-                if (err instanceof ClusterTopologyCheckedException && nearTxCheck) {
-                    if (log.isDebugEnabled())
-                        log.debug("Failed to check transaction on near node, " +
-                            "ignoring [err=" + err + ", tx=" + tx + ']');
-                }
-                else {
-                    if (log.isDebugEnabled())
-                        log.debug("Failed to check prepared transactions, " +
-                            "invalidating transaction [err=" + err + ", tx=" + tx + ']');
-
-                    cctx.tm().salvageTx(tx);
-                }
-            }
-        }
-
-        return false;
-    }
-
-    /**
-     * @param f Future.
-     * @return {@code True} if mini-future.
-     */
-    private boolean isMini(IgniteInternalFuture<?> f) {
-        return f.getClass().equals(MiniFuture.class);
-    }
-
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridCacheOptimisticCheckPreparedTxFuture.class, this, "super", super.toString());
-    }
-
-    /**
-     *
-     */
-    private class MiniFuture extends GridFutureAdapter<Boolean> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Mini future ID. */
-        private final IgniteUuid futId = IgniteUuid.randomUuid();
-
-        /** Node ID. */
-        private UUID nodeId;
-
-        /**
-         * @param nodeId Node ID.
-         */
-        private MiniFuture(UUID nodeId) {
-            this.nodeId = nodeId;
-        }
-
-        /**
-         * @return Node ID.
-         */
-        private UUID nodeId() {
-            return nodeId;
-        }
-
-        /**
-         * @return Future ID.
-         */
-        private IgniteUuid futureId() {
-            return futId;
-        }
-
-        /**
-         * @param e Error.
-         */
-        private void onError(Throwable e) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
-
-            onDone(e);
-        }
-
-        /**
-         */
-        private void onNodeLeft() {
-            if (log.isDebugEnabled())
-                log.debug("Transaction node left grid (will ignore) [fut=" + this + ']');
-
-            if (nearTxCheck) {
-                // Near and originating nodes left, need initiate tx check.
-                cctx.tm().commitIfPrepared(tx);
-
-                onDone(new ClusterTopologyCheckedException("Transaction node left grid (will ignore)."));
-            }
-            else
-                onDone(true);
-        }
-
-        /**
-         * @param res Result callback.
-         */
-        private void onResult(GridCacheOptimisticCheckPreparedTxResponse res) {
-            onDone(res.success());
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(MiniFuture.class, this, "done", isDone(), "err", error());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
deleted file mode 100644
index 4f2a1d6..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-
-import java.io.*;
-import java.nio.*;
-
-/**
- * Message sent to check that transactions related to transaction were prepared on remote node.
- */
-public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBaseMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Future ID. */
-    private IgniteUuid futId;
-
-    /** Mini future ID. */
-    private IgniteUuid miniId;
-
-    /** Near transaction ID. */
-    private GridCacheVersion nearXidVer;
-
-    /** Expected number of transactions on node. */
-    private int txNum;
-
-    /** System transaction flag. */
-    private boolean sys;
-
-    /** {@code True} if should check only tx on near node. */
-    private boolean nearTxCheck;
-
-    /**
-     * Empty constructor required by {@link Externalizable}
-     */
-    public GridCacheOptimisticCheckPreparedTxRequest() {
-        // No-op.
-    }
-
-    /**
-     * @param tx Transaction.
-     * @param txNum Expected number of transactions on remote node.
-     * @param nearTxCheck
-     * @param futId Future ID.
-     * @param miniId Mini future ID.
-     */
-    public GridCacheOptimisticCheckPreparedTxRequest(IgniteInternalTx tx,
-        int txNum,
-        boolean nearTxCheck,
-        IgniteUuid futId,
-        IgniteUuid miniId)
-    {
-        super(tx.xidVersion(), 0);
-
-        nearXidVer = tx.nearXidVersion();
-        sys = tx.system();
-
-        this.futId = futId;
-        this.miniId = miniId;
-        this.txNum = txNum;
-        this.nearTxCheck = nearTxCheck;
-    }
-
-    /**
-     * @return {@code True} if should check only tx on near node.
-     */
-    public boolean nearTxCheck() {
-        return nearTxCheck;
-    }
-
-    /**
-     * @return Near version.
-     */
-    public GridCacheVersion nearXidVersion() {
-        return nearXidVer;
-    }
-
-    /**
-     * @return Future ID.
-     */
-    public IgniteUuid futureId() {
-        return futId;
-    }
-
-    /**
-     * @return Mini future ID.
-     */
-    public IgniteUuid miniId() {
-        return miniId;
-    }
-
-    /**
-     * @return Expected number of transactions on node.
-     */
-    public int transactions() {
-        return txNum;
-    }
-
-    /**
-     * @return System transaction flag.
-     */
-    public boolean system() {
-        return sys;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!super.writeTo(buf, writer))
-            return false;
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 8:
-                if (!writer.writeIgniteUuid("futId", futId))
-                    return false;
-
-                writer.incrementState();
-
-            case 9:
-                if (!writer.writeIgniteUuid("miniId", miniId))
-                    return false;
-
-                writer.incrementState();
-
-            case 10:
-                if (!writer.writeBoolean("nearTxCheck", nearTxCheck))
-                    return false;
-
-                writer.incrementState();
-
-            case 11:
-                if (!writer.writeMessage("nearXidVer", nearXidVer))
-                    return false;
-
-                writer.incrementState();
-
-            case 12:
-                if (!writer.writeBoolean("sys", sys))
-                    return false;
-
-                writer.incrementState();
-
-            case 13:
-                if (!writer.writeInt("txNum", txNum))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        if (!super.readFrom(buf, reader))
-            return false;
-
-        switch (reader.state()) {
-            case 8:
-                futId = reader.readIgniteUuid("futId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 9:
-                miniId = reader.readIgniteUuid("miniId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 10:
-                nearTxCheck = reader.readBoolean("nearTxCheck");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 11:
-                nearXidVer = reader.readMessage("nearXidVer");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 12:
-                sys = reader.readBoolean("sys");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 13:
-                txNum = reader.readInt("txNum");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte directType() {
-        return 16;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 14;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridCacheOptimisticCheckPreparedTxRequest.class, this, "super", super.toString());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxResponse.java
deleted file mode 100644
index bc8c2e0..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxResponse.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-
-import java.io.*;
-import java.nio.*;
-
-/**
- * Check prepared transactions response.
- */
-public class GridCacheOptimisticCheckPreparedTxResponse extends GridDistributedBaseMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Future ID. */
-    private IgniteUuid futId;
-
-    /** Mini future ID. */
-    private IgniteUuid miniId;
-
-    /** Flag indicating if all remote transactions were prepared. */
-    private boolean success;
-
-    /**
-     * Empty constructor required by {@link Externalizable}
-     */
-    public GridCacheOptimisticCheckPreparedTxResponse() {
-        // No-op.
-    }
-
-    /**
-     * @param txId Transaction ID.
-     * @param futId Future ID.
-     * @param miniId Mini future ID.
-     * @param success {@code True} if all remote transactions were prepared, {@code false} otherwise.
-     */
-    public GridCacheOptimisticCheckPreparedTxResponse(GridCacheVersion txId, IgniteUuid futId, IgniteUuid miniId,
-        boolean success) {
-        super(txId, 0);
-
-        this.futId = futId;
-        this.miniId = miniId;
-        this.success = success;
-    }
-
-    /**
-     * @return Future ID.
-     */
-    public IgniteUuid futureId() {
-        return futId;
-    }
-
-    /**
-     * @return Mini future ID.
-     */
-    public IgniteUuid miniId() {
-        return miniId;
-    }
-
-    /**
-     * @return {@code True} if all remote transactions were prepared.
-     */
-    public boolean success() {
-        return success;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!super.writeTo(buf, writer))
-            return false;
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 8:
-                if (!writer.writeIgniteUuid("futId", futId))
-                    return false;
-
-                writer.incrementState();
-
-            case 9:
-                if (!writer.writeIgniteUuid("miniId", miniId))
-                    return false;
-
-                writer.incrementState();
-
-            case 10:
-                if (!writer.writeBoolean("success", success))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        if (!super.readFrom(buf, reader))
-            return false;
-
-        switch (reader.state()) {
-            case 8:
-                futId = reader.readIgniteUuid("futId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 9:
-                miniId = reader.readIgniteUuid("miniId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 10:
-                success = reader.readBoolean("success");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte directType() {
-        return 17;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 11;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridCacheOptimisticCheckPreparedTxResponse.class, this, "super", super.toString());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
new file mode 100644
index 0000000..663ed90
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Future verifying that all remote transactions related to transaction were prepared or committed.
+ */
+public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolean> implements GridCacheFuture<Boolean> {
+    /** */         
+    private static final long serialVersionUID = 0L;
+    
+    /** Logger reference. */
+    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+    /** Logger. */
+    private static IgniteLogger log;
+
+    /** Trackable flag. */
+    private boolean trackable = true;
+
+    /** Context. */
+    private final GridCacheSharedContext<?, ?> cctx;
+
+    /** Future ID. */
+    private final IgniteUuid futId = IgniteUuid.randomUuid();
+
+    /** Transaction. */
+    private final IgniteInternalTx tx;
+
+    /** All involved nodes. */
+    private final Map<UUID, ClusterNode> nodes;
+
+    /** ID of failed node started transaction. */
+    private final UUID failedNodeId;
+
+    /** Transaction nodes mapping. */
+    private final Map<UUID, Collection<UUID>> txNodes;
+
+    /** */
+    private final boolean nearTxCheck;
+
+    /**
+     * @param cctx Context.
+     * @param tx Transaction.
+     * @param failedNodeId ID of failed node started transaction.
+     * @param txNodes Transaction mapping.
+     */
+    @SuppressWarnings("ConstantConditions")
+    public GridCacheTxRecoveryFuture(GridCacheSharedContext<?, ?> cctx,
+        IgniteInternalTx tx,
+         UUID failedNodeId,
+        Map<UUID, Collection<UUID>> txNodes)
+    {
+        super(cctx.kernalContext(), CU.boolReducer());
+
+        this.cctx = cctx;
+        this.tx = tx;
+        this.txNodes = txNodes;
+        this.failedNodeId = failedNodeId;
+
+        if (log == null)
+            log = U.logger(cctx.kernalContext(), logRef, GridCacheTxRecoveryFuture.class);
+
+        nodes = new GridLeanMap<>();
+
+        UUID locNodeId = cctx.localNodeId();
+
+        for (Map.Entry<UUID, Collection<UUID>> e : tx.transactionNodes().entrySet()) {
+            if (!locNodeId.equals(e.getKey()) && !failedNodeId.equals(e.getKey()) && !nodes.containsKey(e.getKey())) {
+                ClusterNode node = cctx.discovery().node(e.getKey());
+
+                if (node != null)
+                    nodes.put(node.id(), node);
+                else if (log.isDebugEnabled())
+                    log.debug("Transaction node left (will ignore) " + e.getKey());
+            }
+
+            for (UUID nodeId : e.getValue()) {
+                if (!locNodeId.equals(nodeId) && !failedNodeId.equals(nodeId) && !nodes.containsKey(nodeId)) {
+                    ClusterNode node = cctx.discovery().node(nodeId);
+
+                    if (node != null)
+                        nodes.put(node.id(), node);
+                    else if (log.isDebugEnabled())
+                        log.debug("Transaction node left (will ignore) " + e.getKey());
+                }
+            }
+        }
+
+        UUID nearNodeId = tx.eventNodeId();
+
+        nearTxCheck = !failedNodeId.equals(nearNodeId) && cctx.discovery().alive(nearNodeId);
+    }
+
+    /**
+     * Initializes future.
+     */
+    @SuppressWarnings("ConstantConditions")
+    public void prepare() {
+        if (nearTxCheck) {
+            UUID nearNodeId = tx.eventNodeId();
+
+            if (cctx.localNodeId().equals(nearNodeId)) {
+                IgniteInternalFuture<Boolean> fut = cctx.tm().txCommitted(tx.nearXidVersion());
+
+                fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+                    @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+                        try {
+                            onDone(fut.get());
+                        }
+                        catch (IgniteCheckedException e) {
+                            onDone(e);
+                        }
+                    }
+                });
+            }
+            else {
+                MiniFuture fut = new MiniFuture(tx.eventNodeId());
+
+                add(fut);
+
+                GridCacheTxRecoveryRequest req = new GridCacheTxRecoveryRequest(
+                    tx,
+                    0,
+                    true,
+                    futureId(),
+                    fut.futureId());
+
+                try {
+                    cctx.io().send(nearNodeId, req, tx.ioPolicy());
+                }
+                catch (ClusterTopologyCheckedException e) {
+                    fut.onNodeLeft();
+                }
+                catch (IgniteCheckedException e) {
+                    fut.onError(e);
+                }
+
+                markInitialized();
+            }
+
+            return;
+        }
+
+        // First check transactions on local node.
+        int locTxNum = nodeTransactions(cctx.localNodeId());
+
+        if (locTxNum > 1) {
+            IgniteInternalFuture<Boolean> fut = cctx.tm().txsPreparedOrCommitted(tx.nearXidVersion(), locTxNum);
+
+            if (fut == null || fut.isDone()) {
+                boolean prepared;
+
+                try {
+                    prepared = fut == null ? true : fut.get();
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Check prepared transaction future failed: " + e, e);
+
+                    prepared = false;
+                }
+
+                if (!prepared) {
+                    onDone(false);
+
+                    markInitialized();
+
+                    return;
+                }
+            }
+            else {
+                fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+                    @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+                        boolean prepared;
+
+                        try {
+                            prepared = fut.get();
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Check prepared transaction future failed: " + e, e);
+
+                            prepared = false;
+                        }
+
+                        if (!prepared) {
+                            onDone(false);
+
+                            markInitialized();
+                        }
+                        else
+                            proceedPrepare();
+                    }
+                });
+
+                return;
+            }
+        }
+
+        proceedPrepare();
+    }
+
+    /**
+     * Process prepare after local check.
+     */
+    private void proceedPrepare() {
+        for (Map.Entry<UUID, Collection<UUID>> entry : txNodes.entrySet()) {
+            UUID nodeId = entry.getKey();
+
+            // Skip left nodes and local node.
+            if (!nodes.containsKey(nodeId) && nodeId.equals(cctx.localNodeId()))
+                continue;
+
+            /*
+             * If primary node failed then send message to all backups, otherwise
+             * send message only to primary node.
+             */
+
+            if (nodeId.equals(failedNodeId)) {
+                for (UUID id : entry.getValue()) {
+                    // Skip backup node if it is local node or if it is also was mapped as primary.
+                    if (txNodes.containsKey(id) || id.equals(cctx.localNodeId()))
+                        continue;
+
+                    MiniFuture fut = new MiniFuture(id);
+
+                    add(fut);
+
+                    GridCacheTxRecoveryRequest req = new GridCacheTxRecoveryRequest(tx,
+                        nodeTransactions(id),
+                        false,
+                        futureId(),
+                        fut.futureId());
+
+                    try {
+                        cctx.io().send(id, req, tx.ioPolicy());
+                    }
+                    catch (ClusterTopologyCheckedException ignored) {
+                        fut.onNodeLeft();
+                    }
+                    catch (IgniteCheckedException e) {
+                        fut.onError(e);
+
+                        break;
+                    }
+                }
+            }
+            else {
+                MiniFuture fut = new MiniFuture(nodeId);
+
+                add(fut);
+
+                GridCacheTxRecoveryRequest req = new GridCacheTxRecoveryRequest(
+                    tx,
+                    nodeTransactions(nodeId),
+                    false,
+                    futureId(),
+                    fut.futureId());
+
+                try {
+                    cctx.io().send(nodeId, req, tx.ioPolicy());
+                }
+                catch (ClusterTopologyCheckedException ignored) {
+                    fut.onNodeLeft();
+                }
+                catch (IgniteCheckedException e) {
+                    fut.onError(e);
+
+                    break;
+                }
+            }
+        }
+
+        markInitialized();
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @return Number of transactions on node.
+     */
+    private int nodeTransactions(UUID nodeId) {
+        int cnt = txNodes.containsKey(nodeId) ? 1 : 0; // +1 if node is primary.
+
+        for (Collection<UUID> backups : txNodes.values()) {
+            for (UUID backup : backups) {
+                if (backup.equals(nodeId)) {
+                    cnt++; // +1 if node is backup.
+
+                    break;
+                }
+            }
+        }
+
+        return cnt;
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param res Response.
+     */
+    public void onResult(UUID nodeId, GridCacheTxRecoveryResponse res) {
+        if (!isDone()) {
+            for (IgniteInternalFuture<Boolean> fut : pending()) {
+                if (isMini(fut)) {
+                    MiniFuture f = (MiniFuture)fut;
+
+                    if (f.futureId().equals(res.miniId())) {
+                        assert f.nodeId().equals(nodeId);
+
+                        f.onResult(res);
+
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid futureId() {
+        return futId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheVersion version() {
+        return tx.xidVersion();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<? extends ClusterNode> nodes() {
+        return nodes.values();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onNodeLeft(UUID nodeId) {
+        for (IgniteInternalFuture<?> fut : futures())
+            if (isMini(fut)) {
+                MiniFuture f = (MiniFuture)fut;
+
+                if (f.nodeId().equals(nodeId)) {
+                    f.onNodeLeft();
+
+                    return true;
+                }
+            }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean trackable() {
+        return trackable;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void markNotTrackable() {
+        trackable = false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
+        if (super.onDone(res, err)) {
+            cctx.mvcc().removeFuture(this);
+
+            if (err == null) {
+                assert res != null;
+
+                cctx.tm().finishTxOnRecovery(tx, res);
+            }
+            else {
+                if (err instanceof ClusterTopologyCheckedException && nearTxCheck) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to check transaction on near node, " +
+                            "ignoring [err=" + err + ", tx=" + tx + ']');
+                }
+                else {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to check prepared transactions, " +
+                            "invalidating transaction [err=" + err + ", tx=" + tx + ']');
+
+                    cctx.tm().salvageTx(tx);
+                }
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * @param f Future.
+     * @return {@code True} if mini-future.
+     */
+    private boolean isMini(IgniteInternalFuture<?> f) {
+        return f.getClass().equals(MiniFuture.class);
+    }
+
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridCacheTxRecoveryFuture.class, this, "super", super.toString());
+    }
+
+    /**
+     *
+     */
+    private class MiniFuture extends GridFutureAdapter<Boolean> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Mini future ID. */
+        private final IgniteUuid futId = IgniteUuid.randomUuid();
+
+        /** Node ID. */
+        private UUID nodeId;
+
+        /**
+         * @param nodeId Node ID.
+         */
+        private MiniFuture(UUID nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        /**
+         * @return Node ID.
+         */
+        private UUID nodeId() {
+            return nodeId;
+        }
+
+        /**
+         * @return Future ID.
+         */
+        private IgniteUuid futureId() {
+            return futId;
+        }
+
+        /**
+         * @param e Error.
+         */
+        private void onError(Throwable e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
+
+            onDone(e);
+        }
+
+        /**
+         */
+        private void onNodeLeft() {
+            if (log.isDebugEnabled())
+                log.debug("Transaction node left grid (will ignore) [fut=" + this + ']');
+
+            if (nearTxCheck) {
+                // Near and originating nodes left, need initiate tx check.
+                cctx.tm().commitIfPrepared(tx);
+
+                onDone(new ClusterTopologyCheckedException("Transaction node left grid (will ignore)."));
+            }
+            else
+                onDone(true);
+        }
+
+        /**
+         * @param res Result callback.
+         */
+        private void onResult(GridCacheTxRecoveryResponse res) {
+            onDone(res.success());
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(MiniFuture.class, this, "done", isDone(), "err", error());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java
new file mode 100644
index 0000000..259c288
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.io.*;
+import java.nio.*;
+
+/**
+ * Message sent to check that transactions related to transaction were prepared on remote node.
+ */
+public class GridCacheTxRecoveryRequest extends GridDistributedBaseMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Future ID. */
+    private IgniteUuid futId;
+
+    /** Mini future ID. */
+    private IgniteUuid miniId;
+
+    /** Near transaction ID. */
+    private GridCacheVersion nearXidVer;
+
+    /** Expected number of transactions on node. */
+    private int txNum;
+
+    /** System transaction flag. */
+    private boolean sys;
+
+    /** {@code True} if should check only tx on near node. */
+    private boolean nearTxCheck;
+
+    /**
+     * Empty constructor required by {@link Externalizable}
+     */
+    public GridCacheTxRecoveryRequest() {
+        // No-op.
+    }
+
+    /**
+     * @param tx Transaction.
+     * @param txNum Expected number of transactions on remote node.
+     * @param nearTxCheck {@code True} if should check only tx on near node.
+     * @param futId Future ID.
+     * @param miniId Mini future ID.
+     */
+    public GridCacheTxRecoveryRequest(IgniteInternalTx tx,
+        int txNum,
+        boolean nearTxCheck,
+        IgniteUuid futId,
+        IgniteUuid miniId)
+    {
+        super(tx.xidVersion(), 0);
+
+        nearXidVer = tx.nearXidVersion();
+        sys = tx.system();
+
+        this.futId = futId;
+        this.miniId = miniId;
+        this.txNum = txNum;
+        this.nearTxCheck = nearTxCheck;
+    }
+
+    /**
+     * @return {@code True} if should check only tx on near node.
+     */
+    public boolean nearTxCheck() {
+        return nearTxCheck;
+    }
+
+    /**
+     * @return Near version.
+     */
+    public GridCacheVersion nearXidVersion() {
+        return nearXidVer;
+    }
+
+    /**
+     * @return Future ID.
+     */
+    public IgniteUuid futureId() {
+        return futId;
+    }
+
+    /**
+     * @return Mini future ID.
+     */
+    public IgniteUuid miniId() {
+        return miniId;
+    }
+
+    /**
+     * @return Expected number of transactions on node.
+     */
+    public int transactions() {
+        return txNum;
+    }
+
+    /**
+     * @return System transaction flag.
+     */
+    public boolean system() {
+        return sys;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 8:
+                if (!writer.writeIgniteUuid("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeIgniteUuid("miniId", miniId))
+                    return false;
+
+                writer.incrementState();
+
+            case 10:
+                if (!writer.writeBoolean("nearTxCheck", nearTxCheck))
+                    return false;
+
+                writer.incrementState();
+
+            case 11:
+                if (!writer.writeMessage("nearXidVer", nearXidVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 12:
+                if (!writer.writeBoolean("sys", sys))
+                    return false;
+
+                writer.incrementState();
+
+            case 13:
+                if (!writer.writeInt("txNum", txNum))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 8:
+                futId = reader.readIgniteUuid("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                miniId = reader.readIgniteUuid("miniId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 10:
+                nearTxCheck = reader.readBoolean("nearTxCheck");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 11:
+                nearXidVer = reader.readMessage("nearXidVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 12:
+                sys = reader.readBoolean("sys");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 13:
+                txNum = reader.readInt("txNum");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 16;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 14;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridCacheTxRecoveryRequest.class, this, "super", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java
new file mode 100644
index 0000000..e5c026a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.io.*;
+import java.nio.*;
+
+/**
+ * Transactions recovery check response.
+ */
+public class GridCacheTxRecoveryResponse extends GridDistributedBaseMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Future ID. */
+    private IgniteUuid futId;
+
+    /** Mini future ID. */
+    private IgniteUuid miniId;
+
+    /** Flag indicating if all remote transactions were prepared. */
+    private boolean success;
+
+    /**
+     * Empty constructor required by {@link Externalizable}
+     */
+    public GridCacheTxRecoveryResponse() {
+        // No-op.
+    }
+
+    /**
+     * @param txId Transaction ID.
+     * @param futId Future ID.
+     * @param miniId Mini future ID.
+     * @param success {@code True} if all remote transactions were prepared, {@code false} otherwise.
+     */
+    public GridCacheTxRecoveryResponse(GridCacheVersion txId,
+        IgniteUuid futId,
+        IgniteUuid miniId,
+        boolean success)
+    {
+        super(txId, 0);
+
+        this.futId = futId;
+        this.miniId = miniId;
+        this.success = success;
+    }
+
+    /**
+     * @return Future ID.
+     */
+    public IgniteUuid futureId() {
+        return futId;
+    }
+
+    /**
+     * @return Mini future ID.
+     */
+    public IgniteUuid miniId() {
+        return miniId;
+    }
+
+    /**
+     * @return {@code True} if all remote transactions were prepared.
+     */
+    public boolean success() {
+        return success;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 8:
+                if (!writer.writeIgniteUuid("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeIgniteUuid("miniId", miniId))
+                    return false;
+
+                writer.incrementState();
+
+            case 10:
+                if (!writer.writeBoolean("success", success))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 8:
+                futId = reader.readIgniteUuid("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                miniId = reader.readIgniteUuid("miniId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 10:
+                success = reader.readBoolean("success");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 17;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 11;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridCacheTxRecoveryResponse.class, this, "super", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 2897e30..af75fb8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -119,16 +119,16 @@ public class IgniteTxHandler {
             }
         });
 
-        ctx.io().addHandler(0, GridCacheOptimisticCheckPreparedTxRequest.class,
-            new CI2<UUID, GridCacheOptimisticCheckPreparedTxRequest>() {
-                @Override public void apply(UUID nodeId, GridCacheOptimisticCheckPreparedTxRequest req) {
+        ctx.io().addHandler(0, GridCacheTxRecoveryRequest.class,
+            new CI2<UUID, GridCacheTxRecoveryRequest>() {
+                @Override public void apply(UUID nodeId, GridCacheTxRecoveryRequest req) {
                     processCheckPreparedTxRequest(nodeId, req);
                 }
             });
 
-        ctx.io().addHandler(0, GridCacheOptimisticCheckPreparedTxResponse.class,
-            new CI2<UUID, GridCacheOptimisticCheckPreparedTxResponse>() {
-                @Override public void apply(UUID nodeId, GridCacheOptimisticCheckPreparedTxResponse res) {
+        ctx.io().addHandler(0, GridCacheTxRecoveryResponse.class,
+            new CI2<UUID, GridCacheTxRecoveryResponse>() {
+                @Override public void apply(UUID nodeId, GridCacheTxRecoveryResponse res) {
                     processCheckPreparedTxResponse(nodeId, res);
                 }
             });
@@ -138,6 +138,7 @@ public class IgniteTxHandler {
      * @param nearNodeId Near node ID that initiated transaction.
      * @param locTx Optional local transaction.
      * @param req Near prepare request.
+     * @param completeCb Completion callback.
      * @return Future for transaction.
      */
     public IgniteInternalFuture<IgniteInternalTx> prepareTx(
@@ -170,6 +171,7 @@ public class IgniteTxHandler {
      *
      * @param locTx Local transaction.
      * @param req Near prepare request.
+     * @param completeCb Completion callback.
      * @return Prepare future.
      */
     private IgniteInternalFuture<IgniteInternalTx> prepareColocatedTx(
@@ -177,7 +179,6 @@ public class IgniteTxHandler {
         final GridNearTxPrepareRequest req,
         final IgniteInClosure<GridNearTxPrepareResponse> completeCb
     ) {
-
         IgniteInternalFuture<Object> fut = new GridFinishedFuture<>(); // TODO force preload keys.
 
         return new GridEmbeddedFuture<>(
@@ -223,6 +224,7 @@ public class IgniteTxHandler {
      *
      * @param nearNodeId Near node ID that initiated transaction.
      * @param req Near prepare request.
+     * @param completeCb Completion callback.
      * @return Prepare future.
      */
     private IgniteInternalFuture<IgniteInternalTx> prepareNearTx(
@@ -442,6 +444,7 @@ public class IgniteTxHandler {
 
     /**
      * @param nodeId Node ID.
+     * @param locTx Local transaction.
      * @param req Request.
      * @return Future.
      */
@@ -1099,6 +1102,7 @@ public class IgniteTxHandler {
     }
 
     /**
+     * @param cacheCtx Context.
      * @param key Key
      * @param ver Version.
      * @throws IgniteCheckedException If invalidate failed.
@@ -1183,7 +1187,7 @@ public class IgniteTxHandler {
      * @param req Request.
      */
     protected void processCheckPreparedTxRequest(final UUID nodeId,
-        final GridCacheOptimisticCheckPreparedTxRequest req)
+        final GridCacheTxRecoveryRequest req)
     {
         if (log.isDebugEnabled())
             log.debug("Processing check prepared transaction requests [nodeId=" + nodeId + ", req=" + req + ']');
@@ -1231,10 +1235,10 @@ public class IgniteTxHandler {
      * @param prepared {@code True} if all transaction prepared or committed.
      */
     private void sendCheckPreparedResponse(UUID nodeId,
-        GridCacheOptimisticCheckPreparedTxRequest req,
+        GridCacheTxRecoveryRequest req,
         boolean prepared) {
-        GridCacheOptimisticCheckPreparedTxResponse res =
-            new GridCacheOptimisticCheckPreparedTxResponse(req.version(), req.futureId(), req.miniId(), prepared);
+        GridCacheTxRecoveryResponse res =
+            new GridCacheTxRecoveryResponse(req.version(), req.futureId(), req.miniId(), prepared);
 
         try {
             if (log.isDebugEnabled())
@@ -1256,11 +1260,11 @@ public class IgniteTxHandler {
      * @param nodeId Node ID.
      * @param res Response.
      */
-    protected void processCheckPreparedTxResponse(UUID nodeId, GridCacheOptimisticCheckPreparedTxResponse res) {
+    protected void processCheckPreparedTxResponse(UUID nodeId, GridCacheTxRecoveryResponse res) {
         if (log.isDebugEnabled())
             log.debug("Processing check prepared transaction response [nodeId=" + nodeId + ", res=" + res + ']');
 
-        GridCacheOptimisticCheckPreparedTxFuture fut = (GridCacheOptimisticCheckPreparedTxFuture)ctx.mvcc().
+        GridCacheTxRecoveryFuture fut = (GridCacheTxRecoveryFuture)ctx.mvcc().
             <Boolean>future(res.version(), res.futureId());
 
         if (fut == null) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 85b3ad0..8a1d490 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1931,40 +1931,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
-     * Gets local transaction for pessimistic tx recovery.
-     *
-     * @param nearXidVer Near tx ID.
-     * @return Near local or colocated local transaction.
-     */
-    @Nullable public IgniteInternalTx localTxForRecovery(GridCacheVersion nearXidVer, boolean markFinalizing) {
-        // First check if we have near transaction with this ID.
-        IgniteInternalTx tx = idMap.get(nearXidVer);
-
-        if (tx == null) {
-            // Check all local transactions and mark them as waiting for recovery to prevent finish race.
-            for (IgniteInternalTx txEx : idMap.values()) {
-                if (nearXidVer.equals(txEx.nearXidVersion())) {
-                    if (!markFinalizing || !txEx.markFinalizing(RECOVERY_WAIT))
-                        tx = txEx;
-                }
-            }
-        }
-
-        // Either we found near transaction or one of transactions is being committed by user.
-        // Wait for it and send reply.
-        if (tx != null && tx.local())
-            return tx;
-
-        return null;
-    }
-
-    /**
      * Commits or rolls back prepared transaction.
      *
      * @param tx Transaction.
      * @param commit Whether transaction should be committed or rolled back.
      */
-    public void finishOptimisticTxOnRecovery(final IgniteInternalTx tx, boolean commit) {
+    public void finishTxOnRecovery(final IgniteInternalTx tx, boolean commit) {
         if (log.isDebugEnabled())
             log.debug("Finishing prepared transaction [tx=" + tx + ", commit=" + commit + ']');
 
@@ -1989,71 +1961,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
-     * Commits or rolls back pessimistic transaction.
-     *
-     * @param tx Transaction to finish.
-     * @param commitInfo Commit information.
-     */
-    public void finishPessimisticTxOnRecovery(final IgniteInternalTx tx, GridCacheCommittedTxInfo commitInfo) {
-        if (!tx.markFinalizing(RECOVERY_FINISH)) {
-            if (log.isDebugEnabled())
-                log.debug("Will not try to finish pessimistic transaction (could not mark as finalizing): " + tx);
-
-            return;
-        }
-
-        if (tx instanceof GridDistributedTxRemoteAdapter) {
-            IgniteTxRemoteEx rmtTx = (IgniteTxRemoteEx)tx;
-
-            rmtTx.doneRemote(tx.xidVersion(),
-                Collections.<GridCacheVersion>emptyList(),
-                Collections.<GridCacheVersion>emptyList(),
-                Collections.<GridCacheVersion>emptyList());
-        }
-
-        try {
-            tx.prepare();
-
-            if (commitInfo != null) {
-                for (IgniteTxEntry entry : commitInfo.recoveryWrites()) {
-                    IgniteTxEntry write = tx.writeMap().get(entry.txKey());
-
-                    if (write != null) {
-                        GridCacheEntryEx cached = write.cached();
-
-                        IgniteTxEntry recovered = entry.cleanCopy(write.context());
-
-                        if (cached == null || cached.detached())
-                            cached = write.context().cache().entryEx(entry.key(), tx.topologyVersion());
-
-                        recovered.cached(cached);
-
-                        tx.writeMap().put(entry.txKey(), recovered);
-
-                        continue;
-                    }
-
-                    // If write was not found, check read.
-                    IgniteTxEntry read = tx.readMap().remove(entry.txKey());
-
-                    if (read != null)
-                        tx.writeMap().put(entry.txKey(), entry);
-                }
-
-                tx.commitAsync().listen(new CommitListener(tx));
-            }
-            else
-                tx.rollbackAsync();
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to prepare pessimistic transaction (will invalidate): " + tx, e);
-
-            salvageTx(tx);
-        }
-    }
-
-    /**
-     * Commits optimistic transaction in case when node started transaction failed, but all related
+     * Commits transaction in case when node started transaction failed, but all related
      * transactions were prepared (invalidates transaction if it is not fully prepared).
      *
      * @param tx Transaction.
@@ -2063,7 +1971,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         assert !F.isEmpty(tx.transactionNodes()) : tx;
         assert tx.nearXidVersion() != null : tx;
 
-        GridCacheOptimisticCheckPreparedTxFuture fut = new GridCacheOptimisticCheckPreparedTxFuture<>(
+        GridCacheTxRecoveryFuture fut = new GridCacheTxRecoveryFuture(
             cctx,
             tx,
             tx.originatingNodeId(),

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f95fb8/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 35495ed..657f4af 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -455,9 +455,9 @@ org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresMa
 org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager$QueueHeaderPredicate
 org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager$RemoveSetDataCallable
 org.apache.ignite.internal.processors.cache.distributed.GridCacheCommittedTxInfo
-org.apache.ignite.internal.processors.cache.distributed.GridCacheOptimisticCheckPreparedTxFuture$1
-org.apache.ignite.internal.processors.cache.distributed.GridCacheOptimisticCheckPreparedTxRequest
-org.apache.ignite.internal.processors.cache.distributed.GridCacheOptimisticCheckPreparedTxResponse
+org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture$1
+org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest
+org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryResponse
 org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdateRequest
 org.apache.ignite.internal.processors.cache.distributed.GridDistributedBaseMessage
 org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheAdapter


[19/19] incubator-ignite git commit: Merge branches 'ignite-855' and 'ignite-sprint-4' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-855

Posted by yz...@apache.org.
Merge branches 'ignite-855' and 'ignite-sprint-4' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-855


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0ccde8d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0ccde8d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0ccde8d4

Branch: refs/heads/ignite-855
Commit: 0ccde8d4e82616cbdd3ded697b6c2c5f88fb0e77
Parents: 733f30e a452dac
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Wed May 6 14:31:15 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Wed May 6 14:31:15 2015 +0300

----------------------------------------------------------------------
 .../ignite/codegen/MessageCodeGenerator.java    |   4 +-
 .../communication/GridIoMessageFactory.java     |   4 +-
 .../cache/DynamicCacheDescriptor.java           |  16 +-
 .../processors/cache/GridCacheAdapter.java      | 511 +++++++++---------
 .../processors/cache/GridCacheProcessor.java    | 189 ++++---
 ...ridCacheOptimisticCheckPreparedTxFuture.java | 434 ---------------
 ...idCacheOptimisticCheckPreparedTxRequest.java | 232 --------
 ...dCacheOptimisticCheckPreparedTxResponse.java | 179 -------
 .../distributed/GridCacheTxRecoveryFuture.java  | 506 ++++++++++++++++++
 .../distributed/GridCacheTxRecoveryRequest.java | 261 +++++++++
 .../GridCacheTxRecoveryResponse.java            | 182 +++++++
 .../GridDistributedTxRemoteAdapter.java         |   2 +-
 .../cache/transactions/IgniteInternalTx.java    |   5 +-
 .../cache/transactions/IgniteTxAdapter.java     |   2 +-
 .../cache/transactions/IgniteTxHandler.java     |  38 +-
 .../transactions/IgniteTxLocalAdapter.java      |   2 +-
 .../cache/transactions/IgniteTxManager.java     | 173 ++----
 .../processors/igfs/IgfsDataManager.java        |   3 +
 .../processors/igfs/IgfsMetaManager.java        |   2 +-
 .../resources/META-INF/classnames.properties    |  12 +-
 .../processors/cache/CacheGetFromJobTest.java   | 110 ++++
 .../GridCacheAbstractFailoverSelfTest.java      |   4 +-
 .../GridCacheAbstractNodeRestartSelfTest.java   |  94 ++--
 ...xOriginatingNodeFailureAbstractSelfTest.java |   2 +-
 ...rDisabledPrimaryNodeFailureRecoveryTest.java |  31 ++
 ...rtitionedPrimaryNodeFailureRecoveryTest.java |  31 ++
 ...woBackupsPrimaryNodeFailureRecoveryTest.java |  37 ++
 ...ePrimaryNodeFailureRecoveryAbstractTest.java | 533 +++++++++++++++++++
 .../GridCachePartitionedNodeRestartTest.java    |   4 +-
 ...ePartitionedOptimisticTxNodeRestartTest.java |   4 +-
 .../GridCacheReplicatedNodeRestartSelfTest.java |   2 +
 .../igfs/IgfsClientCacheSelfTest.java           | 132 +++++
 .../processors/igfs/IgfsStreamsSelfTest.java    |   2 +-
 .../testsuites/IgniteCacheRestartTestSuite.java |   5 +-
 .../ignite/testsuites/IgniteCacheTestSuite.java |   3 -
 .../IgniteCacheTxRecoverySelfTestSuite.java     |   4 +
 .../ignite/testsuites/IgniteIgfsTestSuite.java  |   2 +
 37 files changed, 2364 insertions(+), 1393 deletions(-)
----------------------------------------------------------------------



[16/19] incubator-ignite git commit: Merge remote-tracking branch 'origin/ignite-sprint-4' into ignite-sprint-4

Posted by yz...@apache.org.
Merge remote-tracking branch 'origin/ignite-sprint-4' into ignite-sprint-4


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7be25bd7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7be25bd7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7be25bd7

Branch: refs/heads/ignite-855
Commit: 7be25bd7859f6cf45e6454c44ed9b1501d87fd4b
Parents: ba210bb 587103f
Author: sboikov <sb...@gridgain.com>
Authored: Wed May 6 12:44:56 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed May 6 12:44:56 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheProcessor.java    | 109 ++++++++++--------
 .../processors/cache/CacheGetFromJobTest.java   | 110 +++++++++++++++++++
 .../testsuites/IgniteCacheRestartTestSuite.java |   1 +
 3 files changed, 174 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7be25bd7/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
----------------------------------------------------------------------


[13/19] incubator-ignite git commit: #ignite-834: IgniteCache.clearAll() throws NPE. #ignite-732: IgniteCache.size() should not fail in case of topology changes.

Posted by yz...@apache.org.
#ignite-834: IgniteCache.clearAll() throws NPE.
#ignite-732: IgniteCache.size() should not fail in case of topology changes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/99c7e228
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/99c7e228
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/99c7e228

Branch: refs/heads/ignite-855
Commit: 99c7e228d12e25826f74d6d8706d158ec36004ed
Parents: 9ff8029
Author: ivasilinets <iv...@gridgain.com>
Authored: Wed May 6 12:30:57 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Wed May 6 12:30:57 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      | 511 +++++++++----------
 .../resources/META-INF/classnames.properties    |   6 +-
 2 files changed, 248 insertions(+), 269 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/99c7e228/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 3f4e97b..6674993 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -21,10 +21,10 @@ import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
-import org.apache.ignite.internal.compute.*;
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.affinity.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
@@ -1083,7 +1083,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         // Clear local cache synchronously.
         clearLocally();
 
-        clearRemotes(0, new GlobalClearAllCallable(name()));
+        clearRemotes(0, null);
     }
 
     /** {@inheritDoc} */
@@ -1091,7 +1091,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         // Clear local cache synchronously.
         clearLocally(key);
 
-        clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), Collections.singleton(key)));
+        clearRemotes(0, Collections.singleton(key));
     }
 
     /** {@inheritDoc} */
@@ -1099,83 +1099,55 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         // Clear local cache synchronously.
         clearLocallyAll(keys);
 
-        clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), keys));
+        clearRemotes(0, keys);
     }
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> clearAsync(K key) {
-        return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), Collections.singleton(key)));
+        return clearKeysAsync(Collections.singleton(key));
     }
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> clearAsync(Set<? extends K> keys) {
-        return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), keys));
+        return clearKeysAsync(keys);
     }
 
     /**
      * @param timeout Timeout for clearLocally all task in milliseconds (0 for never).
      *      Set it to larger value for large caches.
-     * @param clearCall Global clear callable object.
+     * @param keys Keys to clear or {@code null} if all cache should be cleared.
      * @throws IgniteCheckedException In case of cache could not be cleared on any of the nodes.
      */
-    private void clearRemotes(long timeout, GlobalClearCallable clearCall) throws IgniteCheckedException {
-        try {
-            // Send job to remote nodes only.
-            Collection<ClusterNode> nodes =
-                ctx.grid().cluster().forCacheNodes(name(), true, true, false).forRemotes().nodes();
-
-            IgniteInternalFuture<Object> fut = null;
-
-            if (!nodes.isEmpty()) {
-                ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, timeout);
+    private void clearRemotes(long timeout, @Nullable final Set<? extends K> keys) throws IgniteCheckedException {
+        // Send job to remote nodes only.
+        Collection<ClusterNode> nodes =
+            ctx.grid().cluster().forCacheNodes(name(), true, true, false).forRemotes().nodes();
 
-                fut = ctx.closures().callAsyncNoFailover(BROADCAST, clearCall, nodes, true);
-            }
+        if (!nodes.isEmpty()) {
+            ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, timeout);
 
-            if (fut != null)
-                fut.get();
-        }
-        catch (ClusterGroupEmptyCheckedException ignore) {
-            if (log.isDebugEnabled())
-                log.debug("All remote nodes left while cache clearLocally [cacheName=" + name() + "]");
-        }
-        catch (ComputeTaskTimeoutCheckedException e) {
-            U.warn(log, "Timed out waiting for remote nodes to finish cache clear (consider increasing " +
-                "'networkTimeout' configuration property) [cacheName=" + name() + "]");
+            ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
 
-            throw e;
+            ctx.kernalContext().task().execute(new ClearTask(ctx, keys), null).get();
         }
     }
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> clearAsync() {
-        return clearAsync(new GlobalClearAllCallable(name()));
+        return clearKeysAsync(null);
     }
 
     /**
-     * @param clearCall Global clear callable object.
+     * @param keys Keys to clear or {@code null} if all cache should be cleared.
      * @return Future.
      */
-    private IgniteInternalFuture<?> clearAsync(GlobalClearCallable clearCall) {
+    private IgniteInternalFuture<?> clearKeysAsync(final Set<? extends K> keys) {
         Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name(), true, true, false).nodes();
 
         if (!nodes.isEmpty()) {
-            IgniteInternalFuture<Object> fut =
-                ctx.closures().callAsyncNoFailover(BROADCAST, clearCall, nodes, true);
+            ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
 
-            return fut.chain(new CX1<IgniteInternalFuture<Object>, Object>() {
-                @Override public Object applyx(IgniteInternalFuture<Object> fut) throws IgniteCheckedException {
-                    try {
-                        return fut.get();
-                    }
-                    catch (ClusterGroupEmptyCheckedException ignore) {
-                        if (log.isDebugEnabled())
-                            log.debug("All remote nodes left while cache clearLocally [cacheName=" + name() + "]");
-
-                        return null;
-                    }
-                }
-            });
+            return ctx.kernalContext().task().execute(new ClearTask(ctx, keys), null);
         }
         else
             return new GridFinishedFuture<>();
@@ -3562,7 +3534,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Integer> sizeAsync(CachePeekMode[] peekModes) {
+    @Override public IgniteInternalFuture<Integer> sizeAsync(final CachePeekMode[] peekModes) {
         assert peekModes != null;
 
         PeekModes modes = parsePeekModes(peekModes, true);
@@ -3576,22 +3548,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         if (nodes.isEmpty())
             return new GridFinishedFuture<>(0);
 
-        IgniteInternalFuture<Collection<Integer>> fut =
-            ctx.closures().broadcastNoFailover(new SizeCallable(ctx.name(), peekModes), null, nodes);
-
-        return fut.chain(new CX1<IgniteInternalFuture<Collection<Integer>>, Integer>() {
-            @Override public Integer applyx(IgniteInternalFuture<Collection<Integer>> fut)
-            throws IgniteCheckedException {
-                Collection<Integer> res = fut.get();
+        ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
 
-                int totalSize = 0;
-
-                for (Integer size : res)
-                    totalSize += size;
-
-                return totalSize;
-            }
-        });
+        return ctx.kernalContext().task().execute(new SizeTask(ctx, peekModes), null);
     }
 
     /** {@inheritDoc} */
@@ -3909,50 +3868,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     }
 
     /**
-     * Gets cache global size (with or without backups).
-     *
-     * @param primaryOnly {@code True} if only primary sizes should be included.
-     * @return Global size.
-     * @throws IgniteCheckedException If internal task execution failed.
-     */
-    private int globalSize(boolean primaryOnly) throws IgniteCheckedException {
-        try {
-            // Send job to remote nodes only.
-            Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name()).forRemotes().nodes();
-
-            IgniteInternalFuture<Collection<Integer>> fut = null;
-
-            if (!nodes.isEmpty()) {
-                ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, gridCfg.getNetworkTimeout());
-
-                fut = ctx.closures().broadcastNoFailover(new GlobalSizeCallable(name(), primaryOnly), null, nodes);
-            }
-
-            // Get local value.
-            int globalSize = primaryOnly ? primarySize() : size();
-
-            if (fut != null) {
-                for (Integer i : fut.get())
-                    globalSize += i;
-            }
-
-            return globalSize;
-        }
-        catch (ClusterGroupEmptyCheckedException ignore) {
-            if (log.isDebugEnabled())
-                log.debug("All remote nodes left while cache clearLocally [cacheName=" + name() + "]");
-
-            return primaryOnly ? primarySize() : size();
-        }
-        catch (ComputeTaskTimeoutCheckedException e) {
-            U.warn(log, "Timed out waiting for remote nodes to finish cache clear (consider increasing " +
-                "'networkTimeout' configuration property) [cacheName=" + name() + "]");
-
-            throw e;
-        }
-    }
-
-    /**
      * @param op Cache operation.
      * @param <T> Return type.
      * @return Operation result.
@@ -4893,67 +4808,32 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     }
 
     /**
-     * Internal callable which performs clear operation on a cache with the given name.
-     */
-    @GridInternal
-    private static abstract class GlobalClearCallable implements Callable<Object>, Externalizable {
-        /** Cache name. */
-        protected String cacheName;
-
-        /** Injected grid instance. */
-        @IgniteInstanceResource
-        protected Ignite ignite;
-
-        /**
-         * Empty constructor for serialization.
-         */
-        public GlobalClearCallable() {
-            // No-op.
-        }
-
-        /**
-         * @param cacheName Cache name.
-         */
-        protected GlobalClearCallable(String cacheName) {
-            this.cacheName = cacheName;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            U.writeString(out, cacheName);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            cacheName = U.readString(in);
-        }
-    }
-
-    /**
      * Global clear all.
      */
     @GridInternal
-    private static class GlobalClearAllCallable extends GlobalClearCallable {
+    private static class GlobalClearAllJob extends TopologyVersionAwareJob {
         /** */
         private static final long serialVersionUID = 0L;
 
         /**
          * Empty constructor for serialization.
          */
-        public GlobalClearAllCallable() {
+        public GlobalClearAllJob() {
             // No-op.
         }
 
         /**
          * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
          */
-        private GlobalClearAllCallable(String cacheName) {
-            super(cacheName);
+        private GlobalClearAllJob(String cacheName, AffinityTopologyVersion topVer) {
+            super(cacheName, topVer);
         }
 
         /** {@inheritDoc} */
-        @Override public Object call() throws Exception {
-            ((IgniteEx)ignite).cachex(cacheName).clearLocally();
+        @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) {
+            if (cache != null)
+                cache.clearLocally();
 
             return null;
         }
@@ -4963,7 +4843,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * Global clear keys.
      */
     @GridInternal
-    private static class GlobalClearKeySetCallable<K, V> extends GlobalClearCallable {
+    private static class GlobalClearKeySetJob<K> extends TopologyVersionAwareJob {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -4973,166 +4853,75 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         /**
          * Empty constructor for serialization.
          */
-        public GlobalClearKeySetCallable() {
+        public GlobalClearKeySetJob() {
             // No-op.
         }
 
         /**
          * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
          * @param keys Keys to clear.
          */
-        private GlobalClearKeySetCallable(String cacheName, Set<? extends K> keys) {
-            super(cacheName);
+        private GlobalClearKeySetJob(String cacheName, AffinityTopologyVersion topVer, Set<? extends K> keys) {
+            super(cacheName, topVer);
 
             this.keys = keys;
         }
 
         /** {@inheritDoc} */
-        @Override public Object call() throws Exception {
-            ((IgniteEx)ignite).<K, V>cachex(cacheName).clearLocallyAll(keys);
+        @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) {
+            if (cache != null)
+                cache.clearLocallyAll(keys);
 
             return null;
         }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            super.writeExternal(out);
-
-            out.writeObject(keys);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            super.readExternal(in);
-
-            keys = (Set<K>) in.readObject();
-        }
     }
 
     /**
      * Internal callable for global size calculation.
      */
     @GridInternal
-    private static class SizeCallable extends IgniteClosureX<Object, Integer> implements Externalizable {
+    private static class SizeJob extends TopologyVersionAwareJob {
         /** */
         private static final long serialVersionUID = 0L;
 
-        /** Cache name. */
-        private String cacheName;
-
         /** Peek modes. */
         private CachePeekMode[] peekModes;
 
-        /** Injected grid instance. */
-        @IgniteInstanceResource
-        private Ignite ignite;
-
         /**
          * Required by {@link Externalizable}.
          */
-        public SizeCallable() {
+        public SizeJob() {
             // No-op.
         }
 
         /**
          * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
          * @param peekModes Cache peek modes.
          */
-        private SizeCallable(String cacheName, CachePeekMode[] peekModes) {
-            this.cacheName = cacheName;
-            this.peekModes = peekModes;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Integer applyx(Object o) throws IgniteCheckedException {
-            IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName);
-
-            assert cache != null : cacheName;
-
-            return cache.localSize(peekModes);
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("ForLoopReplaceableByForEach")
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            U.writeString(out, cacheName);
-
-            out.writeInt(peekModes.length);
+        private SizeJob(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes) {
+            super(cacheName, topVer);
 
-            for (int i = 0; i < peekModes.length; i++)
-                U.writeEnum(out, peekModes[i]);
+            this.peekModes = peekModes;
         }
 
         /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            cacheName = U.readString(in);
-
-            int len = in.readInt();
+        @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) {
+            if (cache == null)
+                return 0;
 
-            peekModes = new CachePeekMode[len];
-
-            for (int i = 0; i < len; i++)
-                peekModes[i] = CachePeekMode.fromOrdinal(in.readByte());
+            try {
+                return cache.localSize(peekModes);
+            }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
         }
 
         /** {@inheritDoc} */
         public String toString() {
-            return S.toString(SizeCallable.class, this);
-        }
-    }
-
-    /**
-     * Internal callable which performs {@link IgniteInternalCache#size()} or {@link IgniteInternalCache#primarySize()}
-     * operation on a cache with the given name.
-     */
-    @GridInternal
-    private static class GlobalSizeCallable implements IgniteClosure<Object, Integer>, Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Cache name. */
-        private String cacheName;
-
-        /** Primary only flag. */
-        private boolean primaryOnly;
-
-        /** Injected grid instance. */
-        @IgniteInstanceResource
-        private Ignite ignite;
-
-        /**
-         * Empty constructor for serialization.
-         */
-        public GlobalSizeCallable() {
-            // No-op.
-        }
-
-        /**
-         * @param cacheName Cache name.
-         * @param primaryOnly Primary only flag.
-         */
-        private GlobalSizeCallable(String cacheName, boolean primaryOnly) {
-            this.cacheName = cacheName;
-            this.primaryOnly = primaryOnly;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Integer apply(Object o) {
-            IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName);
-
-            return primaryOnly ? cache.primarySize() : cache.size();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            U.writeString(out, cacheName);
-            out.writeBoolean(primaryOnly);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            cacheName = U.readString(in);
-            primaryOnly = in.readBoolean();
+            return S.toString(SizeJob.class, this);
         }
     }
 
@@ -5697,4 +5486,194 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             metrics.addPutAndGetTimeNanos(System.nanoTime() - start);
         }
     }
+
+    /**
+     * Delayed callable class.
+     */
+    protected static abstract class TopologyVersionAwareJob extends ComputeJobAdapter {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Injected job context. */
+        @JobContextResource
+        protected ComputeJobContext jobCtx;
+
+        /** Injected grid instance. */
+        @IgniteInstanceResource
+        protected Ignite ignite;
+
+        /** Affinity topology version. */
+        protected AffinityTopologyVersion topVer;
+
+        /** Cache name. */
+        protected String cacheName;
+
+        /**
+         * Empty constructor for serialization.
+         */
+        public TopologyVersionAwareJob() {
+            // No-op.
+        }
+
+        /**
+         * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
+         */
+        public TopologyVersionAwareJob(String cacheName, AffinityTopologyVersion topVer) {
+            assert topVer != null;
+
+            this.cacheName = cacheName;
+            this.topVer = topVer;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public final Object execute() {
+            waitAffinityReadyFuture();
+
+            IgniteInternalCache cache = ((IgniteKernal)ignite).context().cache().cache(cacheName);
+
+            return localExecute(cache);
+        }
+
+        /**
+         * @param cache Cache.
+         * @return Local execution result.
+         */
+        @Nullable protected abstract Object localExecute(@Nullable IgniteInternalCache cache);
+
+        /**
+         * Holds (suspends) job execution until our cache version becomes equal to remote cache's version.
+         */
+        private void waitAffinityReadyFuture() {
+            GridCacheProcessor cacheProc = ((IgniteKernal)ignite).context().cache();
+
+            AffinityTopologyVersion locTopVer = cacheProc.context().exchange().readyAffinityVersion();
+
+            if (locTopVer.compareTo(topVer) < 0) {
+                IgniteInternalFuture<?> fut = cacheProc.context().exchange().affinityReadyFuture(topVer);
+
+                if (fut != null && !fut.isDone()) {
+                    fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                        @Override public void apply(IgniteInternalFuture<?> t) {
+                            jobCtx.callcc();
+                        }
+                    });
+
+                    jobCtx.holdcc();
+                }
+            }
+        }
+    }
+
+    /**
+     * Size task.
+     */
+    private static class SizeTask extends ComputeTaskAdapter<Object, Integer> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Cache context. */
+        private GridCacheContext ctx;
+
+        /** Peek modes. */
+        private CachePeekMode[] peekModes;
+
+        /**
+         * Empty constructor for serialization.
+         */
+        public SizeTask() {
+            // No-op.
+        }
+
+        /**
+         * @param ctx Cache context.
+         */
+        public SizeTask(GridCacheContext ctx, CachePeekMode[] peekModes) {
+            this.ctx = ctx;
+            this.peekModes = peekModes;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+            @Nullable Object arg) throws IgniteException {
+            Map<ComputeJob, ClusterNode> jobs = new HashMap();
+
+            for (ClusterNode node : subgrid)
+                jobs.put(new SizeJob(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes), node);
+
+            return jobs;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+            return ComputeJobResultPolicy.WAIT;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteException {
+            int size = 0;
+
+            for (ComputeJobResult res : results) {
+                if (res.getException() == null && res != null)
+                    size += res.<Integer>getData();
+            }
+
+            return size;
+        }
+    }
+
+    /**
+     * Clear task.
+     */
+    private static class ClearTask<K> extends ComputeTaskAdapter<Object, Object> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Cache context. */
+        private GridCacheContext ctx;
+
+        /** Keys to clear. */
+        private Set<? extends K> keys;
+
+        /**
+         * Empty constructor for serialization.
+         */
+        public ClearTask() {
+            // No-op.
+        }
+
+        /**
+         * @param ctx Cache context.
+         * @param keys Keys to clear.
+         */
+        public ClearTask(GridCacheContext ctx, Set<? extends K> keys) {
+            this.ctx = ctx;
+            this.keys = keys;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+            @Nullable Object arg) throws IgniteException {
+            Map<ComputeJob, ClusterNode> jobs = new HashMap();
+
+            for (ClusterNode node : subgrid) {
+                jobs.put(keys == null ?
+                        new GlobalClearAllJob(ctx.name(), ctx.affinity().affinityTopologyVersion()) :
+                        new GlobalClearKeySetJob<K>(ctx.name(), ctx.affinity().affinityTopologyVersion(), keys),
+                    node);
+            }
+
+            return jobs;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+            return ComputeJobResultPolicy.WAIT;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Object reduce(List<ComputeJobResult> results) throws IgniteException {
+            return null;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/99c7e228/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 35495ed..ff263cd 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -323,13 +323,13 @@ org.apache.ignite.internal.processors.cache.GridCacheAdapter$72
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$73
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$74
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$9
-org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearAllCallable
+org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearAllJob
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearCallable
-org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearKeySetCallable
+org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearKeySetJob
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalSizeCallable
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$LoadCacheClosure
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$LoadKeysCallable
-org.apache.ignite.internal.processors.cache.GridCacheAdapter$SizeCallable
+org.apache.ignite.internal.processors.cache.GridCacheAdapter$SizeJob
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdateGetTimeStatClosure
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdatePutAndGetTimeStatClosure
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdatePutTimeStatClosure


[18/19] incubator-ignite git commit: #ignite-841: NullPointerException at IgfsMetaManager.onKernalStart0 (IgfsMetaManager.java:115).

Posted by yz...@apache.org.
#ignite-841: NullPointerException at IgfsMetaManager.onKernalStart0 (IgfsMetaManager.java:115).


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a452dac2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a452dac2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a452dac2

Branch: refs/heads/ignite-855
Commit: a452dac26c47ac10586f1f49bc26f03b6b5e6fd4
Parents: d4908f2
Author: ivasilinets <iv...@gridgain.com>
Authored: Wed May 6 13:44:30 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Wed May 6 13:44:30 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheProcessor.java    |  72 +++++++---
 .../processors/igfs/IgfsDataManager.java        |   3 +
 .../processors/igfs/IgfsMetaManager.java        |   2 +-
 .../igfs/IgfsClientCacheSelfTest.java           | 132 +++++++++++++++++++
 .../processors/igfs/IgfsStreamsSelfTest.java    |   2 +-
 .../ignite/testsuites/IgniteIgfsTestSuite.java  |   2 +
 6 files changed, 192 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a452dac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 2b9a821..0e1a9c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2452,6 +2452,24 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param name Cache name.
+     * @return Cache instance for given name.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    public <K, V> IgniteInternalCache<K, V> getOrStartCache(@Nullable String name) throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Getting cache for name: " + name);
+
+        IgniteCache<K, V> jcache = (IgniteCache<K, V>)jCacheProxies.get(maskNull(name));
+
+        if (jcache == null)
+            jcache = startJCache(name, true);
+
+        return jcache == null ? null : ((IgniteCacheProxy<K, V>)jcache).internalProxy();
+    }
+
+    /**
      * @return All configured cache instances.
      */
     public Collection<IgniteInternalCache<?, ?>> caches() {
@@ -2558,37 +2576,53 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (desc != null && !desc.cacheType().userCache())
             throw new IllegalStateException("Failed to get cache because it is a system cache: " + cacheName);
 
-        if (cache == null) {
-            if (desc == null || desc.cancelled()) {
-                if (failIfNotStarted)
-                    throw new IllegalArgumentException("Cache is not started: " + cacheName);
+        if (cache == null)
+           cache = startJCache(cacheName, failIfNotStarted);
 
-                return null;
-            }
+        return cache;
+    }
 
-            DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId());
+    /**
+     * @param cacheName Cache name.
+     * @param failIfNotStarted If {@code true} throws {@link IllegalArgumentException} if cache is not started,
+     *        otherwise returns {@code null} in this case.
+     * @return Cache instance for given name.
+     * @throws IgniteCheckedException If failed.
+     */
+    private IgniteCache startJCache(String cacheName, boolean failIfNotStarted) throws IgniteCheckedException {
+        String masked = maskNull(cacheName);
 
-            req.cacheName(cacheName);
+        DynamicCacheDescriptor desc = registeredCaches.get(masked);
 
-            req.deploymentId(desc.deploymentId());
+        if (desc == null || desc.cancelled()) {
+            if (failIfNotStarted)
+                throw new IllegalArgumentException("Cache is not started: " + cacheName);
 
-            CacheConfiguration cfg = new CacheConfiguration(desc.cacheConfiguration());
+            return null;
+        }
 
-            cfg.setNearConfiguration(null);
+        DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId());
 
-            req.startCacheConfiguration(cfg);
+        req.cacheName(cacheName);
 
-            req.cacheType(desc.cacheType());
+        req.deploymentId(desc.deploymentId());
 
-            req.clientStartOnly(true);
+        CacheConfiguration cfg = new CacheConfiguration(desc.cacheConfiguration());
 
-            F.first(initiateCacheChanges(F.asList(req))).get();
+        cfg.setNearConfiguration(null);
 
-            cache = (IgniteCache<K, V>)jCacheProxies.get(masked);
+        req.startCacheConfiguration(cfg);
 
-            if (cache == null && failIfNotStarted)
-                throw new IllegalArgumentException("Cache is not started: " + cacheName);
-        }
+        req.cacheType(desc.cacheType());
+
+        req.clientStartOnly(true);
+
+        F.first(initiateCacheChanges(F.asList(req))).get();
+
+        IgniteCache cache = jCacheProxies.get(masked);
+
+        if (cache == null && failIfNotStarted)
+            throw new IllegalArgumentException("Cache is not started: " + cacheName);
 
         return cache;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a452dac2/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index 319b696..aa6427d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -202,7 +202,10 @@ public class IgfsDataManager extends IgfsManager {
 
     /** {@inheritDoc} */
     @Override protected void onKernalStart0() throws IgniteCheckedException {
+        igfsCtx.kernalContext().cache().getOrStartCache(igfsCtx.configuration().getDataCacheName());
         dataCachePrj = igfsCtx.kernalContext().cache().internalCache(igfsCtx.configuration().getDataCacheName());
+
+        igfsCtx.kernalContext().cache().getOrStartCache(igfsCtx.configuration().getDataCacheName());
         dataCache = igfsCtx.kernalContext().cache().internalCache(igfsCtx.configuration().getDataCacheName());
 
         metrics = igfsCtx.igfs().localMetrics();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a452dac2/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 35ca8bb..e33e0d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -108,7 +108,7 @@ public class IgfsMetaManager extends IgfsManager {
 
     /** {@inheritDoc} */
     @Override protected void onKernalStart0() throws IgniteCheckedException {
-        metaCache = igfsCtx.kernalContext().cache().cache(cfg.getMetaCacheName());
+        metaCache = igfsCtx.kernalContext().cache().getOrStartCache(cfg.getMetaCacheName());
 
         assert metaCache != null;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a452dac2/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsClientCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsClientCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsClientCacheSelfTest.java
new file mode 100644
index 0000000..d983302
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsClientCacheSelfTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ * Test for igfs with nodes in client mode (see {@link IgniteConfiguration#setClientMode(boolean)}.
+ */
+public class IgfsClientCacheSelfTest extends IgfsAbstractSelfTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Meta-information cache name. */
+    private static final String META_CACHE_NAME = "meta";
+
+    /** Data cache name. */
+    private static final String DATA_CACHE_NAME = null;
+
+    /** Regular cache name. */
+    private static final String CACHE_NAME = "cache";
+
+    /**
+     * Constructor.
+     */
+    public IgfsClientCacheSelfTest() {
+        super(IgfsMode.PRIMARY);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        igfsSecondaryFileSystem = createSecondaryFileSystemStack();
+
+        Ignite ignite1 = G.start(getConfiguration(getTestGridName(1)));
+
+        igfs = (IgfsImpl) ignite1.fileSystem("igfs");
+    }
+
+    /**{@inheritDoc} */
+    protected IgfsSecondaryFileSystem createSecondaryFileSystemStack() throws Exception {
+        Ignite igniteSecondary = G.start(getConfiguration(getTestGridName(0)));
+
+        IgfsEx secondaryIgfsImpl = (IgfsEx)igniteSecondary.fileSystem("igfs");
+
+        igfsSecondary = new IgfsExUniversalFileSystemAdapter(secondaryIgfsImpl);
+
+        return secondaryIgfsImpl.asSecondary();
+    }
+
+    /**
+     *
+     * @param gridName Grid name.
+     * @return Ignite configuration.
+     * @throws Exception If failed.
+     */
+    protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setCacheConfiguration(cacheConfiguration(META_CACHE_NAME), cacheConfiguration(DATA_CACHE_NAME),
+            cacheConfiguration(CACHE_NAME));
+
+        if (!gridName.equals(getTestGridName(0)))
+            cfg.setClientMode(true);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(disco);
+
+        FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
+
+        igfsCfg.setMetaCacheName(META_CACHE_NAME);
+        igfsCfg.setDataCacheName(DATA_CACHE_NAME);
+        igfsCfg.setName("igfs");
+
+        cfg.setFileSystemConfiguration(igfsCfg);
+
+        return cfg;
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @return Cache configuration.
+     */
+    protected CacheConfiguration cacheConfiguration(String cacheName) {
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setName(cacheName);
+
+        if (META_CACHE_NAME.equals(cacheName))
+            cacheCfg.setCacheMode(REPLICATED);
+        else {
+            cacheCfg.setCacheMode(PARTITIONED);
+            cacheCfg.setNearConfiguration(null);
+
+            cacheCfg.setBackups(0);
+            cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(128));
+        }
+
+        cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+        return cacheCfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a452dac2/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsStreamsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsStreamsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsStreamsSelfTest.java
index 57b1010..d377560 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsStreamsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsStreamsSelfTest.java
@@ -154,7 +154,7 @@ public class IgfsStreamsSelfTest extends IgfsCommonAbstractTest {
      */
     public void testConfiguration() throws IgniteCheckedException {
         IgniteInternalCache metaCache = getFieldValue(fs, "meta", "metaCache");
-        GridCacheAdapter dataCache = getFieldValue(fs, "data", "dataCache");
+        IgniteInternalCache dataCache = getFieldValue(fs, "data", "dataCache");
 
         assertNotNull(metaCache);
         assertEquals(META_CACHE_NAME, metaCache.name());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a452dac2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java
index 11e22b1..4f3178e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java
@@ -59,6 +59,8 @@ public class IgniteIgfsTestSuite extends TestSuite {
         suite.addTest(new TestSuite(IgfsDualSyncSelfTest.class));
         suite.addTest(new TestSuite(IgfsDualAsyncSelfTest.class));
 
+        suite.addTest(new TestSuite(IgfsClientCacheSelfTest.class));
+
         suite.addTest(new TestSuite(IgfsModeResolverSelfTest.class));
 
         suite.addTestSuite(IgfsFragmentizerSelfTest.class);


[17/19] incubator-ignite git commit: ignite-862: Fixed.

Posted by yz...@apache.org.
ignite-862: Fixed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d4908f24
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d4908f24
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d4908f24

Branch: refs/heads/ignite-855
Commit: d4908f2449a4fde9298f6ca11590e0a94a94c955
Parents: 7be25bd
Author: Artem Shutak <as...@gridgain.com>
Authored: Wed May 6 12:58:08 2015 +0300
Committer: Artem Shutak <as...@gridgain.com>
Committed: Wed May 6 12:58:08 2015 +0300

----------------------------------------------------------------------
 .../cache/DynamicCacheDescriptor.java           | 16 ++++++++++-
 .../processors/cache/GridCacheProcessor.java    | 30 ++++++--------------
 2 files changed, 24 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d4908f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index d8da9ef..6f6f422 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.plugin.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
@@ -56,13 +58,17 @@ public class DynamicCacheDescriptor {
     /** Template configuration flag. */
     private boolean template;
 
+    /** Cache plugin manager. */
+    private final CachePluginManager pluginMgr;
+
     /**
      * @param cacheCfg Cache configuration.
      * @param cacheType Cache type.
      * @param template {@code True} if this is template configuration.
      * @param deploymentId Deployment ID.
      */
-    public DynamicCacheDescriptor(CacheConfiguration cacheCfg,
+    public DynamicCacheDescriptor(GridKernalContext ctx,
+        CacheConfiguration cacheCfg,
         CacheType cacheType,
         boolean template,
         IgniteUuid deploymentId) {
@@ -70,6 +76,7 @@ public class DynamicCacheDescriptor {
         this.cacheType = cacheType;
         this.template = template;
         this.deploymentId = deploymentId;
+        pluginMgr = new CachePluginManager(ctx, cacheCfg);
     }
 
     /**
@@ -149,6 +156,13 @@ public class DynamicCacheDescriptor {
     }
 
     /**
+     * @return Cache plugin manager.
+     */
+    public CachePluginManager pluginManager() {
+        return pluginMgr;
+    }
+
+    /**
      * Sets cancelled flag.
      */
     public void onCancelled() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d4908f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index d22d224..2b9a821 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -610,7 +610,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             boolean template = cfg.getName() != null && cfg.getName().endsWith("*");
 
-            DynamicCacheDescriptor desc = new DynamicCacheDescriptor(cfg, cacheType, template, IgniteUuid.randomUuid());
+            DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx, cfg, cacheType, template,
+                IgniteUuid.randomUuid());
 
             desc.locallyConfigured(true);
             desc.staticallyConfigured(true);
@@ -638,7 +639,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             if (cfg.getName() == null) { // Use cache configuration with null name as template.
                 DynamicCacheDescriptor desc0 =
-                    new DynamicCacheDescriptor(cfg, cacheType, true, IgniteUuid.randomUuid());
+                    new DynamicCacheDescriptor(ctx, cfg, cacheType, true, IgniteUuid.randomUuid());
 
                 desc0.locallyConfigured(true);
                 desc0.staticallyConfigured(true);
@@ -666,17 +667,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             ClusterNode locNode = ctx.discovery().localNode();
 
-            // Init cache plugin managers.
-            final Map<String, CachePluginManager> cache2PluginMgr = new HashMap<>();
-
-            for (DynamicCacheDescriptor desc : registeredCaches.values()) {
-                CacheConfiguration locCcfg = desc.cacheConfiguration();
-
-                CachePluginManager pluginMgr = new CachePluginManager(ctx, locCcfg);
-
-                cache2PluginMgr.put(locCcfg.getName(), pluginMgr);
-            }
-
             if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
                 for (ClusterNode n : ctx.discovery().remoteNodes()) {
                     checkTransactionConfiguration(n);
@@ -696,9 +686,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                             checkCache(locCfg, rmtCfg, n);
 
                             // Check plugin cache configurations.
-                            CachePluginManager pluginMgr = cache2PluginMgr.get(locCfg.getName());
-
-                            assert pluginMgr != null : " Map=" + cache2PluginMgr;
+                            CachePluginManager pluginMgr = desc.pluginManager();
 
                             pluginMgr.validateRemotes(rmtCfg, n);
                         }
@@ -721,9 +709,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 if (filter.apply(locNode)) {
                     CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
 
-                    CachePluginManager pluginMgr = cache2PluginMgr.get(ccfg.getName());
-
-                    assert pluginMgr != null : " Map=" + cache2PluginMgr;
+                    CachePluginManager pluginMgr = desc.pluginManager();
 
                     GridCacheContext ctx = createCache(ccfg, pluginMgr, desc.cacheType(), cacheObjCtx);
 
@@ -1657,6 +1643,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                     if (existing == null) {
                         DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
+                            ctx,
                             ccfg,
                             req.cacheType(),
                             true,
@@ -1690,6 +1677,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                         assert req.cacheType() != null : req;
 
                         DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
+                            ctx,
                             ccfg,
                             req.cacheType(),
                             false,
@@ -2039,7 +2027,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                 if (desc == null) {
                     DynamicCacheDescriptor templateDesc =
-                        new DynamicCacheDescriptor(ccfg, req.cacheType(), true, req.deploymentId());
+                        new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), true, req.deploymentId());
 
                     DynamicCacheDescriptor old = registeredTemplates.put(maskNull(ccfg.getName()), templateDesc);
 
@@ -2093,7 +2081,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                     assert req.cacheType() != null : req;
 
                     DynamicCacheDescriptor startDesc =
-                        new DynamicCacheDescriptor(ccfg, req.cacheType(), false, req.deploymentId());
+                        new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), false, req.deploymentId());
 
                     DynamicCacheDescriptor old = registeredCaches.put(maskNull(ccfg.getName()), startDesc);
 


[10/19] incubator-ignite git commit: # ignite-157-2

Posted by yz...@apache.org.
# ignite-157-2


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/42563f6c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/42563f6c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/42563f6c

Branch: refs/heads/ignite-855
Commit: 42563f6c1b0f57e6a087c819df26893df5510ae9
Parents: 3394b4c
Author: sboikov <sb...@gridgain.com>
Authored: Tue May 5 16:06:52 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue May 5 17:53:56 2015 +0300

----------------------------------------------------------------------
 .../internal/processors/cache/transactions/IgniteTxHandler.java | 5 +++++
 .../apache/ignite/testsuites/IgniteCacheRestartTestSuite.java   | 2 +-
 2 files changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/42563f6c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 6843075..2897e30 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -53,6 +53,11 @@ public class IgniteTxHandler {
     /** Shared cache context. */
     private GridCacheSharedContext<?, ?> ctx;
 
+    /**
+     * @param nearNodeId Node ID.
+     * @param req Request.
+     * @return Prepare future.
+     */
     public IgniteInternalFuture<IgniteInternalTx> processNearTxPrepareRequest(final UUID nearNodeId,
         final GridNearTxPrepareRequest req) {
         return prepareTx(nearNodeId, null, req, null);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/42563f6c/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
index b219f7f..d620731 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
@@ -35,8 +35,8 @@ public class IgniteCacheRestartTestSuite extends TestSuite {
 
         suite.addTestSuite(GridCachePartitionedTxSalvageSelfTest.class);
 
-        suite.addTestSuite(GridCachePartitionedNodeRestartTest.class);
         // TODO: IGNITE-157.
+        // suite.addTestSuite(GridCachePartitionedNodeRestartTest.class);
         // suite.addTestSuite(GridCachePartitionedOptimisticTxNodeRestartTest.class);
         // suite.addTestSuite(GridCacheReplicatedNodeRestartSelfTest.class);
 


[08/19] incubator-ignite git commit: # ignite-157-2

Posted by yz...@apache.org.
# ignite-157-2


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/65099366
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/65099366
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/65099366

Branch: refs/heads/ignite-855
Commit: 65099366d767073de9a1e7a5d1b5ed67b4306fe8
Parents: fbf7149
Author: sboikov <sb...@gridgain.com>
Authored: Tue May 5 10:35:20 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue May 5 13:50:49 2015 +0300

----------------------------------------------------------------------
 ...ridCacheOptimisticCheckPreparedTxFuture.java |  14 +-
 .../cache/transactions/IgniteTxManager.java     |  50 +++----
 ...ePrimaryNodeFailureRecoveryAbstractTest.java | 147 ++++++++++++++-----
 .../ignite/testsuites/IgniteCacheTestSuite.java |   3 -
 4 files changed, 150 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/65099366/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
index 3e345f4..bd3e1cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
@@ -163,6 +163,9 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
                 try {
                     cctx.io().send(nearNodeId, req, tx.ioPolicy());
                 }
+                catch (ClusterTopologyCheckedException e) {
+                    fut.onNodeLeft();
+                }
                 catch (IgniteCheckedException e) {
                     fut.onError(e);
                 }
@@ -398,7 +401,7 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
                 cctx.tm().finishOptimisticTxOnRecovery(tx, res);
             }
             else {
-                if (nearTxCheck) {
+                if (err instanceof ClusterTopologyCheckedException && nearTxCheck) {
                     if (log.isDebugEnabled())
                         log.debug("Failed to check transaction on near node, " +
                             "ignoring [err=" + err + ", tx=" + tx + ']');
@@ -480,7 +483,14 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
             if (log.isDebugEnabled())
                 log.debug("Transaction node left grid (will ignore) [fut=" + this + ']');
 
-            onDone(true);
+            if (nearTxCheck) {
+                // Near and originating nodes left, need initiate tx check.
+                cctx.tm().commitIfPrepared(tx);
+
+                onDone(new ClusterTopologyCheckedException("Transaction node left grid (will ignore)."));
+            }
+            else
+                onDone(true);
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/65099366/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 19efc5d..85b3ad0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -2053,6 +2053,31 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * Commits optimistic transaction in case when node started transaction failed, but all related
+     * transactions were prepared (invalidates transaction if it is not fully prepared).
+     *
+     * @param tx Transaction.
+     */
+    public void commitIfPrepared(IgniteInternalTx tx) {
+        assert tx instanceof GridDhtTxLocal || tx instanceof GridDhtTxRemote  : tx;
+        assert !F.isEmpty(tx.transactionNodes()) : tx;
+        assert tx.nearXidVersion() != null : tx;
+
+        GridCacheOptimisticCheckPreparedTxFuture fut = new GridCacheOptimisticCheckPreparedTxFuture<>(
+            cctx,
+            tx,
+            tx.originatingNodeId(),
+            tx.transactionNodes());
+
+        cctx.mvcc().addFuture(fut);
+
+        if (log.isDebugEnabled())
+            log.debug("Checking optimistic transaction state on remote nodes [tx=" + tx + ", fut=" + fut + ']');
+
+        fut.prepare();
+    }
+
+    /**
      * Timeout object for node failure handler.
      */
     private final class NodeFailureTimeoutObject extends GridTimeoutObjectAdapter {
@@ -2122,31 +2147,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                 cctx.kernalContext().gateway().readUnlock();
             }
         }
-
-        /**
-         * Commits optimistic transaction in case when node started transaction failed, but all related
-         * transactions were prepared (invalidates transaction if it is not fully prepared).
-         *
-         * @param tx Transaction.
-         */
-        private void commitIfPrepared(IgniteInternalTx tx) {
-            assert tx instanceof GridDhtTxLocal || tx instanceof GridDhtTxRemote  : tx;
-            assert !F.isEmpty(tx.transactionNodes()) : tx;
-            assert tx.nearXidVersion() != null : tx;
-
-            GridCacheOptimisticCheckPreparedTxFuture fut = new GridCacheOptimisticCheckPreparedTxFuture<>(
-                cctx,
-                tx,
-                evtNodeId,
-                tx.transactionNodes());
-
-            cctx.mvcc().addFuture(fut);
-
-            if (log.isDebugEnabled())
-                log.debug("Checking optimistic transaction state on remote nodes [tx=" + tx + ", fut=" + fut + ']');
-
-            fut.prepare();
-        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/65099366/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
index 7a393d8..ee2f16b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
@@ -149,7 +149,7 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
     }
 
     /**
-     * @param locBackupKey If {@code true} uses recovery for local backup key.
+     * @param locBackupKey If {@code true} uses one key which is backup for originating node.
      * @param rollback If {@code true} tests rollback after primary node failure.
      * @param optimistic If {@code true} tests optimistic transaction.
      * @throws Exception If failed.
@@ -177,6 +177,9 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
         final Integer key1 = key0;
         final Integer key2 = primaryKey(cache2);
 
+        final Collection<ClusterNode> key1Nodes = aff.mapKeyToPrimaryAndBackups(key1);
+        final Collection<ClusterNode> key2Nodes = aff.mapKeyToPrimaryAndBackups(key2);
+
         TestCommunicationSpi commSpi = (TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi();
 
         IgniteTransactions txs = ignite(0).transactions();
@@ -225,8 +228,8 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
         GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
                 try {
-                    checkKey(key1, rollback);
-                    checkKey(key2, rollback);
+                    checkKey(key1, rollback ? null : key1Nodes);
+                    checkKey(key2, rollback ? null : key2Nodes);
 
                     return true;
                 }
@@ -238,51 +241,105 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
             }
         }, 5000);
 
-        checkKey(key1, rollback);
-        checkKey(key2, rollback);
+        checkKey(key1, rollback ? null : key1Nodes);
+        checkKey(key2, rollback ? null : key2Nodes);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOptimisticPrimaryAndOriginatingNodeFailureRecovery1() throws Exception {
+        primaryAndOriginatingNodeFailure(false, false, true);
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testOptimisticPrimaryAndOriginatingNodeFailureRecovery() throws Exception {
-        primaryAndOriginatingNodeFailure(false, true);
+    public void testOptimisticPrimaryAndOriginatingNodeFailureRecovery2() throws Exception {
+        primaryAndOriginatingNodeFailure(true, false, true);
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testOptimisticPrimaryAndOriginatingNodeFailureRollback() throws Exception {
-        primaryAndOriginatingNodeFailure(true, true);
+    public void testOptimisticPrimaryAndOriginatingNodeFailureRollback1() throws Exception {
+        primaryAndOriginatingNodeFailure(false, true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testPessimisticPrimaryAndOriginatingNodeFailureRecovery() throws Exception {
-        primaryAndOriginatingNodeFailure(false, false);
+    public void testOptimisticPrimaryAndOriginatingNodeFailureRollback2() throws Exception {
+        primaryAndOriginatingNodeFailure(true, true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testPessimisticPrimaryAndOriginatingNodeFailureRollback() throws Exception {
-        primaryAndOriginatingNodeFailure(true, false);
+    public void testPessimisticPrimaryAndOriginatingNodeFailureRecovery1() throws Exception {
+        primaryAndOriginatingNodeFailure(false, false, false);
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticPrimaryAndOriginatingNodeFailureRecovery2() throws Exception {
+        primaryAndOriginatingNodeFailure(true, false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticPrimaryAndOriginatingNodeFailureRollback1() throws Exception {
+        primaryAndOriginatingNodeFailure(false, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticPrimaryAndOriginatingNodeFailureRollback2() throws Exception {
+        primaryAndOriginatingNodeFailure(true, true, false);
+    }
+
+    /**
+     * @param locBackupKey If {@code true} uses one key which is backup for originating node.
      * @param rollback If {@code true} tests rollback after primary node failure.
      * @param optimistic If {@code true} tests optimistic transaction.
      * @throws Exception If failed.
      */
-    private void primaryAndOriginatingNodeFailure(final boolean rollback, boolean optimistic) throws Exception {
+    private void primaryAndOriginatingNodeFailure(final boolean locBackupKey,
+        final boolean rollback,
+        boolean optimistic)
+        throws Exception
+    {
         IgniteCache<Integer, Integer> cache0 = jcache(0);
-        IgniteCache<Integer, Integer> cache1 = jcache(1);
         IgniteCache<Integer, Integer> cache2 = jcache(2);
 
-        final Integer key1 = primaryKey(cache1);
+        Affinity<Integer> aff = ignite(0).affinity(null);
+
+        Integer key0 = null;
+
+        for (int key = 0; key < 10_000; key++) {
+            if (aff.isPrimary(ignite(1).cluster().localNode(), key)) {
+                if (locBackupKey == aff.isBackup(ignite(0).cluster().localNode(), key)) {
+                    key0 = key;
+
+                    break;
+                }
+            }
+        }
+
+        assertNotNull(key0);
+
+        final Integer key1 = key0;
         final Integer key2 = primaryKey(cache2);
 
+        int backups = cache0.getConfiguration(CacheConfiguration.class).getBackups();
+
+        final Collection<ClusterNode> key1Nodes =
+            (locBackupKey && backups < 2) ? null : aff.mapKeyToPrimaryAndBackups(key1);
+        final Collection<ClusterNode> key2Nodes = aff.mapKeyToPrimaryAndBackups(key2);
+
         TestCommunicationSpi commSpi = (TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi();
 
         IgniteTransactions txs = ignite(0).transactions();
@@ -326,12 +383,11 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
         GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
                 try {
-                    checkKey(key1, rollback);
-                    checkKey(key2, rollback);
+                    checkKey(key1, rollback ? null : key1Nodes);
+                    checkKey(key2, rollback ? null : key2Nodes);
 
                     return true;
-                }
-                catch (AssertionError e) {
+                } catch (AssertionError e) {
                     log.info("Check failed: " + e);
 
                     return false;
@@ -339,30 +395,53 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
             }
         }, 5000);
 
-        checkKey(key1, rollback);
-        checkKey(key2, rollback);
+        checkKey(key1, rollback ? null : key1Nodes);
+        checkKey(key2, rollback ? null : key2Nodes);
     }
 
     /**
      * @param key Key.
-     * @param expNull {@code True} if {@code null} value is expected.
+     * @param keyNodes Key nodes.
      */
-    private void checkKey(Integer key, boolean expNull) {
-        Affinity<Integer> aff = ignite(2).affinity(null);
+    private void checkKey(Integer key, Collection<ClusterNode> keyNodes) {
+        if (keyNodes == null) {
+            for (Ignite ignite : G.allGrids()) {
+                IgniteCache<Integer, Integer> cache = ignite.cache(null);
+
+                assertNull("Unexpected value for: " + ignite.name(), cache.localPeek(key));
+            }
 
-        Collection<ClusterNode> nodes = aff.mapKeyToPrimaryAndBackups(key);
+            for (Ignite ignite : G.allGrids()) {
+                IgniteCache<Integer, Integer> cache = ignite.cache(null);
 
-        assertFalse(nodes.isEmpty());
+                assertNull("Unexpected value for: " + ignite.name(), cache.get(key));
+            }
+        }
+        else {
+            boolean found = false;
 
-        for (ClusterNode node : nodes) {
-            Ignite ignite = grid(node);
+            for (ClusterNode node : keyNodes) {
+                try {
+                    Ignite ignite = grid(node);
 
-            IgniteCache<Integer, Integer> cache = ignite.cache(null);
+                    found = true;
 
-            if (expNull)
-                assertNull("Unexpected value for: " + ignite.name(), cache.localPeek(key));
-            else
-                assertEquals("Unexpected value for: " + ignite.name(), key, cache.localPeek(key));
+                    IgniteCache<Integer, Integer> cache = ignite.cache(null);
+
+                    assertEquals("Unexpected value for: " + ignite.name(), key, key);
+                }
+                catch (IgniteIllegalStateException ignore) {
+                    // No-op.
+                }
+            }
+
+            assertTrue("Failed to find key node.", found);
+
+            for (Ignite ignite : G.allGrids()) {
+                IgniteCache<Integer, Integer> cache = ignite.cache(null);
+
+                assertEquals("Unexpected value for: " + ignite.name(), key, cache.get(key));
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/65099366/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index bb019ae..28b10d9 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -353,9 +353,6 @@ public class IgniteCacheTestSuite extends TestSuite {
         // Iterators.
         suite.addTest(IgniteCacheIteratorsSelfTestSuite.suite());
 
-        // Add tx recovery test suite.
-        suite.addTest(IgniteCacheTxRecoverySelfTestSuite.suite());
-
         // Cache interceptor tests.
         suite.addTest(IgniteCacheInterceptorSelfTestSuite.suite());
 


[04/19] incubator-ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-sprint-4' into ignite-157-2

Posted by yz...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-4' into ignite-157-2


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4b775f02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4b775f02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4b775f02

Branch: refs/heads/ignite-855
Commit: 4b775f02e2632fac72d44678f5ef13a04d9e1355
Parents: fc54ef7 18b4c39
Author: sboikov <sb...@gridgain.com>
Authored: Wed Apr 29 11:56:18 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Apr 29 11:56:18 2015 +0300

----------------------------------------------------------------------
 RELEASE_NOTES.txt                               |  13 +-
 .../spi/checkpoint/s3/S3CheckpointSpi.java      |   2 +-
 .../clients/src/test/resources/spring-cache.xml |   4 +-
 .../src/test/resources/spring-server-node.xml   |   4 +-
 .../test/resources/spring-server-ssl-node.xml   |   4 +-
 .../java/org/apache/ignite/IgniteLogger.java    |   8 +-
 .../java/org/apache/ignite/IgniteServices.java  |   2 +-
 .../main/java/org/apache/ignite/Ignition.java   |   2 +-
 .../eviction/sorted/SortedEvictionPolicy.java   |   2 +-
 .../configuration/ConnectorConfiguration.java   |   2 +-
 .../configuration/IgniteConfiguration.java      |   2 +-
 .../ignite/internal/GridJobContextImpl.java     |   7 +-
 .../org/apache/ignite/internal/IgnitionEx.java  |   2 +-
 .../internal/cluster/ClusterGroupAdapter.java   |  16 +
 .../managers/communication/GridIoManager.java   |   6 +-
 .../deployment/GridDeploymentManager.java       |   2 +-
 .../GridDeploymentPerVersionStore.java          |   3 +-
 .../cache/GridCacheSharedContext.java           |   2 +-
 .../processors/cache/IgniteCacheProxy.java      |  10 +-
 .../distributed/dht/GridDhtLockRequest.java     |  38 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |  10 +-
 .../dht/preloader/GridDhtPreloader.java         |   2 +-
 .../distributed/near/GridNearLockRequest.java   |  54 +-
 .../ignite/internal/util/GridJavaProcess.java   |   4 +
 .../util/tostring/GridToStringBuilder.java      |   2 +-
 .../ignite/internal/visor/cache/VisorCache.java |  92 ++--
 .../visor/cache/VisorCacheConfiguration.java    |   7 -
 .../visor/cache/VisorCacheNodesTask.java        |  74 +++
 .../cache/VisorCacheStoreConfiguration.java     |  35 --
 .../cache/VisorCacheTypeFieldMetadata.java      |  36 +-
 .../visor/cache/VisorCacheTypeMetadata.java     |  99 +---
 .../internal/visor/igfs/VisorIgfsMetrics.java   | 128 +----
 .../visor/node/VisorBasicConfiguration.java     |  11 +
 .../visor/node/VisorNodeDataCollectorJob.java   |   2 +-
 .../node/VisorNodeEventsCollectorTask.java      |  58 +-
 .../internal/visor/query/VisorQueryArg.java     |  19 +-
 .../internal/visor/query/VisorQueryCursor.java  |   1 -
 .../internal/visor/query/VisorQueryJob.java     |   9 +-
 .../internal/visor/query/VisorQueryTask.java    |  41 --
 .../internal/visor/util/VisorEventMapper.java   |  13 +
 .../internal/visor/util/VisorTaskUtils.java     |   2 +-
 .../apache/ignite/logger/java/JavaLogger.java   |  12 +-
 .../apache/ignite/marshaller/Marshaller.java    |  14 +-
 .../ignite/marshaller/jdk/JdkMarshaller.java    |  10 +-
 .../optimized/OptimizedMarshaller.java          |   8 +-
 .../apache/ignite/resources/LoggerResource.java |   2 +-
 .../apache/ignite/resources/SpringResource.java |   2 +-
 .../org/apache/ignite/services/Service.java     |   2 +-
 .../ignite/services/ServiceConfiguration.java   |   2 +-
 .../checkpoint/cache/CacheCheckpointSpi.java    |   2 +-
 .../spi/checkpoint/jdbc/JdbcCheckpointSpi.java  |   2 +-
 .../sharedfs/SharedFsCheckpointSpi.java         |   4 +-
 .../fifoqueue/FifoQueueCollisionSpi.java        |  10 +-
 .../jobstealing/JobStealingCollisionSpi.java    |  14 +-
 .../PriorityQueueCollisionSpi.java              |   6 +-
 .../communication/tcp/TcpCommunicationSpi.java  |  12 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  10 +-
 .../memory/MemoryEventStorageSpi.java           |  10 +-
 .../spi/failover/always/AlwaysFailoverSpi.java  |  10 +-
 .../jobstealing/JobStealingFailoverSpi.java     |   6 +-
 .../spi/failover/never/NeverFailoverSpi.java    |   8 +-
 .../apache/ignite/spi/indexing/IndexingSpi.java |   4 +-
 .../adaptive/AdaptiveLoadBalancingSpi.java      |  12 +-
 .../roundrobin/RoundRobinLoadBalancingSpi.java  |  10 +-
 .../WeightedRandomLoadBalancingSpi.java         |  10 +-
 .../spi/swapspace/file/FileSwapSpaceSpi.java    |  10 +-
 .../src/test/config/load/merge-sort-base.xml    |   2 +-
 .../internal/GridDiscoveryEventSelfTest.java    |  30 +-
 ...ProjectionForCachesOnDaemonNodeSelfTest.java | 147 +++++
 .../distributed/GridCacheLockAbstractTest.java  |  75 +++
 .../DataStreamerMultiThreadedSelfTest.java      |   2 +
 .../logger/java/IgniteJavaLoggerTest.java       |  65 ---
 .../ignite/logger/java/JavaLoggerTest.java      |  65 +++
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java |   5 +-
 .../junits/logger/GridTestLog4jLogger.java      |  10 +-
 .../ignite/testsuites/IgniteBasicTestSuite.java |   2 +
 .../testsuites/IgniteLoggingSelfTestSuite.java  |   2 +-
 .../processors/hadoop/HadoopProcessor.java      |   2 +-
 .../ignite/logger/jcl/IgniteJclLogger.java      | 167 ------
 .../org/apache/ignite/logger/jcl/JclLogger.java | 167 ++++++
 .../ignite/logger/jcl/IgniteJclLoggerTest.java  |  48 --
 .../apache/ignite/logger/jcl/JclLoggerTest.java |  48 ++
 .../ignite/testsuites/IgniteJclTestSuite.java   |   2 +-
 .../apache/ignite/logger/log4j/Log4JLogger.java |   8 +-
 .../ignite/logger/slf4j/GridSlf4jLogger.java    | 138 -----
 .../apache/ignite/logger/slf4j/Slf4jLogger.java | 138 +++++
 .../spi/deployment/uri/UriDeploymentSpi.java    |   2 +-
 .../ignite/visor/commands/VisorConsole.scala    | 314 ++++++-----
 .../visor/commands/VisorConsoleCommand.scala    |  77 ---
 .../ignite/visor/commands/VisorTextTable.scala  | 539 ------------------
 .../visor/commands/ack/VisorAckCommand.scala    |  42 +-
 .../commands/alert/VisorAlertCommand.scala      |  35 +-
 .../commands/cache/VisorCacheClearCommand.scala |  51 +-
 .../commands/cache/VisorCacheCommand.scala      |  34 +-
 .../commands/cache/VisorCacheScanCommand.scala  |  60 +-
 .../commands/cache/VisorCacheStopCommand.scala  |  30 +-
 .../commands/cache/VisorCacheSwapCommand.scala  |  66 +--
 .../commands/common/VisorConsoleCommand.scala   |  90 +++
 .../visor/commands/common/VisorTextTable.scala  | 543 +++++++++++++++++++
 .../config/VisorConfigurationCommand.scala      | 438 +++++++--------
 .../commands/deploy/VisorDeployCommand.scala    |  47 +-
 .../commands/disco/VisorDiscoveryCommand.scala  |  58 +-
 .../commands/events/VisorEventsCommand.scala    | 338 +++++-------
 .../visor/commands/gc/VisorGcCommand.scala      | 130 ++---
 .../visor/commands/kill/VisorKillCommand.scala  |  53 +-
 .../visor/commands/node/VisorNodeCommand.scala  |  47 +-
 .../visor/commands/ping/VisorPingCommand.scala  |  41 +-
 .../commands/start/VisorStartCommand.scala      |  34 +-
 .../commands/tasks/VisorTasksCommand.scala      |  76 +--
 .../commands/top/VisorTopologyCommand.scala     |  36 +-
 .../visor/commands/vvm/VisorVvmCommand.scala    |  32 +-
 .../scala/org/apache/ignite/visor/visor.scala   | 286 +++++++---
 .../ignite/visor/VisorTextTableSpec.scala       |   3 +-
 pom.xml                                         |   5 +-
 114 files changed, 2777 insertions(+), 2875 deletions(-)
----------------------------------------------------------------------



[06/19] incubator-ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-sprint-4' into ignite-157-2

Posted by yz...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-4' into ignite-157-2


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/45199ebf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/45199ebf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/45199ebf

Branch: refs/heads/ignite-855
Commit: 45199ebfc89763283226ea2824f877b1d043d36e
Parents: 5daaa27 47136b5
Author: sboikov <sb...@gridgain.com>
Authored: Wed Apr 29 17:54:22 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Apr 29 17:54:22 2015 +0300

----------------------------------------------------------------------
 modules/cloud/README.txt                        |  32 ++
 modules/cloud/licenses/apache-2.0.txt           | 202 +++++++++
 modules/cloud/pom.xml                           | 106 +++++
 .../cloud/TcpDiscoveryCloudIpFinder.java        | 433 +++++++++++++++++++
 .../tcp/ipfinder/cloud/package-info.java        |  21 +
 .../TcpDiscoveryCloudIpFinderSelfTest.java      | 124 ++++++
 .../tcp/ipfinder/cloud/package-info.java        |  22 +
 .../ignite/testsuites/IgniteCloudTestSuite.java | 112 +++++
 .../ignite/codegen/MessageCodeGenerator.java    |  26 +-
 .../configuration/CacheConfiguration.java       |   4 +-
 .../ignite/internal/GridDirectCollection.java   |   3 +
 .../ignite/internal/IgniteComponentType.java    |  36 +-
 .../managers/communication/GridIoManager.java   |  22 +-
 .../communication/GridIoMessageFactory.java     |   8 +
 .../managers/indexing/GridIndexingManager.java  |  14 +-
 .../processors/cache/CacheObjectImpl.java       |   2 +-
 .../processors/cache/GridCacheMapEntry.java     |   9 +-
 .../processors/cache/GridCacheProcessor.java    |   3 +-
 .../processors/cache/GridCacheSwapManager.java  |  35 +-
 .../cache/query/GridCacheQueryManager.java      |  22 +-
 .../cache/query/GridCacheSqlQuery.java          | 135 +++++-
 .../cache/query/GridCacheTwoStepQuery.java      |   8 +-
 .../processors/query/GridQueryIndexing.java     |  19 +-
 .../processors/query/GridQueryProcessor.java    |  78 +++-
 .../messages/GridQueryNextPageResponse.java     |  68 ++-
 .../h2/twostep/messages/GridQueryRequest.java   |  21 +-
 .../util/spring/IgniteSpringHelper.java         |   4 +-
 .../internal/visor/cache/VisorCacheMetrics.java |  53 +--
 .../cache/VisorCacheNearConfiguration.java      |   4 +-
 .../visor/cache/VisorCacheStartTask.java        | 155 +++++++
 .../internal/visor/util/VisorTaskUtils.java     |  10 +
 .../discovery/tcp/TcpDiscoverySpiAdapter.java   |  10 +-
 .../resources/META-INF/classnames.properties    |  13 +
 .../core/src/main/resources/ignite.properties   |   2 +-
 .../ignite/testsuites/IgniteCacheTestSuite.java |   2 +-
 modules/gce/README.txt                          |  32 ++
 modules/gce/licenses/apache-2.0.txt             | 202 +++++++++
 modules/gce/pom.xml                             |  92 ++++
 .../gce/TcpDiscoveryGoogleStorageIpFinder.java  | 380 ++++++++++++++++
 .../tcp/ipfinder/gce/package-info.java          |  22 +
 ...pDiscoveryGoogleStorageIpFinderSelfTest.java |  73 ++++
 .../tcp/ipfinder/gce/package-info.java          |  22 +
 .../ignite/testsuites/IgniteGCETestSuite.java   |  71 +++
 .../processors/query/h2/IgniteH2Indexing.java   | 169 +++++++-
 .../query/h2/opt/GridH2AbstractKeyValueRow.java |  92 +---
 .../query/h2/opt/GridH2KeyValueRowOffheap.java  |   7 +-
 .../query/h2/opt/GridH2KeyValueRowOnheap.java   |   6 +-
 .../query/h2/opt/GridH2RowDescriptor.java       |  14 +-
 .../processors/query/h2/opt/GridH2Table.java    |  10 +-
 .../query/h2/opt/GridH2ValueCacheObject.java    | 191 ++++++++
 .../query/h2/opt/GridLuceneIndex.java           |  84 ++--
 .../query/h2/twostep/GridMapQueryExecutor.java  |  21 +-
 .../query/h2/twostep/GridMergeIndex.java        |   6 +-
 .../h2/twostep/GridMergeIndexUnsorted.java      |   4 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |  13 +-
 .../query/h2/twostep/GridResultPage.java        |  80 +++-
 .../query/h2/twostep/msg/GridH2Array.java       | 124 ++++++
 .../query/h2/twostep/msg/GridH2Boolean.java     | 112 +++++
 .../query/h2/twostep/msg/GridH2Byte.java        | 113 +++++
 .../query/h2/twostep/msg/GridH2Bytes.java       | 113 +++++
 .../query/h2/twostep/msg/GridH2CacheObject.java | 148 +++++++
 .../query/h2/twostep/msg/GridH2Date.java        | 115 +++++
 .../query/h2/twostep/msg/GridH2Decimal.java     | 134 ++++++
 .../query/h2/twostep/msg/GridH2Double.java      | 113 +++++
 .../query/h2/twostep/msg/GridH2Float.java       | 113 +++++
 .../query/h2/twostep/msg/GridH2Geometry.java    | 134 ++++++
 .../query/h2/twostep/msg/GridH2Integer.java     | 113 +++++
 .../query/h2/twostep/msg/GridH2JavaObject.java  | 113 +++++
 .../query/h2/twostep/msg/GridH2Long.java        | 113 +++++
 .../query/h2/twostep/msg/GridH2Null.java        |  78 ++++
 .../query/h2/twostep/msg/GridH2Short.java       | 113 +++++
 .../query/h2/twostep/msg/GridH2String.java      | 115 +++++
 .../query/h2/twostep/msg/GridH2Time.java        | 116 +++++
 .../query/h2/twostep/msg/GridH2Timestamp.java   | 133 ++++++
 .../query/h2/twostep/msg/GridH2Uuid.java        | 133 ++++++
 .../h2/twostep/msg/GridH2ValueMessage.java      |  49 +++
 .../twostep/msg/GridH2ValueMessageFactory.java  | 201 +++++++++
 .../IgniteCacheQueryMultiThreadedSelfTest.java  |   6 +-
 .../h2/GridIndexingSpiAbstractSelfTest.java     | 130 ++++--
 .../util/spring/IgniteSpringHelperImpl.java     |   2 +-
 ...gniteProjectionStartStopRestartSelfTest.java |  26 +-
 .../commands/cache/VisorCacheCommand.scala      |   2 +-
 pom.xml                                         |   5 +-
 83 files changed, 5687 insertions(+), 389 deletions(-)
----------------------------------------------------------------------



[11/19] incubator-ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-sprint-4' into ignite-157-2

Posted by yz...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-4' into ignite-157-2


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b141abfd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b141abfd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b141abfd

Branch: refs/heads/ignite-855
Commit: b141abfd5c4c4219d17e6a1cc1a7a1677b06b3c0
Parents: 42563f6 07a4258
Author: sboikov <sb...@gridgain.com>
Authored: Wed May 6 09:12:56 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed May 6 09:12:56 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCachePartitionExchangeManager.java       | 3 +++
 .../cache/distributed/dht/GridCacheDhtPreloadSelfTest.java        | 2 +-
 2 files changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------



[09/19] incubator-ignite git commit: # ignite-157-2

Posted by yz...@apache.org.
# ignite-157-2


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3394b4c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3394b4c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3394b4c5

Branch: refs/heads/ignite-855
Commit: 3394b4c5aa8c557b94c8726ad5f27a01f2d76d32
Parents: 6509936
Author: sboikov <sb...@gridgain.com>
Authored: Tue May 5 15:16:09 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue May 5 15:41:34 2015 +0300

----------------------------------------------------------------------
 .../GridCacheAbstractNodeRestartSelfTest.java   | 94 +++++++++++---------
 .../GridCachePartitionedNodeRestartTest.java    |  4 +-
 ...ePartitionedOptimisticTxNodeRestartTest.java |  4 +-
 .../GridCacheReplicatedNodeRestartSelfTest.java |  2 +
 .../testsuites/IgniteCacheRestartTestSuite.java |  6 +-
 5 files changed, 58 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3394b4c5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
index 17d6e42..85e2c7c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
@@ -77,10 +77,10 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
     private static volatile int idx = -1;
 
     /** Preload mode. */
-    protected CacheRebalanceMode preloadMode = ASYNC;
+    protected CacheRebalanceMode rebalancMode = ASYNC;
 
     /** */
-    protected int preloadBatchSize = DFLT_BATCH_SIZE;
+    protected int rebalancBatchSize = DFLT_BATCH_SIZE;
 
     /** Number of partitions. */
     protected int partitions = DFLT_PARTITIONS;
@@ -124,8 +124,8 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
     @Override protected void beforeTest() throws Exception {
         backups = DFLT_BACKUPS;
         partitions = DFLT_PARTITIONS;
-        preloadMode = ASYNC;
-        preloadBatchSize = DFLT_BATCH_SIZE;
+        rebalancMode = ASYNC;
+        rebalancBatchSize = DFLT_BATCH_SIZE;
         nodeCnt = DFLT_NODE_CNT;
         keyCnt = DFLT_KEY_CNT;
         retries = DFLT_RETRIES;
@@ -160,7 +160,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
      * @throws Exception If failed.
      */
     public void testRestart() throws Exception {
-        preloadMode = SYNC;
+        rebalancMode = SYNC;
         partitions = 3;
         nodeCnt = 2;
         keyCnt = 10;
@@ -240,7 +240,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
         nodeCnt = 2;
         keyCnt = 10;
         partitions = 29;
-        preloadMode = ASYNC;
+        rebalancMode = ASYNC;
 
         long duration = 30000;
 
@@ -255,7 +255,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
         nodeCnt = 2;
         keyCnt = 10;
         partitions = 29;
-        preloadMode = ASYNC;
+        rebalancMode = ASYNC;
 
         long duration = 30000;
 
@@ -270,7 +270,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
         nodeCnt = 2;
         keyCnt = 10;
         partitions = 29;
-        preloadMode = ASYNC;
+        rebalancMode = ASYNC;
 
         long duration = 30000;
 
@@ -285,7 +285,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
         nodeCnt = 2;
         keyCnt = 10;
         partitions = 29;
-        preloadMode = ASYNC;
+        rebalancMode = ASYNC;
 
         long duration = 30000;
 
@@ -300,7 +300,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
         nodeCnt = 4;
         keyCnt = 10;
         partitions = 29;
-        preloadMode = ASYNC;
+        rebalancMode = ASYNC;
 
         long duration = 60000;
 
@@ -315,7 +315,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
         nodeCnt = 4;
         keyCnt = 10;
         partitions = 29;
-        preloadMode = ASYNC;
+        rebalancMode = ASYNC;
 
         long duration = 60000;
 
@@ -330,7 +330,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
         nodeCnt = 4;
         keyCnt = 10;
         partitions = 29;
-        preloadMode = ASYNC;
+        rebalancMode = ASYNC;
 
         long duration = 60000;
 
@@ -345,7 +345,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
         nodeCnt = 4;
         keyCnt = 10;
         partitions = 29;
-        preloadMode = ASYNC;
+        rebalancMode = ASYNC;
 
         long duration = 60000;
 
@@ -360,7 +360,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
         nodeCnt = 6;
         keyCnt = 10;
         partitions = 29;
-        preloadMode = ASYNC;
+        rebalancMode = ASYNC;
 
         long duration = 90000;
 
@@ -375,7 +375,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
         nodeCnt = 6;
         keyCnt = 10;
         partitions = 29;
-        preloadMode = ASYNC;
+        rebalancMode = ASYNC;
 
         long duration = 90000;
 
@@ -390,7 +390,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
         nodeCnt = 8;
         keyCnt = 10;
         partitions = 29;
-        preloadMode = ASYNC;
+        rebalancMode = ASYNC;
 
         long duration = 90000;
 
@@ -405,7 +405,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
         nodeCnt = 8;
         keyCnt = 10;
         partitions = 29;
-        preloadMode = ASYNC;
+        rebalancMode = ASYNC;
 
         long duration = 90000;
 
@@ -420,7 +420,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
         nodeCnt = 10;
         keyCnt = 10;
         partitions = 29;
-        preloadMode = ASYNC;
+        rebalancMode = ASYNC;
 
         long duration = 90000;
 
@@ -435,7 +435,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
         nodeCnt = 10;
         keyCnt = 10;
         partitions = 29;
-        preloadMode = ASYNC;
+        rebalancMode = ASYNC;
 
         long duration = 90000;
 
@@ -450,7 +450,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
         nodeCnt = 10;
         keyCnt = 10;
         partitions = 29;
-        preloadMode = ASYNC;
+        rebalancMode = ASYNC;
 
         long duration = 90000;
 
@@ -465,7 +465,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
         nodeCnt = 4;
         keyCnt = 10;
         partitions = 29;
-        preloadMode = ASYNC;
+        rebalancMode = ASYNC;
 
         long duration = 90000;
 
@@ -525,7 +525,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
                         catch (Exception e) {
                             err.compareAndSet(null, e);
 
-                            error("Failed to put value in cache.", e);
+                            error("Unexpected exception in put-worker.", e);
                         }
                     }
                 }, "put-worker-" + i);
@@ -565,7 +565,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
                         catch (Exception e) {
                             err.compareAndSet(null, e);
 
-                            error("Failed to restart grid node.", e);
+                            error("Unexpected exception in restart-worker.", e);
                         }
                     }
                 }, "restart-worker-" + i);
@@ -643,12 +643,16 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
                                 int c = 0;
 
                                 try {
-                                    try (Transaction tx = ignite.transactions().txStart(txConcurrency(), REPEATABLE_READ)) {
+                                    IgniteTransactions txs = ignite.transactions();
+
+                                    try (Transaction tx = txs.txStart(txConcurrency(), REPEATABLE_READ)) {
                                         c = txCntr.incrementAndGet();
 
-                                        if (c % logFreq == 0)
-                                            info(">>> Tx iteration started [cnt=" + c + ", keys=" + keys + ", " +
-                                                "locNodeId=" + locNodeId + ']');
+                                        if (c % logFreq == 0) {
+                                            info(">>> Tx iteration started [cnt=" + c +
+                                                ", keys=" + keys +
+                                                ", locNodeId=" + locNodeId + ']');
+                                        }
 
                                         for (int key : keys) {
                                             int op = cacheOp();
@@ -664,17 +668,15 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
                                         tx.commit();
                                     }
                                 }
-                                catch (ClusterTopologyException | CacheException e) {
-                                    if (e instanceof CacheException
-                                        && !(e.getCause() instanceof ClusterTopologyException))
-                                        throw e;
-
+                                catch (ClusterTopologyException | CacheException ignored) {
                                     // It is ok if primary node leaves grid.
                                 }
 
-                                if (c % logFreq == 0)
-                                    info(">>> Tx iteration finished [cnt=" + c + ", keys=" + keys + ", " +
-                                        "locNodeId=" + locNodeId + ']');
+                                if (c % logFreq == 0) {
+                                    info(">>> Tx iteration finished [cnt=" + c +
+                                        ", keys=" + keys +
+                                        ", locNodeId=" + locNodeId + ']');
+                                }
                             }
 
                             info(">>> " + Thread.currentThread().getName() + " finished.");
@@ -682,7 +684,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
                         catch (Exception e) {
                             err.compareAndSet(null, e);
 
-                            error("Failed to put value in cache.", e);
+                            error("Unexpected exception in put-worker.", e);
                         }
                     }
                 }, "put-worker-" + i);
@@ -719,7 +721,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
                         catch (Exception e) {
                             err.compareAndSet(null, e);
 
-                            error("Failed to restart grid node.", e);
+                            error("Unexpected exception in restart-worker.", e);
                         }
                     }
                 }, "restart-worker-" + i);
@@ -812,19 +814,21 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
 
                                     tx.commit();
                                 }
-                                catch (ClusterTopologyException ignored) {
+                                catch (ClusterTopologyException | CacheException ignored) {
                                     // It is ok if primary node leaves grid.
                                 }
 
-                                if (c % logFreq == 0)
-                                    info(">>> Tx iteration finished [cnt=" + c + ", keys=" + keys + ", " +
+                                if (c % logFreq == 0) {
+                                    info(">>> Tx iteration finished [cnt=" + c +
+                                        ", keys=" + keys + ", " +
                                         "locNodeId=" + locNodeId + ']');
+                                }
                             }
                         }
                         catch (Exception e) {
                             err.compareAndSet(null, e);
 
-                            error("Failed to put value in cache.", e);
+                            error("Unexpected exception in put-worker.", e);
                         }
                     }
                 }, "put-worker-" + i);
@@ -859,7 +863,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
                         catch (Exception e) {
                             err.compareAndSet(null, e);
 
-                            error("Failed to restart grid node.", e);
+                            error("Unexpected exception in restart-worker.", e);
                         }
                     }
                 }, "restart-worker-" + i);
@@ -893,10 +897,12 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
      * @param attempt Attempt.
      */
     private void printFailureDetails(IgniteCache<Integer, String> c, int key, int attempt) {
+        Ignite ignite = c.unwrap(Ignite.class);
+
         error("*** Failure details ***");
         error("Key: " + key);
-        error("Partition: " + c.getConfiguration(CacheConfiguration.class).getAffinity().partition(key));
+        error("Partition: " + ignite.affinity(c.getName()).partition(key));
         error("Attempt: " + attempt);
-        error("Node: " + c.unwrap(Ignite.class).cluster().localNode().id());
+        error("Node: " + ignite.cluster().localNode().id());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3394b4c5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNodeRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNodeRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNodeRestartTest.java
index 8a2b19a..2096836 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNodeRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNodeRestartTest.java
@@ -43,8 +43,8 @@ public class GridCachePartitionedNodeRestartTest extends GridCacheAbstractNodeRe
         cc.setWriteSynchronizationMode(FULL_ASYNC);
         cc.setNearConfiguration(null);
         cc.setStartSize(20);
-        cc.setRebalanceMode(preloadMode);
-        cc.setRebalanceBatchSize(preloadBatchSize);
+        cc.setRebalanceMode(rebalancMode);
+        cc.setRebalanceBatchSize(rebalancBatchSize);
         cc.setAffinity(new RendezvousAffinityFunction(false, partitions));
         cc.setBackups(backups);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3394b4c5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOptimisticTxNodeRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOptimisticTxNodeRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOptimisticTxNodeRestartTest.java
index 62dfaa9..82da2ac 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOptimisticTxNodeRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOptimisticTxNodeRestartTest.java
@@ -42,8 +42,8 @@ public class GridCachePartitionedOptimisticTxNodeRestartTest extends GridCacheAb
         cc.setCacheMode(PARTITIONED);
         cc.setWriteSynchronizationMode(FULL_ASYNC);
         cc.setStartSize(20);
-        cc.setRebalanceMode(preloadMode);
-        cc.setRebalanceBatchSize(preloadBatchSize);
+        cc.setRebalanceMode(rebalancMode);
+        cc.setRebalanceBatchSize(rebalancBatchSize);
         cc.setAffinity(new RendezvousAffinityFunction(false, partitions));
         cc.setBackups(backups);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3394b4c5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedNodeRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedNodeRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedNodeRestartSelfTest.java
index aa32559..0023160 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedNodeRestartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedNodeRestartSelfTest.java
@@ -34,6 +34,8 @@ public class GridCacheReplicatedNodeRestartSelfTest extends GridCacheAbstractNod
 
         CacheConfiguration cc = defaultCacheConfiguration();
 
+        cc.setNearConfiguration(null);
+
         cc.setAtomicityMode(atomicityMode());
 
         cc.setName(CACHE_NAME);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3394b4c5/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
index 0ced1c8..b219f7f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
@@ -21,7 +21,6 @@ import junit.framework.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
-import org.apache.ignite.internal.processors.cache.distributed.replicated.*;
 
 /**
  * In-Memory Data Grid stability test suite on changing topology.
@@ -36,10 +35,9 @@ public class IgniteCacheRestartTestSuite extends TestSuite {
 
         suite.addTestSuite(GridCachePartitionedTxSalvageSelfTest.class);
 
-        // TODO: GG-7419: Enable when fixed.
-        // suite.addTestSuite(GridCachePartitionedNodeRestartTest.class);
+        suite.addTestSuite(GridCachePartitionedNodeRestartTest.class);
+        // TODO: IGNITE-157.
         // suite.addTestSuite(GridCachePartitionedOptimisticTxNodeRestartTest.class);
-        // TODO: uncomment when fix GG-1969
         // suite.addTestSuite(GridCacheReplicatedNodeRestartSelfTest.class);
 
         suite.addTestSuite(IgniteCacheAtomicNodeRestartTest.class);


[14/19] incubator-ignite git commit: #gg-10186: NullPointerException at CacheDrStateTransferHandler.java:320 #gg-10187: NullPointerException at GridEntSecurityProcessor.java:263

Posted by yz...@apache.org.
#gg-10186: NullPointerException at CacheDrStateTransferHandler.java:320
#gg-10187: NullPointerException at GridEntSecurityProcessor.java:263


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/587103fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/587103fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/587103fd

Branch: refs/heads/ignite-855
Commit: 587103fdd1273e1d98897a07f98594dac85e38bc
Parents: 99c7e22
Author: ivasilinets <iv...@gridgain.com>
Authored: Wed May 6 12:40:27 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Wed May 6 12:40:27 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheProcessor.java    | 109 ++++++++++--------
 .../processors/cache/CacheGetFromJobTest.java   | 110 +++++++++++++++++++
 .../testsuites/IgniteCacheRestartTestSuite.java |   1 +
 3 files changed, 174 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/587103fd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index c0026ab..d22d224 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -124,6 +124,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** Must use JDK marshaller since it is used by discovery to fire custom events. */
     private Marshaller marshaller = new JdkMarshaller();
 
+    /** Count down latch for caches. */
+    private final CountDownLatch cacheStartedLatch = new CountDownLatch(1);
+
     /**
      * @param ctx Kernal context.
      */
@@ -657,87 +660,92 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public void onKernalStart() throws IgniteCheckedException {
-        if (ctx.config().isDaemon())
-            return;
+        try {
+            if (ctx.config().isDaemon())
+                return;
 
-        ClusterNode locNode = ctx.discovery().localNode();
+            ClusterNode locNode = ctx.discovery().localNode();
 
-        // Init cache plugin managers.
-        final Map<String, CachePluginManager> cache2PluginMgr = new HashMap<>();
+            // Init cache plugin managers.
+            final Map<String, CachePluginManager> cache2PluginMgr = new HashMap<>();
 
-        for (DynamicCacheDescriptor desc : registeredCaches.values()) {
-            CacheConfiguration locCcfg = desc.cacheConfiguration();
+            for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+                CacheConfiguration locCcfg = desc.cacheConfiguration();
 
-            CachePluginManager pluginMgr = new CachePluginManager(ctx, locCcfg);
+                CachePluginManager pluginMgr = new CachePluginManager(ctx, locCcfg);
 
-            cache2PluginMgr.put(locCcfg.getName(), pluginMgr);
-        }
+                cache2PluginMgr.put(locCcfg.getName(), pluginMgr);
+            }
 
-        if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
-            for (ClusterNode n : ctx.discovery().remoteNodes()) {
-                checkTransactionConfiguration(n);
+            if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
+                for (ClusterNode n : ctx.discovery().remoteNodes()) {
+                    checkTransactionConfiguration(n);
 
-                DeploymentMode locDepMode = ctx.config().getDeploymentMode();
-                DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE);
+                    DeploymentMode locDepMode = ctx.config().getDeploymentMode();
+                    DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE);
 
-                CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode",
-                    locDepMode, rmtDepMode, true);
+                    CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode",
+                        locDepMode, rmtDepMode, true);
 
-                for (DynamicCacheDescriptor desc : registeredCaches.values()) {
-                    CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id());
+                    for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+                        CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id());
 
-                    if (rmtCfg != null) {
-                        CacheConfiguration locCfg = desc.cacheConfiguration();
+                        if (rmtCfg != null) {
+                            CacheConfiguration locCfg = desc.cacheConfiguration();
 
-                        checkCache(locCfg, rmtCfg, n);
+                            checkCache(locCfg, rmtCfg, n);
 
-                        // Check plugin cache configurations.
-                        CachePluginManager pluginMgr = cache2PluginMgr.get(locCfg.getName());
+                            // Check plugin cache configurations.
+                            CachePluginManager pluginMgr = cache2PluginMgr.get(locCfg.getName());
 
-                        assert pluginMgr != null : " Map=" + cache2PluginMgr;
+                            assert pluginMgr != null : " Map=" + cache2PluginMgr;
 
-                        pluginMgr.validateRemotes(rmtCfg, n);
+                            pluginMgr.validateRemotes(rmtCfg, n);
+                        }
                     }
                 }
             }
-        }
 
-        // Start dynamic caches received from collect discovery data.
-        for (DynamicCacheDescriptor desc : registeredCaches.values()) {
-            boolean started = desc.onStart();
+            // Start dynamic caches received from collect discovery data.
+            for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+                boolean started = desc.onStart();
 
-            assert started : "Failed to change started flag for locally configured cache: " + desc;
+                assert started : "Failed to change started flag for locally configured cache: " + desc;
 
-            desc.clearRemoteConfigurations();
+                desc.clearRemoteConfigurations();
 
-            CacheConfiguration ccfg = desc.cacheConfiguration();
+                CacheConfiguration ccfg = desc.cacheConfiguration();
 
-            IgnitePredicate filter = ccfg.getNodeFilter();
+                IgnitePredicate filter = ccfg.getNodeFilter();
 
-            if (filter.apply(locNode)) {
-                CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
+                if (filter.apply(locNode)) {
+                    CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
 
-                CachePluginManager pluginMgr = cache2PluginMgr.get(ccfg.getName());
+                    CachePluginManager pluginMgr = cache2PluginMgr.get(ccfg.getName());
 
-                assert pluginMgr != null : " Map=" + cache2PluginMgr;
+                    assert pluginMgr != null : " Map=" + cache2PluginMgr;
 
-                GridCacheContext ctx = createCache(ccfg, pluginMgr, desc.cacheType(), cacheObjCtx);
+                    GridCacheContext ctx = createCache(ccfg, pluginMgr, desc.cacheType(), cacheObjCtx);
 
-                ctx.dynamicDeploymentId(desc.deploymentId());
+                    ctx.dynamicDeploymentId(desc.deploymentId());
 
-                sharedCtx.addCacheContext(ctx);
+                    sharedCtx.addCacheContext(ctx);
 
-                GridCacheAdapter cache = ctx.cache();
+                    GridCacheAdapter cache = ctx.cache();
 
-                String name = ccfg.getName();
+                    String name = ccfg.getName();
 
-                caches.put(maskNull(name), cache);
+                    caches.put(maskNull(name), cache);
 
-                startCache(cache);
+                    startCache(cache);
 
-                jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false));
+                    jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false));
+                }
             }
         }
+        finally {
+            cacheStartedLatch.countDown();
+        }
 
         ctx.marshallerContext().onMarshallerCacheStarted(ctx);
 
@@ -835,6 +843,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public void onKernalStop(boolean cancel) {
+        cacheStartedLatch.countDown();
+
         if (ctx.config().isDaemon())
             return;
 
@@ -959,6 +969,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @throws IgniteCheckedException If failed to wait.
+     */
+    public void awaitStarted() throws IgniteCheckedException {
+        U.await(cacheStartedLatch);
+    }
+
+    /**
      * @param cache Cache.
      * @throws IgniteCheckedException If failed.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/587103fd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetFromJobTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetFromJobTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetFromJobTest.java
new file mode 100644
index 0000000..5859bec
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetFromJobTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.concurrent.atomic.*;
+
+/**
+ * Job tries to get cache during topology change.
+ */
+public class CacheGetFromJobTest extends GridCacheAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTopologyChange() throws Exception {
+        final AtomicReference<Exception> err = new AtomicReference<>();
+
+        final AtomicInteger id = new AtomicInteger(1);
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
+            @Override public void apply() {
+                info("Run topology change.");
+
+                try {
+                    for (int i = 0; i < 5; i++) {
+                        info("Topology change: " + i);
+
+                        startGrid(id.getAndIncrement());
+                    }
+                }
+                catch (Exception e) {
+                    err.set(e);
+
+                    log.error("Unexpected exception in topology-change-thread: " + e, e);
+                }
+            }
+        }, 3, "topology-change-thread");
+
+        int cntr = 0;
+
+        while (!fut.isDone()) {
+            grid(0).compute().broadcast(new TestJob());
+
+            cntr++;
+        }
+
+        log.info("Job execution count: " + cntr);
+
+        Exception err0 = err.get();
+
+        if (err0 != null)
+            throw err0;
+    }
+
+    /**
+     * Test job.
+     */
+    private static class TestJob implements IgniteCallable<Object> {
+        /** Ignite. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** */
+        public TestJob() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object call() throws Exception {
+            IgniteCache cache = ignite.cache(null);
+
+            assertNotNull(cache);
+
+            assertEquals(0, cache.localSize());
+
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/587103fd/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
index 0ced1c8..796d138 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
@@ -48,6 +48,7 @@ public class IgniteCacheRestartTestSuite extends TestSuite {
         suite.addTestSuite(IgniteCacheAtomicPutAllFailoverSelfTest.class);
         suite.addTestSuite(IgniteCachePutAllRestartTest.class);
         suite.addTestSuite(GridCachePutAllFailoverSelfTest.class);
+        suite.addTestSuite(CacheGetFromJobTest.class);
 
         return suite;
     }


[02/19] incubator-ignite git commit: Merge remote-tracking branch 'origin/ignite-157-2' into ignite-157-2

Posted by yz...@apache.org.
Merge remote-tracking branch 'origin/ignite-157-2' into ignite-157-2


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e0810ed8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e0810ed8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e0810ed8

Branch: refs/heads/ignite-855
Commit: e0810ed828ab802e96cbccf65078b71faf674968
Parents: d14a0fb 2229d74
Author: sboikov <sb...@gridgain.com>
Authored: Tue Apr 28 09:17:24 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Apr 28 09:17:24 2015 +0300

----------------------------------------------------------------------
 .../main/java/org/apache/ignite/Ignition.java   |  44 ++++
 .../org/apache/ignite/internal/IgnitionEx.java  | 165 +++++++++++++-
 .../processors/cache/CacheInvokeResult.java     |  24 +-
 .../processors/cache/GridCacheAdapter.java      |   2 +-
 .../processors/cache/GridCacheMapEntry.java     |   4 +-
 .../processors/cache/GridCacheReturn.java       |   5 +-
 .../dht/atomic/GridDhtAtomicCache.java          |   3 +-
 .../local/atomic/GridLocalAtomicCache.java      |   6 +-
 .../util/spring/IgniteSpringHelper.java         |  54 ++++-
 .../DataStreamerMultiThreadedSelfTest.java      |  19 +-
 .../GridSwapSpaceSpiAbstractSelfTest.java       |   2 +-
 .../query/h2/sql/BaseH2CompareQueryTest.java    |  32 +--
 .../util/spring/IgniteSpringHelperImpl.java     | 217 +++++++++++++++----
 .../IgniteStartFromStreamConfigurationTest.java |  50 +++++
 .../testsuites/IgniteSpringTestSuite.java       |   2 +
 15 files changed, 548 insertions(+), 81 deletions(-)
----------------------------------------------------------------------



[03/19] incubator-ignite git commit: Merge remote-tracking branch 'origin/ignite-157-2' into ignite-157-2

Posted by yz...@apache.org.
Merge remote-tracking branch 'origin/ignite-157-2' into ignite-157-2


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/fc54ef7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/fc54ef7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/fc54ef7a

Branch: refs/heads/ignite-855
Commit: fc54ef7a10cd6e28152b19d21de320d47a5c3e3c
Parents: e0810ed 2ea83ce
Author: sboikov <sb...@gridgain.com>
Authored: Wed Apr 29 11:55:59 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Apr 29 11:55:59 2015 +0300

----------------------------------------------------------------------
 .../ignite/testsuites/IgniteCacheFailoverTestSuite.java       | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------



[15/19] incubator-ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-157-2' into ignite-sprint-4

Posted by yz...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-157-2' into ignite-sprint-4


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ba210bbb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ba210bbb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ba210bbb

Branch: refs/heads/ignite-855
Commit: ba210bbbfec66f4ff8913550e1e3b43ba65cf0e1
Parents: 99c7e22 f5f95fb
Author: sboikov <sb...@gridgain.com>
Authored: Wed May 6 12:41:02 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed May 6 12:41:02 2015 +0300

----------------------------------------------------------------------
 .../ignite/codegen/MessageCodeGenerator.java    |   4 +-
 .../communication/GridIoMessageFactory.java     |   4 +-
 ...ridCacheOptimisticCheckPreparedTxFuture.java | 434 ---------------
 ...idCacheOptimisticCheckPreparedTxRequest.java | 232 --------
 ...dCacheOptimisticCheckPreparedTxResponse.java | 179 -------
 .../distributed/GridCacheTxRecoveryFuture.java  | 506 ++++++++++++++++++
 .../distributed/GridCacheTxRecoveryRequest.java | 261 +++++++++
 .../GridCacheTxRecoveryResponse.java            | 182 +++++++
 .../GridDistributedTxRemoteAdapter.java         |   2 +-
 .../cache/transactions/IgniteInternalTx.java    |   5 +-
 .../cache/transactions/IgniteTxAdapter.java     |   2 +-
 .../cache/transactions/IgniteTxHandler.java     |  38 +-
 .../transactions/IgniteTxLocalAdapter.java      |   2 +-
 .../cache/transactions/IgniteTxManager.java     | 173 ++----
 .../resources/META-INF/classnames.properties    |   6 +-
 .../GridCacheAbstractFailoverSelfTest.java      |   4 +-
 .../GridCacheAbstractNodeRestartSelfTest.java   |  94 ++--
 ...xOriginatingNodeFailureAbstractSelfTest.java |   2 +-
 ...rDisabledPrimaryNodeFailureRecoveryTest.java |  31 ++
 ...rtitionedPrimaryNodeFailureRecoveryTest.java |  31 ++
 ...woBackupsPrimaryNodeFailureRecoveryTest.java |  37 ++
 ...ePrimaryNodeFailureRecoveryAbstractTest.java | 533 +++++++++++++++++++
 .../GridCachePartitionedNodeRestartTest.java    |   4 +-
 ...ePartitionedOptimisticTxNodeRestartTest.java |   4 +-
 .../GridCacheReplicatedNodeRestartSelfTest.java |   2 +
 .../testsuites/IgniteCacheRestartTestSuite.java |   4 +-
 .../ignite/testsuites/IgniteCacheTestSuite.java |   3 -
 .../IgniteCacheTxRecoverySelfTestSuite.java     |   4 +
 28 files changed, 1737 insertions(+), 1046 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ba210bbb/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------


[07/19] incubator-ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-sprint-4' into ignite-157-2

Posted by yz...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-4' into ignite-157-2


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/fbf7149d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/fbf7149d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/fbf7149d

Branch: refs/heads/ignite-855
Commit: fbf7149d775472f9dc1e9cba9a0c10a6780688e6
Parents: 45199eb 54f9492
Author: sboikov <sb...@gridgain.com>
Authored: Tue May 5 09:56:03 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue May 5 09:56:03 2015 +0300

----------------------------------------------------------------------
 DEVNOTES.txt                                    |   4 +-
 assembly/release-base.xml                       |   2 +
 assembly/release-schema-import.xml              |  50 +++
 .../streaming/wordcount/CacheConfig.java        |   5 -
 .../config/grid-client-config.properties        |  50 +--
 .../ClientPropertiesConfigurationSelfTest.java  |  12 +-
 .../java/org/apache/ignite/IgniteCache.java     |   5 +
 .../org/apache/ignite/IgniteJdbcDriver.java     |  81 +++--
 .../configuration/CacheConfiguration.java       | 255 +++++++++++---
 .../configuration/IgniteConfiguration.java      | 344 +++++++++++++++----
 .../ignite/internal/GridUpdateNotifier.java     |  66 +++-
 .../apache/ignite/internal/IgniteKernal.java    |  83 +++--
 .../org/apache/ignite/internal/IgnitionEx.java  |  15 +-
 .../client/GridClientConfiguration.java         |   2 +-
 .../managers/communication/GridIoManager.java   |   8 +-
 .../processors/cache/GridCacheTtlManager.java   | 168 +++++----
 .../processors/cache/GridCacheUtils.java        |   5 +-
 .../apache/ignite/lang/IgniteAsyncSupport.java  |   4 +-
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |  19 +-
 .../discovery/tcp/TcpClientDiscoverySpi.java    |   4 -
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   4 -
 .../discovery/tcp/TcpDiscoverySpiAdapter.java   |   8 +-
 .../internal/GridUpdateNotifierSelfTest.java    |  13 +-
 .../IgniteCacheEntryListenerAbstractTest.java   |   4 +-
 ...CacheLoadingConcurrentGridStartSelfTest.java | 154 +++++++++
 ...GridCacheLoadingConcurrentGridStartTest.java | 154 ---------
 .../tcp/TcpClientDiscoverySelfTest.java         |   8 +
 .../ignite/testsuites/IgniteCacheTestSuite.java |   2 +-
 modules/schema-import/pom.xml                   |   6 +-
 pom.xml                                         | 227 ++++++++++--
 30 files changed, 1225 insertions(+), 537 deletions(-)
----------------------------------------------------------------------



[05/19] incubator-ignite git commit: # ignite-157-2 Tests and fix for tx recovery issue

Posted by yz...@apache.org.
# ignite-157-2 Tests and fix for tx recovery issue


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5daaa278
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5daaa278
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5daaa278

Branch: refs/heads/ignite-855
Commit: 5daaa278afcca7e00be5002e3d5247661c6ba474
Parents: 4b775f0
Author: sboikov <sb...@gridgain.com>
Authored: Wed Apr 29 12:48:27 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Apr 29 17:13:48 2015 +0300

----------------------------------------------------------------------
 ...ridCacheOptimisticCheckPreparedTxFuture.java |  78 +++-
 ...idCacheOptimisticCheckPreparedTxRequest.java |  47 +-
 .../GridDistributedTxRemoteAdapter.java         |   2 +-
 .../cache/transactions/IgniteInternalTx.java    |   5 +-
 .../cache/transactions/IgniteTxAdapter.java     |   2 +-
 .../cache/transactions/IgniteTxHandler.java     |   3 +-
 .../transactions/IgniteTxLocalAdapter.java      |   2 +-
 .../cache/transactions/IgniteTxManager.java     |  47 +-
 ...xOriginatingNodeFailureAbstractSelfTest.java |   2 +-
 ...rDisabledPrimaryNodeFailureRecoveryTest.java |  31 ++
 ...rtitionedPrimaryNodeFailureRecoveryTest.java |  31 ++
 ...woBackupsPrimaryNodeFailureRecoveryTest.java |  37 ++
 ...ePrimaryNodeFailureRecoveryAbstractTest.java | 454 +++++++++++++++++++
 .../IgniteCacheTxRecoverySelfTestSuite.java     |   4 +
 14 files changed, 713 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
index 8a14b48..3e345f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
@@ -70,6 +70,9 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
     /** Transaction nodes mapping. */
     private final Map<UUID, Collection<UUID>> txNodes;
 
+    /** */
+    private final boolean nearTxCheck;
+
     /**
      * @param cctx Context.
      * @param tx Transaction.
@@ -77,8 +80,11 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
      * @param txNodes Transaction mapping.
      */
     @SuppressWarnings("ConstantConditions")
-    public GridCacheOptimisticCheckPreparedTxFuture(GridCacheSharedContext<K, V> cctx, IgniteInternalTx tx,
-        UUID failedNodeId, Map<UUID, Collection<UUID>> txNodes) {
+    public GridCacheOptimisticCheckPreparedTxFuture(GridCacheSharedContext<K, V> cctx,
+        IgniteInternalTx tx,
+        UUID failedNodeId,
+        Map<UUID, Collection<UUID>> txNodes)
+    {
         super(cctx.kernalContext(), CU.boolReducer());
 
         this.cctx = cctx;
@@ -114,6 +120,10 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
                 }
             }
         }
+
+        UUID nearNodeId = tx.eventNodeId();
+
+        nearTxCheck = !failedNodeId.equals(nearNodeId) && cctx.discovery().alive(nearNodeId);
     }
 
     /**
@@ -121,6 +131,48 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
      */
     @SuppressWarnings("ConstantConditions")
     public void prepare() {
+        if (nearTxCheck) {
+            UUID nearNodeId = tx.eventNodeId();
+
+            if (cctx.localNodeId().equals(nearNodeId)) {
+                IgniteInternalFuture<Boolean> fut = cctx.tm().txCommitted(tx.nearXidVersion());
+
+                fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+                    @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+                        try {
+                            onDone(fut.get());
+                        }
+                        catch (IgniteCheckedException e) {
+                            onDone(e);
+                        }
+                    }
+                });
+            }
+            else {
+                MiniFuture fut = new MiniFuture(tx.eventNodeId());
+
+                add(fut);
+
+                GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest(
+                    tx,
+                    0,
+                    true,
+                    futureId(),
+                    fut.futureId());
+
+                try {
+                    cctx.io().send(nearNodeId, req, tx.ioPolicy());
+                }
+                catch (IgniteCheckedException e) {
+                    fut.onError(e);
+                }
+
+                markInitialized();
+            }
+
+            return;
+        }
+
         // First check transactions on local node.
         int locTxNum = nodeTransactions(cctx.localNodeId());
 
@@ -206,6 +258,7 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
 
                     GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest(tx,
                         nodeTransactions(id),
+                        false,
                         futureId(),
                         fut.futureId());
 
@@ -228,7 +281,11 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
                 add(fut);
 
                 GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest(
-                    tx, nodeTransactions(nodeId), futureId(), fut.futureId());
+                    tx,
+                    nodeTransactions(nodeId),
+                    false,
+                    futureId(),
+                    fut.futureId());
 
                 try {
                     cctx.io().send(nodeId, req, tx.ioPolicy());
@@ -341,11 +398,18 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
                 cctx.tm().finishOptimisticTxOnRecovery(tx, res);
             }
             else {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to check prepared transactions, " +
-                        "invalidating transaction [err=" + err + ", tx=" + tx + ']');
+                if (nearTxCheck) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to check transaction on near node, " +
+                            "ignoring [err=" + err + ", tx=" + tx + ']');
+                }
+                else {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to check prepared transactions, " +
+                            "invalidating transaction [err=" + err + ", tx=" + tx + ']');
 
-                cctx.tm().salvageTx(tx);
+                    cctx.tm().salvageTx(tx);
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
index e83db66..4f2a1d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
@@ -27,8 +27,7 @@ import java.io.*;
 import java.nio.*;
 
 /**
- * Message sent to check that transactions related to some optimistic transaction
- * were prepared on remote node.
+ * Message sent to check that transactions related to transaction were prepared on remote node.
  */
 public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBaseMessage {
     /** */
@@ -49,6 +48,9 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
     /** System transaction flag. */
     private boolean sys;
 
+    /** {@code True} if should check only tx on near node. */
+    private boolean nearTxCheck;
+
     /**
      * Empty constructor required by {@link Externalizable}
      */
@@ -59,11 +61,16 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
     /**
      * @param tx Transaction.
      * @param txNum Expected number of transactions on remote node.
+     * @param nearTxCheck
      * @param futId Future ID.
      * @param miniId Mini future ID.
      */
-    public GridCacheOptimisticCheckPreparedTxRequest(IgniteInternalTx tx, int txNum, IgniteUuid futId,
-        IgniteUuid miniId) {
+    public GridCacheOptimisticCheckPreparedTxRequest(IgniteInternalTx tx,
+        int txNum,
+        boolean nearTxCheck,
+        IgniteUuid futId,
+        IgniteUuid miniId)
+    {
         super(tx.xidVersion(), 0);
 
         nearXidVer = tx.nearXidVersion();
@@ -72,6 +79,14 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
         this.futId = futId;
         this.miniId = miniId;
         this.txNum = txNum;
+        this.nearTxCheck = nearTxCheck;
+    }
+
+    /**
+     * @return {@code True} if should check only tx on near node.
+     */
+    public boolean nearTxCheck() {
+        return nearTxCheck;
     }
 
     /**
@@ -137,18 +152,24 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeMessage("nearXidVer", nearXidVer))
+                if (!writer.writeBoolean("nearTxCheck", nearTxCheck))
                     return false;
 
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeBoolean("sys", sys))
+                if (!writer.writeMessage("nearXidVer", nearXidVer))
                     return false;
 
                 writer.incrementState();
 
             case 12:
+                if (!writer.writeBoolean("sys", sys))
+                    return false;
+
+                writer.incrementState();
+
+            case 13:
                 if (!writer.writeInt("txNum", txNum))
                     return false;
 
@@ -187,7 +208,7 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
                 reader.incrementState();
 
             case 10:
-                nearXidVer = reader.readMessage("nearXidVer");
+                nearTxCheck = reader.readBoolean("nearTxCheck");
 
                 if (!reader.isLastRead())
                     return false;
@@ -195,7 +216,7 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
                 reader.incrementState();
 
             case 11:
-                sys = reader.readBoolean("sys");
+                nearXidVer = reader.readMessage("nearXidVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -203,6 +224,14 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
                 reader.incrementState();
 
             case 12:
+                sys = reader.readBoolean("sys");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 13:
                 txNum = reader.readInt("txNum");
 
                 if (!reader.isLastRead())
@@ -222,7 +251,7 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 13;
+        return 14;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 5c75390..3215138 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -206,7 +206,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
     }
 
     /** {@inheritDoc} */
-    @Override public <K, V> GridTuple<CacheObject> peek(GridCacheContext cacheCtx,
+    @Override public GridTuple<CacheObject> peek(GridCacheContext cacheCtx,
         boolean failFast,
         KeyCacheObject key,
         CacheEntryPredicate[] filter)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index 30367c5..8dc07cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -338,8 +338,7 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
 
     /**
      * Gets node ID which directly started this transaction. In case of DHT local transaction it will be
-     * near node ID, in case of DHT remote transaction it will be primary node ID, in case of replicated remote
-     * transaction it will be starter node ID.
+     * near node ID, in case of DHT remote transaction it will be primary node ID.
      *
      * @return Originating node ID.
      */
@@ -507,7 +506,7 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
      * @return Current value for the key within transaction.
      * @throws GridCacheFilterFailedException If filter failed and failFast is {@code true}.
      */
-     @Nullable public <K, V> GridTuple<CacheObject> peek(
+     @Nullable public GridTuple<CacheObject> peek(
          GridCacheContext ctx,
          boolean failFast,
          KeyCacheObject key,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 1c02356..82d68b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1964,7 +1964,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public <K, V> GridTuple<CacheObject> peek(GridCacheContext ctx,
+        @Nullable @Override public GridTuple<CacheObject> peek(GridCacheContext ctx,
             boolean failFast,
             KeyCacheObject key,
             @Nullable CacheEntryPredicate[] filter) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 6b45fee..6843075 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -1183,7 +1183,8 @@ public class IgniteTxHandler {
         if (log.isDebugEnabled())
             log.debug("Processing check prepared transaction requests [nodeId=" + nodeId + ", req=" + req + ']');
 
-        IgniteInternalFuture<Boolean> fut = ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions());
+        IgniteInternalFuture<Boolean> fut = req.nearTxCheck() ? ctx.tm().txCommitted(req.nearXidVersion()) :
+            ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions());
 
         if (fut == null || fut.isDone()) {
             boolean prepared;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index dfce09c..fc3efba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -330,7 +330,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
     /** {@inheritDoc} */
     @SuppressWarnings({"RedundantTypeArguments"})
-    @Nullable @Override public <K, V> GridTuple<CacheObject> peek(
+    @Nullable @Override public GridTuple<CacheObject> peek(
         GridCacheContext cacheCtx,
         boolean failFast,
         KeyCacheObject key,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index c494602..19efc5d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -727,14 +727,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
-     * @param txId Transaction ID.
-     * @return Transaction with given ID.
-     */
-    @Nullable public IgniteInternalTx txx(GridCacheVersion txId) {
-        return idMap.get(txId);
-    }
-
-    /**
      * Handles prepare stage of 2PC.
      *
      * @param tx Transaction to prepare.
@@ -1770,6 +1762,45 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @param ver Version.
+     * @return Future for flag indicating if transactions was committed.
+     */
+    public IgniteInternalFuture<Boolean> txCommitted(GridCacheVersion ver) {
+        final GridFutureAdapter<Boolean> resFut = new GridFutureAdapter<>();
+
+        final IgniteInternalTx tx = cctx.tm().tx(ver);
+
+        if (tx != null) {
+            assert tx.near() && tx.local() : tx;
+
+            if (log.isDebugEnabled())
+                log.debug("Found near transaction, will wait for completion: " + tx);
+
+            tx.finishFuture().listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
+                @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
+                    TransactionState state = tx.state();
+
+                    if (log.isDebugEnabled())
+                        log.debug("Near transaction finished with state: " + state);
+
+                    resFut.onDone(state == COMMITTED);
+                }
+            });
+
+            return resFut;
+        }
+
+        Boolean committed = completedVers.get(ver);
+
+        if (log.isDebugEnabled())
+            log.debug("Near transaction committed: " + committed);
+
+        resFut.onDone(committed != null && committed);
+
+        return resFut;
+    }
+
+    /**
      * @param nearVer Near version ID.
      * @param txNum Number of transactions.
      * @param fut Result future.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
index 00bd43f..d664aa8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
@@ -156,7 +156,7 @@ public abstract class IgniteTxOriginatingNodeFailureAbstractSelfTest extends Gri
 
                 TransactionProxyImpl tx = (TransactionProxyImpl)txIgniteNode.transactions().txStart();
 
-                IgniteInternalTx txEx = GridTestUtils.getFieldValue(tx, "tx");
+                IgniteInternalTx txEx = tx.tx();
 
                 assertTrue(txEx.optimistic());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest.java
new file mode 100644
index 0000000..62d9b79
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.configuration.*;
+
+/**
+ *
+ */
+public class IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest
+    extends IgniteCachePrimaryNodeFailureRecoveryAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected NearCacheConfiguration nearConfiguration() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedPrimaryNodeFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedPrimaryNodeFailureRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedPrimaryNodeFailureRecoveryTest.java
new file mode 100644
index 0000000..a40c989
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedPrimaryNodeFailureRecoveryTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.configuration.*;
+
+/**
+ *
+ */
+public class IgniteCachePartitionedPrimaryNodeFailureRecoveryTest
+    extends IgniteCachePrimaryNodeFailureRecoveryAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected NearCacheConfiguration nearConfiguration() {
+        return new NearCacheConfiguration();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java
new file mode 100644
index 0000000..70eef1d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.configuration.*;
+
+/**
+ *
+ */
+public class IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest
+    extends IgniteCachePartitionedPrimaryNodeFailureRecoveryTest {
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
+        CacheConfiguration ccfg = super.cacheConfiguration(gridName);
+
+        assertEquals(1, ccfg.getBackups());
+
+        ccfg.setBackups(2);
+
+        return ccfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
new file mode 100644
index 0000000..7a393d8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.transactions.*;
+
+import java.util.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+import static org.apache.ignite.transactions.TransactionState.*;
+
+/**
+ *
+ */
+public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends IgniteCacheAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 4;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setCommunicationSpi(new TestCommunicationSpi());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        // No-op
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOptimisticPrimaryNodeFailureRecovery1() throws Exception {
+        primaryNodeFailure(false, false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOptimisticPrimaryNodeFailureRecovery2() throws Exception {
+        primaryNodeFailure(true, false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOptimisticPrimaryNodeFailureRollback1() throws Exception {
+        primaryNodeFailure(false, true, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOptimisticPrimaryNodeFailureRollback2() throws Exception {
+        primaryNodeFailure(true, true, true);
+    }
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticPrimaryNodeFailureRecovery1() throws Exception {
+        primaryNodeFailure(false, false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticPrimaryNodeFailureRecovery2() throws Exception {
+        primaryNodeFailure(true, false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticPrimaryNodeFailureRollback1() throws Exception {
+        primaryNodeFailure(false, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticPrimaryNodeFailureRollback2() throws Exception {
+        primaryNodeFailure(true, true, false);
+    }
+
+    /**
+     * @param locBackupKey If {@code true} uses recovery for local backup key.
+     * @param rollback If {@code true} tests rollback after primary node failure.
+     * @param optimistic If {@code true} tests optimistic transaction.
+     * @throws Exception If failed.
+     */
+    private void primaryNodeFailure(boolean locBackupKey, final boolean rollback, boolean optimistic) throws Exception {
+        IgniteCache<Integer, Integer> cache0 = jcache(0);
+        IgniteCache<Integer, Integer> cache2 = jcache(2);
+
+        Affinity<Integer> aff = ignite(0).affinity(null);
+
+        Integer key0 = null;
+
+        for (int key = 0; key < 10_000; key++) {
+            if (aff.isPrimary(ignite(1).cluster().localNode(), key)) {
+                if (locBackupKey == aff.isBackup(ignite(0).cluster().localNode(), key)) {
+                    key0 = key;
+
+                    break;
+                }
+            }
+        }
+
+        assertNotNull(key0);
+
+        final Integer key1 = key0;
+        final Integer key2 = primaryKey(cache2);
+
+        TestCommunicationSpi commSpi = (TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi();
+
+        IgniteTransactions txs = ignite(0).transactions();
+
+        try (Transaction tx = txs.txStart(optimistic ? OPTIMISTIC : PESSIMISTIC, REPEATABLE_READ)) {
+            log.info("Put key1: " + key1);
+
+            cache0.put(key1, key1);
+
+            log.info("Put key2: " + key2);
+
+            cache0.put(key2, key2);
+
+            log.info("Start prepare.");
+
+            IgniteInternalTx txEx = ((TransactionProxyImpl)tx).tx();
+
+            commSpi.blockMessages(ignite(2).cluster().localNode().id()); // Do not allow to finish prepare for key2.
+
+            IgniteInternalFuture<IgniteInternalTx> prepFut = txEx.prepareAsync();
+
+            waitPrepared(ignite(1));
+
+            log.info("Stop one primary node.");
+
+            stopGrid(1);
+
+            U.sleep(1000); // Wait some time to catch possible issues in tx recovery.
+
+            commSpi.stopBlock();
+
+            prepFut.get(10_000);
+
+            if (rollback) {
+                log.info("Rollback.");
+
+                tx.rollback();
+            }
+            else {
+                log.info("Commit.");
+
+                tx.commit();
+            }
+        }
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                try {
+                    checkKey(key1, rollback);
+                    checkKey(key2, rollback);
+
+                    return true;
+                }
+                catch (AssertionError e) {
+                    log.info("Check failed: " + e);
+
+                    return false;
+                }
+            }
+        }, 5000);
+
+        checkKey(key1, rollback);
+        checkKey(key2, rollback);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOptimisticPrimaryAndOriginatingNodeFailureRecovery() throws Exception {
+        primaryAndOriginatingNodeFailure(false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOptimisticPrimaryAndOriginatingNodeFailureRollback() throws Exception {
+        primaryAndOriginatingNodeFailure(true, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticPrimaryAndOriginatingNodeFailureRecovery() throws Exception {
+        primaryAndOriginatingNodeFailure(false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticPrimaryAndOriginatingNodeFailureRollback() throws Exception {
+        primaryAndOriginatingNodeFailure(true, false);
+    }
+
+    /**
+     * @param rollback If {@code true} tests rollback after primary node failure.
+     * @param optimistic If {@code true} tests optimistic transaction.
+     * @throws Exception If failed.
+     */
+    private void primaryAndOriginatingNodeFailure(final boolean rollback, boolean optimistic) throws Exception {
+        IgniteCache<Integer, Integer> cache0 = jcache(0);
+        IgniteCache<Integer, Integer> cache1 = jcache(1);
+        IgniteCache<Integer, Integer> cache2 = jcache(2);
+
+        final Integer key1 = primaryKey(cache1);
+        final Integer key2 = primaryKey(cache2);
+
+        TestCommunicationSpi commSpi = (TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi();
+
+        IgniteTransactions txs = ignite(0).transactions();
+
+        Transaction tx = txs.txStart(optimistic ? OPTIMISTIC : PESSIMISTIC, REPEATABLE_READ);
+
+        log.info("Put key1: " + key1);
+
+        cache0.put(key1, key1);
+
+        log.info("Put key2: " + key2);
+
+        cache0.put(key2, key2);
+
+        log.info("Start prepare.");
+
+        IgniteInternalTx txEx = ((TransactionProxyImpl)tx).tx();
+
+        commSpi.blockMessages(ignite(2).cluster().localNode().id()); // Do not allow to finish prepare for key2.
+
+        IgniteInternalFuture<IgniteInternalTx> prepFut = txEx.prepareAsync();
+
+        waitPrepared(ignite(1));
+
+        log.info("Stop one primary node.");
+
+        stopGrid(1);
+
+        U.sleep(1000); // Wait some time to catch possible issues in tx recovery.
+
+        if (!rollback) {
+            commSpi.stopBlock();
+
+            prepFut.get(10_000);
+        }
+
+        log.info("Stop originating node.");
+
+        stopGrid(0);
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                try {
+                    checkKey(key1, rollback);
+                    checkKey(key2, rollback);
+
+                    return true;
+                }
+                catch (AssertionError e) {
+                    log.info("Check failed: " + e);
+
+                    return false;
+                }
+            }
+        }, 5000);
+
+        checkKey(key1, rollback);
+        checkKey(key2, rollback);
+    }
+
+    /**
+     * @param key Key.
+     * @param expNull {@code True} if {@code null} value is expected.
+     */
+    private void checkKey(Integer key, boolean expNull) {
+        Affinity<Integer> aff = ignite(2).affinity(null);
+
+        Collection<ClusterNode> nodes = aff.mapKeyToPrimaryAndBackups(key);
+
+        assertFalse(nodes.isEmpty());
+
+        for (ClusterNode node : nodes) {
+            Ignite ignite = grid(node);
+
+            IgniteCache<Integer, Integer> cache = ignite.cache(null);
+
+            if (expNull)
+                assertNull("Unexpected value for: " + ignite.name(), cache.localPeek(key));
+            else
+                assertEquals("Unexpected value for: " + ignite.name(), key, cache.localPeek(key));
+        }
+    }
+
+    /**
+     * @param ignite Node.
+     * @throws Exception If failed.
+     */
+    private void waitPrepared(Ignite ignite) throws Exception {
+        final IgniteTxManager tm = ((IgniteKernal)ignite).context().cache().context().tm();
+
+        boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                GridDhtTxLocal locTx = null;
+
+                for (IgniteInternalTx tx : tm.txs()) {
+                    if (tx instanceof GridDhtTxLocal) {
+                        assertNull("Only one tx is expected.", locTx);
+
+                        locTx = (GridDhtTxLocal)tx;
+                    }
+                }
+
+                log.info("Wait for tx, state: " + (locTx != null ? locTx.state() : null));
+
+                return locTx != null && locTx.state() == PREPARED;
+            }
+        }, 5000);
+
+        assertTrue("Failed to wait for tx.", wait);
+    }
+
+    /**
+     *
+     */
+    private static class TestCommunicationSpi extends TcpCommunicationSpi {
+        /** Logger. */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** */
+        private UUID blockNodeId;
+
+        /** */
+        private List<T2<ClusterNode, GridIoMessage>> blockedMsgs = new ArrayList<>();
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+            if (msg instanceof GridIoMessage) {
+                Object msg0 = ((GridIoMessage)msg).message();
+
+                if (msg0 instanceof GridNearTxPrepareRequest) {
+                    synchronized (this) {
+                        if (blockNodeId != null && blockNodeId.equals(node.id())) {
+                            log.info("Block message: " + msg0);
+
+                            blockedMsgs.add(new T2<>(node, (GridIoMessage)msg));
+
+                            return;
+                        }
+                    }
+                }
+            }
+
+            super.sendMessage(node, msg);
+        }
+
+        /**
+         * @param nodeId Node ID.
+         */
+        void blockMessages(UUID nodeId) {
+            blockNodeId = nodeId;
+        }
+
+        /**
+         *
+         */
+        void stopBlock() {
+            synchronized (this) {
+                blockNodeId = null;
+
+                for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) {
+                    log.info("Send blocked message: " + msg.get2().message());
+
+                    super.sendMessage(msg.get1(), msg.get2());
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
index e832099..1bd0e5f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
@@ -33,6 +33,10 @@ public class IgniteCacheTxRecoverySelfTestSuite extends TestSuite {
     public static TestSuite suite() throws Exception {
         TestSuite suite = new TestSuite("Cache tx recovery test suite");
 
+        suite.addTestSuite(IgniteCachePartitionedPrimaryNodeFailureRecoveryTest.class);
+        suite.addTestSuite(IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest.class);
+        suite.addTestSuite(IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.class);
+
         suite.addTestSuite(GridCachePartitionedTxOriginatingNodeFailureSelfTest.class);
         suite.addTestSuite(GridCachePartitionedNearDisabledTxOriginatingNodeFailureSelfTest.class);
         suite.addTestSuite(GridCacheReplicatedTxOriginatingNodeFailureSelfTest.class);