You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/14 15:59:14 UTC

ignite git commit: ignite-1561 Disable onePhase commit for tx with near cache

Repository: ignite
Updated Branches:
  refs/heads/ignite-1561-1 [created] ead80c01d


ignite-1561 Disable onePhase commit for tx with near cache


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ead80c01
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ead80c01
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ead80c01

Branch: refs/heads/ignite-1561-1
Commit: ead80c01daa607fdb4c1e4166607053d18d4875c
Parents: 63001f1
Author: sboikov <sb...@gridgain.com>
Authored: Fri Apr 14 14:56:08 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Apr 14 18:03:44 2017 +0300

----------------------------------------------------------------------
 .../distributed/GridDistributedTxMapping.java   | 41 +++++++++++++-------
 ...arOptimisticSerializableTxPrepareFuture.java |  9 ++---
 .../near/GridNearOptimisticTxPrepareFuture.java | 22 ++++++-----
 .../GridNearPessimisticTxPrepareFuture.java     | 22 ++++++-----
 .../near/GridNearTxFinishFuture.java            |  6 +--
 .../cache/distributed/near/GridNearTxLocal.java | 15 +++----
 .../near/GridNearTxPrepareFutureAdapter.java    |  4 +-
 .../IgniteTxImplicitSingleStateImpl.java        |  6 +++
 .../transactions/IgniteTxLocalAdapter.java      |  2 +-
 .../cache/transactions/IgniteTxLocalState.java  | 10 +++++
 .../cache/transactions/IgniteTxState.java       |  2 +-
 .../cache/transactions/IgniteTxStateImpl.java   | 18 +++++++++
 .../dht/IgniteCrossCacheTxSelfTest.java         |  2 +-
 13 files changed, 104 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ead80c01/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
index f8cec50..9d86244 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
@@ -17,9 +17,11 @@
 
 package org.apache.ignite.internal.processors.cache.distributed;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
+import java.util.List;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -52,12 +54,12 @@ public class GridDistributedTxMapping {
     /** {@code True} if this is last mapping for node. */
     private boolean last;
 
-    /** {@code True} if mapping is for near caches, {@code false} otherwise. */
-    private boolean near;
-
     /** {@code True} if this is first mapping for optimistic tx on client node. */
     private boolean clientFirst;
 
+    /** */
+    private boolean hasNear;
+
     /**
      * @param primary Primary node.
      */
@@ -95,18 +97,8 @@ public class GridDistributedTxMapping {
         this.clientFirst = clientFirst;
     }
 
-    /**
-     * @return {@code True} if mapping is for near caches, {@code false} otherwise.
-     */
-    public boolean near() {
-        return near;
-    }
-
-    /**
-     * @param near {@code True} if mapping is for near caches, {@code false} otherwise.
-     */
-    public void near(boolean near) {
-        this.near = near;
+    public boolean hasNearCacheEntries() {
+        return hasNear;
     }
 
     /**
@@ -124,6 +116,22 @@ public class GridDistributedTxMapping {
     }
 
     /**
+     * @return Near cache entries.
+     */
+    @Nullable public List<IgniteTxEntry> nearCacheEntries() {
+        assert hasNear;
+
+        List<IgniteTxEntry> nearCacheEntries = new ArrayList<>();
+
+        for (IgniteTxEntry e : entries) {
+            if (e.context().isNear())
+                nearCacheEntries.add(e);
+        }
+
+        return nearCacheEntries;
+    }
+
+    /**
      * @return {@code True} if lock is explicit.
      */
     public boolean explicitLock() {
@@ -173,6 +181,9 @@ public class GridDistributedTxMapping {
      * @param entry Adds entry.
      */
     public void add(IgniteTxEntry entry) {
+        if (entry.context().isNear())
+            hasNear = true;
+
         entries.add(entry);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ead80c01/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index cbd9d23..a54c65d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -441,7 +441,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
             timeout,
             m.reads(),
             m.writes(),
-            m.near(),
+            m.hasNearCacheEntries(),
             txMapping.transactionNodes(),
             m.last(),
             tx.onePhaseCommit(),
@@ -459,9 +459,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
         }
 
         // Must lock near entries separately.
-        if (m.near()) {
+        if (m.hasNearCacheEntries()) {
             try {
-                tx.optimisticLockEntries(m.entries());
+                tx.optimisticLockEntries(m.nearCacheEntries());
 
                 tx.userPrepare();
             }
@@ -581,9 +581,6 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
                     tx.colocatedLocallyMapped(true);
             }
 
-            // Initialize near flag right away.
-            cur.near(cacheCtx.isNear());
-
             cur.clientFirst(!topLocked && cctx.kernalContext().clientNode());
 
             cur.last(true);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ead80c01/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index bc47c13..26af91a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -382,7 +382,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
 
         tx.transactionNodes(txMapping.transactionNodes());
 
-        checkOnePhase(txMapping);
+        if (!mapping.hasNearCacheEntries())
+            checkOnePhase(txMapping);
 
         proceedPrepare(mapping, null);
     }
@@ -409,11 +410,16 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
 
         Queue<GridDistributedTxMapping> mappings = new ArrayDeque<>();
 
+        boolean hasNearCache = false;
+
         for (IgniteTxEntry write : writes) {
             write.clearEntryReadVersion();
 
             GridDistributedTxMapping updated = map(write, topVer, cur, topLocked, remap);
 
+            if (updated.hasNearCacheEntries())
+                hasNearCache = true;
+
             if (cur != updated) {
                 mappings.offer(updated);
 
@@ -451,7 +457,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
 
         tx.transactionNodes(txMapping.transactionNodes());
 
-        checkOnePhase(txMapping);
+        if (!hasNearCache)
+            checkOnePhase(txMapping);
 
         proceedPrepare(mappings);
     }
@@ -497,7 +504,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
                     timeout,
                     null,
                     m.writes(),
-                    m.near(),
+                    m.hasNearCacheEntries(),
                     txMapping.transactionNodes(),
                     m.last(),
                     tx.onePhaseCommit(),
@@ -515,9 +522,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
                 }
 
                 // Must lock near entries separately.
-                if (m.near()) {
+                if (m.hasNearCacheEntries()) {
                     try {
-                        tx.optimisticLockEntries(req.writes());
+                        tx.optimisticLockEntries(m.nearCacheEntries());
 
                         tx.userPrepare();
                     }
@@ -644,14 +651,11 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
             }
         }
 
-        if (cur == null || !cur.primary().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) {
+        if (cur == null || !cur.primary().id().equals(primary.id())) {
             boolean clientFirst = cur == null && !topLocked && cctx.kernalContext().clientNode();
 
             cur = new GridDistributedTxMapping(primary);
 
-            // Initialize near flag right away.
-            cur.near(cacheCtx.isNear());
-
             cur.clientFirst(clientFirst);
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ead80c01/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index cb15bca..f6e7d2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -184,12 +184,14 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
      *
      */
     private void preparePessimistic() {
-        Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>();
+        Map<UUID, GridDistributedTxMapping> mappings = new HashMap<>();
 
         AffinityTopologyVersion topVer = tx.topologyVersion();
 
         GridDhtTxMapping txMapping = new GridDhtTxMapping();
 
+        boolean hasNearCache = false;
+
         for (IgniteTxEntry txEntry : tx.allEntries()) {
             txEntry.clearEntryReadVersion();
 
@@ -205,20 +207,19 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
             else
                 nodes = cacheCtx.affinity().nodesByKey(txEntry.key(), topVer);
 
-            ClusterNode primary = F.first(nodes);
+            assert !nodes.isEmpty();
 
-            boolean near = cacheCtx.isNear();
+            ClusterNode primary = nodes.get(0);
 
-            IgniteBiTuple<ClusterNode, Boolean> key = F.t(primary, near);
+            if (cacheCtx.isNear())
+                hasNearCache = true;
 
-            GridDistributedTxMapping nodeMapping = mappings.get(key);
+            GridDistributedTxMapping nodeMapping = mappings.get(primary.id());
 
             if (nodeMapping == null) {
                 nodeMapping = new GridDistributedTxMapping(primary);
 
-                nodeMapping.near(cacheCtx.isNear());
-
-                mappings.put(key, nodeMapping);
+                mappings.put(primary.id(), nodeMapping);
             }
 
             txEntry.nodeId(primary.id());
@@ -230,7 +231,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
 
         tx.transactionNodes(txMapping.transactionNodes());
 
-        checkOnePhase(txMapping);
+        if (!hasNearCache)
+            checkOnePhase(txMapping);
 
         long timeout = tx.remainingTime();
 
@@ -252,7 +254,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
                 timeout,
                 m.reads(),
                 m.writes(),
-                m.near(),
+                m.hasNearCacheEntries(),
                 txMapping.transactionNodes(),
                 true,
                 tx.onePhaseCommit(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/ead80c01/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 37be0fb..89874ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -642,7 +642,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
      * @param mapping Mapping to finish.
      */
     private void readyNearMappingFromBackup(GridDistributedTxMapping mapping) {
-        if (mapping.near()) {
+        if (mapping.hasNearCacheEntries()) {
             GridCacheVersion xidVer = tx.xidVersion();
 
             mapping.dhtVersion(xidVer, xidVer);
@@ -676,7 +676,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
     private void finish(int miniId, GridDistributedTxMapping m, boolean commit) {
         ClusterNode n = m.primary();
 
-        assert !m.empty();
+        assert !m.empty() : m;
 
         CacheWriteSynchronizationMode syncMode = tx.syncMode();
 
@@ -698,7 +698,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
             m.explicitLock(),
             tx.storeEnabled(),
             tx.topologyVersion(),
-            completedVer, // Reuse 'baseVersion'  to do not add new fields in message.
+            completedVer, // Reuse 'baseVersion' to do not add new fields in message.
             null,
             null,
             tx.size(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/ead80c01/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 62af536..5f60724 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -2862,8 +2863,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                 if (m == null) {
                     mappings.put(m = new GridDistributedTxMapping(primary));
 
-                    m.near(map.near());
-
                     if (map.explicitLock())
                         m.markExplicitLock();
                 }
@@ -2889,8 +2888,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
 
         mappings.put(m);
 
-        m.near(map.near());
-
         if (map.explicitLock())
             m.markExplicitLock();
 
@@ -2933,13 +2930,17 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
         Collection<GridCacheVersion> committedVers,
         Collection<GridCacheVersion> rolledbackVers)
     {
+        List<IgniteTxEntry> nearEntries = mapping.nearCacheEntries();
+
+        assert nearEntries != null;
+
         // Process writes, then reads.
-        for (IgniteTxEntry txEntry : mapping.entries()) {
+        for (IgniteTxEntry txEntry : nearEntries) {
             if (CU.writes().apply(txEntry))
                 readyNearLock(txEntry, mapping.dhtVersion(), pendingVers, committedVers, rolledbackVers);
         }
 
-        for (IgniteTxEntry txEntry : mapping.entries()) {
+        for (IgniteTxEntry txEntry : nearEntries) {
             if (CU.reads().apply(txEntry))
                 readyNearLock(txEntry, mapping.dhtVersion(), pendingVers, committedVers, rolledbackVers);
         }
@@ -2952,7 +2953,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
      * @param committedVers Committed versions.
      * @param rolledbackVers Rolled back versions.
      */
-    void readyNearLock(IgniteTxEntry txEntry,
+    private void readyNearLock(IgniteTxEntry txEntry,
         GridCacheVersion dhtVer,
         Collection<GridCacheVersion> pendingVers,
         Collection<GridCacheVersion> committedVers,

http://git-wip-us.apache.org/repos/asf/ignite/blob/ead80c01/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index 7f1f5a2..f35324a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -172,7 +172,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends
 
             Collection<UUID> backups = entry.getValue();
 
-            if (backups.size() <= 1)
+            if (backups.size() <= 1 && !tx.txState().hasNearCacheConfigured(cctx, tx.topologyVersion()))
                 tx.onePhaseCommit(true);
         }
     }
@@ -261,7 +261,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends
             if (map != null)
                 map.dhtVersion(res.dhtVersion(), writeVer);
 
-            if (m.near())
+            if (m.hasNearCacheEntries())
                 tx.readyNearLocks(m, res.pending(), res.committedVersions(), res.rolledbackVersions());
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ead80c01/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
index 5743bfb..36f5f2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
@@ -293,6 +294,11 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean hasNearCacheConfigured(GridCacheSharedContext ctx, AffinityTopologyVersion topVer) {
+        return cacheCtx != null ? ctx.discovery().hasNearCache(cacheCtx.cacheId(), topVer) : false;
+    }
+
+    /** {@inheritDoc} */
     public String toString() {
         return S.toString(IgniteTxImplicitSingleStateImpl.class, this);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ead80c01/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index a59ff51..0490e6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -225,7 +225,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteTxState txState() {
+    @Override public IgniteTxLocalState txState() {
         return txState;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ead80c01/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
index 123d396..fe9fcbd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.internal.processors.cache.transactions;
 
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+
 /**
  *
  */
@@ -41,4 +44,11 @@ public interface IgniteTxLocalState extends IgniteTxState {
      *
      */
     public void seal();
+
+    /**
+     * @param ctx Context.
+     * @param topVer Topology version.
+     * @return {@code True} if tx has cache with created near cache.
+     */
+    public boolean hasNearCacheConfigured(GridCacheSharedContext ctx, AffinityTopologyVersion topVer);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ead80c01/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
index 7a45b6e..ed2526e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
@@ -85,7 +85,7 @@ public interface IgniteTxState {
     public boolean hasNearCache(GridCacheSharedContext cctx);
 
     /**
-     * @param cacheCtx Ccntext.
+     * @param cacheCtx Context.
      * @param tx Transaction.
      * @throws IgniteCheckedException If cache check failed.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/ead80c01/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
index 304473e..c21eb44 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -28,6 +28,8 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheInterceptor;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -474,6 +476,22 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean hasNearCacheConfigured(GridCacheSharedContext ctx, AffinityTopologyVersion topVer) {
+        DiscoCache discoCache = ctx.discovery().discoCache(topVer);
+
+        assert discoCache != null : topVer;
+
+        for (int i = 0; i < activeCacheIds.size(); i++) {
+            int cacheId = (int) activeCacheIds.get(i);
+
+            if (discoCache.hasNearCache(cacheId))
+                return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
     public String toString() {
         return S.toString(IgniteTxStateImpl.class, this);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ead80c01/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCrossCacheTxSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCrossCacheTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCrossCacheTxSelfTest.java
index bfbafdb..273f0ca 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCrossCacheTxSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCrossCacheTxSelfTest.java
@@ -70,7 +70,7 @@ public class IgniteCrossCacheTxSelfTest extends GridCommonAbstractTest {
      * @return Node count for this test.
      */
     private int nodeCount() {
-        return 4;
+        return 2;
     }
 
     /**