You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/19 14:21:15 UTC

[1/8] incubator-ignite git commit: IGNITE-1265 - Properly handle invalid partitions in DHT prepare response.

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-1.3.3-p3 ac670f923 -> 7d1a550dc


IGNITE-1265 - Properly handle invalid partitions in DHT prepare response.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7a43dde7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7a43dde7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7a43dde7

Branch: refs/heads/ignite-1.3.3-p3
Commit: 7a43dde77b47478e6b02bbab9d81ad70a2299c51
Parents: 5faffb9
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Tue Aug 18 10:35:59 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Tue Aug 18 10:35:59 2015 -0700

----------------------------------------------------------------------
 .../processors/cache/GridCacheIoManager.java    |  1 -
 .../processors/cache/GridCacheUtils.java        | 20 +++++++++++++++++
 .../distributed/dht/GridDhtTxPrepareFuture.java | 23 +++++++++++++++++++-
 .../dht/GridDhtTxPrepareResponse.java           | 17 +++++++++++++++
 .../near/GridNearTxPrepareResponse.java         |  3 ---
 .../cache/transactions/IgniteInternalTx.java    |  2 +-
 .../cache/transactions/IgniteTxAdapter.java     | 19 +++++++++++-----
 .../cache/transactions/IgniteTxHandler.java     |  5 ++---
 .../ignite/internal/util/lang/GridFunc.java     | 14 ++++++++++++
 9 files changed, 90 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 29e3551..c128aa6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -493,7 +493,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                     req.version(),
                     null,
                     null,
-                    null,
                     null);
 
                 res.error(req.classError());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 41e3896..1e3cd67 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1681,6 +1681,26 @@ public class GridCacheUtils {
     }
 
     /**
+     * @param partsMap Cache ID to partition IDs collection map.
+     * @return Cache ID to partition ID array map.
+     */
+    public static Map<Integer, Integer[]> convertInvalidPartitions(Map<Integer, Set<Integer>> partsMap) {
+        Map<Integer, Integer[]> res = new HashMap<>(partsMap.size());
+
+        for (Map.Entry<Integer, Set<Integer>> entry : partsMap.entrySet()) {
+            Set<Integer> parts = entry.getValue();
+
+            Integer[] partsArray = new Integer[parts.size()];
+
+            partsArray = parts.toArray(partsArray);
+
+            res.put(entry.getKey(), partsArray);
+        }
+
+        return res;
+    }
+
+    /**
      * Stops store session listeners.
      *
      * @param ctx Kernal context.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index af0fbdf..27de8cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -574,13 +574,14 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         // Send reply back to originating near node.
         Throwable prepErr = err.get();
 
+        assert F.isEmpty(tx.invalidPartitions());
+
         GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
             tx.nearXidVersion(),
             tx.colocated() ? tx.xid() : tx.nearFutureId(),
             nearMiniId == null ? tx.xid() : nearMiniId,
             tx.xidVersion(),
             tx.writeVersion(),
-            tx.invalidPartitions(),
             ret,
             prepErr,
             null);
@@ -1194,6 +1195,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 }
 
                 // Process invalid partitions (no need to remap).
+                // Keep this loop for backward compatibility.
                 if (!F.isEmpty(res.invalidPartitions())) {
                     for (Iterator<IgniteTxEntry> it = dhtMapping.entries().iterator(); it.hasNext();) {
                         IgniteTxEntry entry  = it.next();
@@ -1206,6 +1208,25 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                                     ", tx=" + tx + ", dhtMapping=" + dhtMapping + ']');
                         }
                     }
+                }
+
+                // Process invalid partitions (no need to remap).
+                if (!F.isEmpty(res.invalidPartitionsByCacheId())) {
+                    Map<Integer, Integer[]> invalidPartsMap = res.invalidPartitionsByCacheId();
+
+                    for (Iterator<IgniteTxEntry> it = dhtMapping.entries().iterator(); it.hasNext();) {
+                        IgniteTxEntry entry  = it.next();
+
+                        Integer[] invalidParts = invalidPartsMap.get(entry.cacheId());
+
+                        if (F.contains(invalidParts, entry.cached().partition())) {
+                            it.remove();
+
+                            if (log.isDebugEnabled())
+                                log.debug("Removed mapping for entry from dht mapping [key=" + entry.key() +
+                                    ", tx=" + tx + ", dhtMapping=" + dhtMapping + ']');
+                        }
+                    }
 
                     if (dhtMapping.empty()) {
                         dhtMap.remove(nodeId);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
index 753c117..cc85628 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
@@ -55,6 +55,9 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
     @GridDirectCollection(int.class)
     private Collection<Integer> invalidParts;
 
+    /** Invalid partitions by cache ID. */
+    private Map<Integer, Integer[]> invalidPartsByCacheId;
+
     /** Preload entries. */
     @GridDirectCollection(GridCacheEntryInfo.class)
     private List<GridCacheEntryInfo> preloadEntries;
@@ -140,6 +143,20 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
     }
 
     /**
+     * @return Map from cacheId to an array of invalid partitions.
+     */
+    public Map<Integer, Integer[]> invalidPartitionsByCacheId() {
+        return invalidPartsByCacheId;
+    }
+
+    /**
+     * @param invalidPartsByCacheId Map from cache ID to an array of invalid partitions.
+     */
+    public void invalidPartitionsByCacheId(Map<Integer, Set<Integer>> invalidPartsByCacheId) {
+        this.invalidPartsByCacheId = CU.convertInvalidPartitions(invalidPartsByCacheId);
+    }
+
+    /**
      * Gets preload entries found on backup node.
      *
      * @return Collection of entry infos need to be preloaded.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index b418500..b24c62d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -101,7 +101,6 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
      * @param miniId Mini future ID.
      * @param dhtVer DHT version.
      * @param writeVer Write version.
-     * @param invalidParts Invalid partitions.
      * @param retVal Return value.
      * @param err Error.
      * @param clientRemapVer Not {@code null} if client node should remap transaction.
@@ -112,7 +111,6 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
         IgniteUuid miniId,
         GridCacheVersion dhtVer,
         GridCacheVersion writeVer,
-        Collection<Integer> invalidParts,
         GridCacheReturn retVal,
         Throwable err,
         AffinityTopologyVersion clientRemapVer
@@ -127,7 +125,6 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
         this.miniId = miniId;
         this.dhtVer = dhtVer;
         this.writeVer = writeVer;
-        this.invalidParts = invalidParts;
         this.retVal = retVal;
         this.clientRemapVer = clientRemapVer;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index b16e950..f2f20dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -302,7 +302,7 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
     /**
      * @return Invalid partitions.
      */
-    public Set<Integer> invalidPartitions();
+    public Map<Integer, Set<Integer>> invalidPartitions();
 
     /**
      * Gets owned version for near remote transaction.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 709c208..4fc6f0c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -162,7 +162,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
     private AtomicBoolean preparing = new AtomicBoolean();
 
     /** */
-    private Set<Integer> invalidParts = new GridLeanSet<>();
+    private Map<Integer, Set<Integer>> invalidParts = new HashMap<>(3);
 
     /**
      * Transaction state. Note that state is not protected, as we want to
@@ -671,16 +671,25 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
     }
 
     /** {@inheritDoc} */
-    @Override public Set<Integer> invalidPartitions() {
+    @Override public Map<Integer, Set<Integer>> invalidPartitions() {
         return invalidParts;
     }
 
     /** {@inheritDoc} */
     @Override public void addInvalidPartition(GridCacheContext<?, ?> cacheCtx, int part) {
-        invalidParts.add(part);
+        Set<Integer> parts = invalidParts.get(cacheCtx.cacheId());
+
+        if (parts == null) {
+            parts = new GridLeanSet<>();
+
+            invalidParts.put(cacheCtx.cacheId(), parts);
+        }
+
+        parts.add(part);
 
         if (log.isDebugEnabled())
-            log.debug("Added invalid partition for transaction [part=" + part + ", tx=" + this + ']');
+            log.debug("Added invalid partition for transaction [cache=" + cacheCtx.name() + ", part=" + part +
+                ", tx=" + this + ']');
     }
 
     /** {@inheritDoc} */
@@ -1765,7 +1774,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
         }
 
         /** {@inheritDoc} */
-        @Override public Set<Integer> invalidPartitions() {
+        @Override public Map<Integer, Set<Integer>> invalidPartitions() {
             throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index e481e25..227cb34 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -290,7 +290,6 @@ public class IgniteTxHandler {
                         req.version(),
                         null,
                         null,
-                        null,
                         top.topologyVersion());
 
                     try {
@@ -803,7 +802,7 @@ public class IgniteTxHandler {
                 res.nearEvicted(nearTx.evicted());
 
             if (dhtTx != null && !F.isEmpty(dhtTx.invalidPartitions()))
-                res.invalidPartitions(dhtTx.invalidPartitions());
+                res.invalidPartitionsByCacheId(dhtTx.invalidPartitions());
 
             if (req.onePhaseCommit()) {
                 assert req.last();
@@ -1154,7 +1153,7 @@ public class IgniteTxHandler {
             if (req.last())
                 tx.state(PREPARED);
 
-            res.invalidPartitions(tx.invalidPartitions());
+            res.invalidPartitionsByCacheId(tx.invalidPartitions());
 
             if (tx.empty() && req.last()) {
                 tx.rollback();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index 6f544e0..8a354ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -4083,6 +4083,20 @@ public class GridFunc {
      * @param val Value to find.
      * @return {@code True} if array contains given value.
      */
+    public static boolean contains(Integer[] arr, int val) {
+        for (Integer el : arr) {
+            if (el == val)
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * @param arr Array.
+     * @param val Value to find.
+     * @return {@code True} if array contains given value.
+     */
     @SuppressWarnings("ForLoopReplaceableByForEach")
     public static boolean contains(long[] arr, long val) {
         for (int i = 0; i < arr.length; i++) {


[2/8] incubator-ignite git commit: IGNITE-1265 - Rebuilt messages.

Posted by sb...@apache.org.
IGNITE-1265 - Rebuilt messages.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/574c6793
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/574c6793
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/574c6793

Branch: refs/heads/ignite-1.3.3-p3
Commit: 574c6793adb96caa614b9c0c540f4812cfc52ee5
Parents: 7a43dde
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Tue Aug 18 11:11:43 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Tue Aug 18 11:11:43 2015 -0700

----------------------------------------------------------------------
 .../processors/cache/GridCacheUtils.java        | 11 +++++---
 .../distributed/dht/GridDhtTxPrepareFuture.java |  6 ++--
 .../dht/GridDhtTxPrepareResponse.java           | 29 +++++++++++++++-----
 3 files changed, 32 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/574c6793/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 1e3cd67..80e0d69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1684,15 +1684,18 @@ public class GridCacheUtils {
      * @param partsMap Cache ID to partition IDs collection map.
      * @return Cache ID to partition ID array map.
      */
-    public static Map<Integer, Integer[]> convertInvalidPartitions(Map<Integer, Set<Integer>> partsMap) {
-        Map<Integer, Integer[]> res = new HashMap<>(partsMap.size());
+    public static Map<Integer, int[]> convertInvalidPartitions(Map<Integer, Set<Integer>> partsMap) {
+        Map<Integer, int[]> res = new HashMap<>(partsMap.size());
 
         for (Map.Entry<Integer, Set<Integer>> entry : partsMap.entrySet()) {
             Set<Integer> parts = entry.getValue();
 
-            Integer[] partsArray = new Integer[parts.size()];
+            int[] partsArray = new int[parts.size()];
 
-            partsArray = parts.toArray(partsArray);
+            int idx = 0;
+
+            for (Integer part : parts)
+                partsArray[idx++] = part;
 
             res.put(entry.getKey(), partsArray);
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/574c6793/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 27de8cf..2b7e1bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1212,14 +1212,14 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
                 // Process invalid partitions (no need to remap).
                 if (!F.isEmpty(res.invalidPartitionsByCacheId())) {
-                    Map<Integer, Integer[]> invalidPartsMap = res.invalidPartitionsByCacheId();
+                    Map<Integer, int[]> invalidPartsMap = res.invalidPartitionsByCacheId();
 
                     for (Iterator<IgniteTxEntry> it = dhtMapping.entries().iterator(); it.hasNext();) {
                         IgniteTxEntry entry  = it.next();
 
-                        Integer[] invalidParts = invalidPartsMap.get(entry.cacheId());
+                        int[] invalidParts = invalidPartsMap.get(entry.cacheId());
 
-                        if (F.contains(invalidParts, entry.cached().partition())) {
+                        if (invalidParts != null && F.contains(invalidParts, entry.cached().partition())) {
                             it.remove();
 
                             if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/574c6793/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
index cc85628..bcf7f8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
@@ -56,7 +56,8 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
     private Collection<Integer> invalidParts;
 
     /** Invalid partitions by cache ID. */
-    private Map<Integer, Integer[]> invalidPartsByCacheId;
+    @GridDirectMap(keyType = Integer.class, valueType = int[].class)
+    private Map<Integer, int[]> invalidPartsByCacheId;
 
     /** Preload entries. */
     @GridDirectCollection(GridCacheEntryInfo.class)
@@ -145,7 +146,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
     /**
      * @return Map from cacheId to an array of invalid partitions.
      */
-    public Map<Integer, Integer[]> invalidPartitionsByCacheId() {
+    public Map<Integer, int[]> invalidPartitionsByCacheId() {
         return invalidPartsByCacheId;
     }
 
@@ -255,18 +256,24 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeIgniteUuid("miniId", miniId))
+                if (!writer.writeMap("invalidPartsByCacheId", invalidPartsByCacheId, MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR))
                     return false;
 
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeCollection("nearEvicted", nearEvicted, MessageCollectionItemType.MSG))
+                if (!writer.writeIgniteUuid("miniId", miniId))
                     return false;
 
                 writer.incrementState();
 
             case 12:
+                if (!writer.writeCollection("nearEvicted", nearEvicted, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 13:
                 if (!writer.writeCollection("preloadEntries", preloadEntries, MessageCollectionItemType.MSG))
                     return false;
 
@@ -305,7 +312,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
                 reader.incrementState();
 
             case 10:
-                miniId = reader.readIgniteUuid("miniId");
+                invalidPartsByCacheId = reader.readMap("invalidPartsByCacheId", MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR, false);
 
                 if (!reader.isLastRead())
                     return false;
@@ -313,7 +320,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
                 reader.incrementState();
 
             case 11:
-                nearEvicted = reader.readCollection("nearEvicted", MessageCollectionItemType.MSG);
+                miniId = reader.readIgniteUuid("miniId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -321,6 +328,14 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
                 reader.incrementState();
 
             case 12:
+                nearEvicted = reader.readCollection("nearEvicted", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 13:
                 preloadEntries = reader.readCollection("preloadEntries", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -340,6 +355,6 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 13;
+        return 14;
     }
 }


[3/8] incubator-ignite git commit: IGNITE-1265 - EntryProcessorTest when nodes joining topology.

Posted by sb...@apache.org.
IGNITE-1265 - EntryProcessorTest when nodes joining topology.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/013d7075
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/013d7075
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/013d7075

Branch: refs/heads/ignite-1.3.3-p3
Commit: 013d7075853d4728739f3dfb2647ce6001d723c8
Parents: 574c679
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Tue Aug 18 18:18:43 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Tue Aug 18 18:18:43 2015 -0700

----------------------------------------------------------------------
 .../IgniteCacheEntryProcessorRestartTest.java   | 185 +++++++++++++++++++
 1 file changed, 185 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/013d7075/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorRestartTest.java
new file mode 100644
index 0000000..c027ee4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorRestartTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import javax.cache.processor.*;
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheRebalanceMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
+/**
+ * Tests entry processor invocations on a transactional cache while new nodes are joining the topology.
+ */
+public class IgniteCacheEntryProcessorRestartTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Number of nodes to test on. */
+    private static final int GRID_CNT = 2;
+
+    /** Number of increment iterations. */
+    private static final int NUM_SETS = 50;
+
+    /** Grids started in {@code beforeTest()}; cleared in {@code afterTest()}. */
+    private AtomicReferenceArray<Ignite> grids;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration cache = new CacheConfiguration();
+
+        cache.setCacheMode(PARTITIONED);
+        cache.setAtomicityMode(TRANSACTIONAL);
+        cache.setWriteSynchronizationMode(FULL_SYNC);
+        cache.setBackups(1);
+        cache.setRebalanceMode(SYNC);
+
+        cfg.setCacheConfiguration(cache);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+        commSpi.setSharedMemoryPort(-1);
+
+        cfg.setCommunicationSpi(commSpi);
+
+        cfg.setDiscoverySpi(disco);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGrids(GRID_CNT);
+
+        grids = new AtomicReferenceArray<>(GRID_CNT);
+
+        for (int i = 0; i < GRID_CNT; i++)
+            grids.set(i, grid(i));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        grids = null;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEntryProcessorRestart() throws Exception {
+        final AtomicBoolean stop = new AtomicBoolean();
+        final AtomicReference<Throwable> error = new AtomicReference<>();
+        final int started = 6;
+
+        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+            @Override public void run() {
+                try {
+                    for (int i = 0; i < started; i++) {
+                        U.sleep(1_000);
+
+                        startGrid(GRID_CNT + i);
+                    }
+                }
+                catch (Exception e) {
+                    error.compareAndSet(null, e);
+                }
+            }
+        }, 1, "starter");
+
+        try {
+            checkIncrement();
+        }
+        finally {
+            stop.set(true);
+
+            fut.get(getTestTimeout());
+        }
+
+        for (int i = 0; i < NUM_SETS; i++) {
+            for (int g = 0; g < GRID_CNT + started; g++) {
+                Set<String> vals = ignite(g).<String, Set<String>>cache(null).get("set-" + i);
+
+                assertNotNull(vals);
+                assertEquals(100, vals.size());
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkIncrement() throws Exception {
+        for (int k = 0; k < 100; k++) {
+            for (int i = 0; i < NUM_SETS; i++) {
+                String key = "set-" + i;
+
+                String val = "value-" + k;
+
+                IgniteCache<String, Set<String>> cache = ignite(0).cache(null);
+
+                cache.invoke(key, new Processor(val));
+            }
+        }
+    }
+
+    /** Entry processor that adds a value to the set stored under the invoked key. */
+    private static class Processor implements EntryProcessor<String, Set<String>, Void>, Serializable {
+        /** Value to add to the set. */
+        private String val;
+
+        private Processor(String val) {
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void process(MutableEntry<String, Set<String>> e, Object... args) {
+            Set<String> vals = e.getValue();
+
+            if (vals == null)
+                vals = new HashSet<>();
+
+            vals.add(val);
+
+            e.setValue(vals);
+
+            return null;
+        }
+    }
+}


[5/8] incubator-ignite git commit: IGNITE-1265 - Entry processor must always have the correct cache value.

Posted by sb...@apache.org.
IGNITE-1265 - Entry processor must always have the correct cache value.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5065a1ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5065a1ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5065a1ec

Branch: refs/heads/ignite-1.3.3-p3
Commit: 5065a1eccb3d71b2573d37bb6ff2c78a1bbc107c
Parents: ccaa2b2
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Tue Aug 18 19:35:50 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Tue Aug 18 19:35:50 2015 -0700

----------------------------------------------------------------------
 .../dht/GridClientPartitionTopology.java        |  20 +++
 .../dht/GridDhtPartitionTopology.java           |   7 +
 .../dht/GridDhtPartitionTopologyImpl.java       |  20 +++
 .../cache/distributed/dht/GridDhtTxLocal.java   |   4 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java | 136 +++++++++++++++++--
 .../cache/transactions/IgniteTxEntry.java       |  18 +++
 .../IgniteCacheEntryProcessorNodeJoinTest.java  |  54 ++++----
 .../cache/IgniteCacheInvokeReadThroughTest.java |   2 +-
 8 files changed, 223 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index c3f3e7f..531678e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -331,6 +331,26 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public GridDhtPartitionState partitionState(UUID nodeId, int part) {
+        lock.readLock().lock();
+
+        try {
+            GridDhtPartitionMap partMap = node2part.get(nodeId);
+
+            if (partMap != null) {
+                GridDhtPartitionState state = partMap.get(part);
+
+                return state == null ? EVICTED : state;
+            }
+
+            return EVICTED;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public Collection<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
         lock.readLock().lock();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index c551fb3..7b08510 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -129,6 +129,13 @@ public interface GridDhtPartitionTopology {
     public GridDhtPartitionMap localPartitionMap();
 
     /**
+     * @param nodeId Node ID.
+     * @param part Partition.
+     * @return Partition state.
+     */
+    public GridDhtPartitionState partitionState(UUID nodeId, int part);
+
+    /**
      * @return Current update sequence.
      */
     public long updateSequence();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index de7f876..f356138 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -614,6 +614,26 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public GridDhtPartitionState partitionState(UUID nodeId, int part) {
+        lock.readLock().lock();
+
+        try {
+            GridDhtPartitionMap partMap = node2part.get(nodeId);
+
+            if (partMap != null) {
+                GridDhtPartitionState state = partMap.get(part);
+
+                return state == null ? EVICTED : state;
+            }
+
+            return EVICTED;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public Collection<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
         Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 6a72c89..7da6e07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -363,8 +363,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
      * @return Future that will be completed when locks are acquired.
      */
     public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync(
-        @Nullable Iterable<IgniteTxEntry> reads,
-        @Nullable Iterable<IgniteTxEntry> writes,
+        @Nullable Collection<IgniteTxEntry> reads,
+        @Nullable Collection<IgniteTxEntry> writes,
         Map<IgniteTxKey, GridCacheVersion> verMap,
         long msgId,
         IgniteUuid nearMiniId,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 2b7e1bc..ad1023f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -135,6 +135,9 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
     /** Keys that should be locked. */
     private GridConcurrentHashSet<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
 
+    /** Force keys future for correct transforms. */
+    private IgniteInternalFuture<?> forceKeysFut;
+
     /** Locks ready flag. */
     private volatile boolean locksReady;
 
@@ -291,7 +294,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
                 boolean hasFilters = !F.isEmptyOrNulls(txEntry.filters()) && !F.isAlwaysTrue(txEntry.filters());
 
-                if (hasFilters || retVal || txEntry.op() == GridCacheOperation.DELETE) {
+                if (hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM) {
                     cached.unswap(retVal);
 
                     boolean readThrough = (retVal || hasFilters) &&
@@ -312,7 +315,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                         null,
                         null);
 
-                    if (retVal) {
+                    if (retVal || txEntry.op() == TRANSFORM) {
                         if (!F.isEmpty(txEntry.entryProcessors())) {
                             invoke = true;
 
@@ -339,6 +342,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                                 }
                             }
 
+                            txEntry.entryProcessorCalculatedValue(val);
+
                             if (err != null || procRes != null)
                                 ret.addEntryProcessResult(txEntry.context(), key, null, procRes, err);
                             else
@@ -362,7 +367,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                         ret.success(false);
                     }
                     else
-                        ret.success(txEntry.op() != GridCacheOperation.DELETE || cached.hasValue());
+                        ret.success(txEntry.op() != DELETE || cached.hasValue());
                 }
             }
             catch (IgniteCheckedException e) {
@@ -466,7 +471,25 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
      */
     private boolean mapIfLocked() {
         if (checkLocks()) {
-            prepare0();
+            if (!mapped.compareAndSet(false, true))
+                return false;
+
+            if (forceKeysFut == null || (forceKeysFut.isDone() && forceKeysFut.error() == null))
+                prepare0();
+            else {
+                forceKeysFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                    @Override public void apply(IgniteInternalFuture<?> f) {
+                        try {
+                            f.get();
+
+                            prepare0();
+                        }
+                        catch (IgniteCheckedException e) {
+                            onError(e);
+                        }
+                    }
+                });
+            }
 
             return true;
         }
@@ -709,7 +732,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
      * @param writes Write entries.
      * @param txNodes Transaction nodes mapping.
      */
-    public void prepare(Iterable<IgniteTxEntry> reads, Iterable<IgniteTxEntry> writes,
+    @SuppressWarnings("TypeMayBeWeakened")
+    public void prepare(Collection<IgniteTxEntry> reads, Collection<IgniteTxEntry> writes,
         Map<UUID, Collection<UUID>> txNodes) {
         if (tx.empty()) {
             tx.setRollbackOnly();
@@ -721,6 +745,15 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         this.writes = writes;
         this.txNodes = txNodes;
 
+        if (!F.isEmpty(writes)) {
+            Map<Integer, Collection<KeyCacheObject>> forceKeys = null;
+
+            for (IgniteTxEntry entry : writes)
+                forceKeys = checkNeedRebalanceKeys(entry, forceKeys);
+
+            forceKeysFut = forceRebalanceKeys(forceKeys);
+        }
+
         readyLocks();
 
         mapIfLocked();
@@ -735,12 +768,75 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
     }
 
     /**
+     * Checks if this transaction needs previous value for the given tx entry. Will use passed in map to store
+     * required key or will create new map if passed in map is {@code null}.
      *
+     * @param e TX entry.
+     * @param map Map with needed preload keys.
+     * @return The passed in map, or a newly created one; {@code null} if none was passed in and none was needed.
      */
-    private void prepare0() {
-        if (!mapped.compareAndSet(false, true))
-            return;
+    private Map<Integer, Collection<KeyCacheObject>> checkNeedRebalanceKeys(
+        IgniteTxEntry e,
+        Map<Integer, Collection<KeyCacheObject>> map
+    ) {
+        if (retVal || !F.isEmpty(e.entryProcessors())) {
+            if (map == null)
+                map = new HashMap<>();
+
+            Collection<KeyCacheObject> keys = map.get(e.cacheId());
+
+            if (keys == null) {
+                keys = new ArrayList<>();
+
+                map.put(e.cacheId(), keys);
+            }
+
+            keys.add(e.key());
+        }
+
+        return map;
+    }
+
+    private IgniteInternalFuture<Object> forceRebalanceKeys(Map<Integer, Collection<KeyCacheObject>> keysMap) {
+        if (F.isEmpty(keysMap))
+            return null;
+
+        GridCompoundFuture<Object, Object> compFut = null;
+        IgniteInternalFuture<Object> lastForceFut = null;
 
+        for (Map.Entry<Integer, Collection<KeyCacheObject>> entry : keysMap.entrySet()) {
+            if (lastForceFut != null && compFut == null) {
+                compFut = new GridCompoundFuture();
+
+                compFut.add(lastForceFut);
+            }
+
+            int cacheId = entry.getKey();
+
+            Collection<KeyCacheObject> keys = entry.getValue();
+
+            lastForceFut = cctx.cacheContext(cacheId).preloader().request(keys, tx.topologyVersion());
+
+            if (compFut != null)
+                compFut.add(lastForceFut);
+        }
+
+        if (compFut != null) {
+            compFut.markInitialized();
+
+            return compFut;
+        }
+        else {
+            assert lastForceFut != null;
+
+            return lastForceFut;
+        }
+    }
+
+    /**
+     *
+     */
+    private void prepare0() {
         try {
             // We are holding transaction-level locks for entries here, so we can get next write version.
             onEntriesLocked();
@@ -957,7 +1053,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
     private boolean map(
         IgniteTxEntry entry,
         Map<UUID, GridDistributedTxMapping> futDhtMap,
-        Map<UUID, GridDistributedTxMapping> futNearMap) {
+        Map<UUID, GridDistributedTxMapping> futNearMap
+    ) {
         if (entry.cached().isLocal())
             return false;
 
@@ -1024,14 +1121,31 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
      * @param locMap Exclude map.
      * @return {@code True} if mapped.
      */
-    private boolean map(IgniteTxEntry entry, Iterable<ClusterNode> nodes,
-        Map<UUID, GridDistributedTxMapping> globalMap, Map<UUID, GridDistributedTxMapping> locMap) {
+    private boolean map(
+        IgniteTxEntry entry,
+        Iterable<ClusterNode> nodes,
+        Map<UUID, GridDistributedTxMapping> globalMap,
+        Map<UUID, GridDistributedTxMapping> locMap
+    ) {
         boolean ret = false;
 
         if (nodes != null) {
             for (ClusterNode n : nodes) {
                 GridDistributedTxMapping global = globalMap.get(n.id());
 
+                if (!F.isEmpty(entry.entryProcessors())) {
+                    GridDhtPartitionState state = entry.context().topology().partitionState(n.id(),
+                        entry.cached().partition());
+
+                    if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) {
+                        CacheObject procVal = entry.entryProcessorCalculatedValue();
+
+                        entry.op(procVal == null ? DELETE : UPDATE);
+                        entry.value(procVal, true, false);
+                        entry.entryProcessors(null);
+                    }
+                }
+
                 if (global == null)
                     globalMap.put(n.id(), global = new GridDistributedTxMapping(n));
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 247d350..7890831 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -79,6 +79,10 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
     @GridDirectTransient
     private Collection<T2<EntryProcessor<Object, Object, Object>, Object[]>> entryProcessorsCol;
 
+    /** Transient field for calculated entry processor value. */
+    @GridDirectTransient
+    private CacheObject entryProcessorCalcVal;
+
     /** Transform closure bytes. */
     @GridToStringExclude
     private byte[] transformClosBytes;
@@ -775,6 +779,20 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
         return expiryPlc;
     }
 
+    /**
+     * @return Entry processor calculated value.
+     */
+    public CacheObject entryProcessorCalculatedValue() {
+        return entryProcessorCalcVal;
+    }
+
+    /**
+     * @param entryProcessorCalcVal Entry processor calculated value.
+     */
+    public void entryProcessorCalculatedValue(CacheObject entryProcessorCalcVal) {
+        this.entryProcessorCalcVal = entryProcessorCalcVal;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
index 9c17ebd..94bfd8f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
@@ -122,38 +122,44 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
         final AtomicReference<Throwable> error = new AtomicReference<>();
         final int started = 6;
 
-        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
-            @Override public void run() {
-                try {
-                    for (int i = 0; i < started; i++) {
-                        U.sleep(1_000);
-
-                        startGrid(GRID_CNT + i);
+        try {
+            IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+                @Override public void run() {
+                    try {
+                        for (int i = 0; i < started; i++) {
+                            U.sleep(1_000);
+
+                            startGrid(GRID_CNT + i);
+                        }
+                    }
+                    catch (Exception e) {
+                        error.compareAndSet(null, e);
                     }
                 }
-                catch (Exception e) {
-                    error.compareAndSet(null, e);
-                }
-            }
-        }, 1, "starter");
+            }, 1, "starter");
 
-        try {
-            checkIncrement(invokeAll);
-        }
-        finally {
-            stop.set(true);
+            try {
+                checkIncrement(invokeAll);
+            }
+            finally {
+                stop.set(true);
 
-            fut.get(getTestTimeout());
-        }
+                fut.get(getTestTimeout());
+            }
 
-        for (int i = 0; i < NUM_SETS; i++) {
-            for (int g = 0; g < GRID_CNT + started; g++) {
-                Set<String> vals = ignite(g).<String, Set<String>>cache(null).get("set-" + i);
+            for (int i = 0; i < NUM_SETS; i++) {
+                for (int g = 0; g < GRID_CNT + started; g++) {
+                    Set<String> vals = ignite(g).<String, Set<String>>cache(null).get("set-" + i);
 
-                assertNotNull(vals);
-                assertEquals(100, vals.size());
+                    assertNotNull(vals);
+                    assertEquals(100, vals.size());
+                }
             }
         }
+        finally {
+            for (int i = 0; i < started; i++)
+                stopGrid(GRID_CNT + i);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
index 10ab1ab..b72540d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
@@ -34,7 +34,7 @@ import static org.apache.ignite.cache.CacheMode.*;
 public class IgniteCacheInvokeReadThroughTest extends IgniteCacheAbstractTest {
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-114");
+//        fail("https://issues.apache.org/jira/browse/IGNITE-114");
     }
 
     /** */


[8/8] incubator-ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-1265' into ignite-1.3.3-p3

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-1265' into ignite-1.3.3-p3


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7d1a550d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7d1a550d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7d1a550d

Branch: refs/heads/ignite-1.3.3-p3
Commit: 7d1a550dcac96dc2028aaeab5632c9d35f99489b
Parents: ac670f9 b55365d
Author: sboikov <sb...@gridgain.com>
Authored: Wed Aug 19 15:20:13 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Aug 19 15:20:13 2015 +0300

----------------------------------------------------------------------
 .../CachePartialUpdateCheckedException.java     |  11 +-
 .../processors/cache/GridCacheIoManager.java    |   1 -
 .../processors/cache/GridCacheUtils.java        |  23 ++
 .../distributed/GridDistributedCacheEntry.java  |   7 +
 .../dht/GridClientPartitionTopology.java        |  20 ++
 .../dht/GridDhtPartitionTopology.java           |   7 +
 .../dht/GridDhtPartitionTopologyImpl.java       |  20 ++
 .../cache/distributed/dht/GridDhtTxLocal.java   |   4 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java | 166 ++++++++++++--
 .../dht/GridDhtTxPrepareResponse.java           |  42 +++-
 .../distributed/near/GridNearCacheEntry.java    |   6 +
 .../distributed/near/GridNearLockFuture.java    |   4 +-
 .../near/GridNearTxPrepareResponse.java         |   3 -
 .../cache/transactions/IgniteInternalTx.java    |   2 +-
 .../cache/transactions/IgniteTxAdapter.java     |  19 +-
 .../cache/transactions/IgniteTxEntry.java       |  18 ++
 .../cache/transactions/IgniteTxHandler.java     |   5 +-
 .../ignite/internal/util/lang/GridFunc.java     |  14 ++
 ...teAtomicCacheEntryProcessorNodeJoinTest.java |  32 +++
 .../IgniteCacheEntryProcessorNodeJoinTest.java  | 225 +++++++++++++++++++
 .../near/GridCacheNearOnlyTopologySelfTest.java |   4 +-
 .../near/GridCacheNearTxForceKeyTest.java       |  76 +++++++
 .../testsuites/IgniteCacheTestSuite2.java       |   4 +
 23 files changed, 670 insertions(+), 43 deletions(-)
----------------------------------------------------------------------



[6/8] incubator-ignite git commit: # ignite-1265

Posted by sb...@apache.org.
# ignite-1265


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8b3fed85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8b3fed85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8b3fed85

Branch: refs/heads/ignite-1.3.3-p3
Commit: 8b3fed850ccc4527a6593c5ec4e596ca6c08b61f
Parents: 5065a1e
Author: sboikov <sb...@gridgain.com>
Authored: Wed Aug 19 09:32:54 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Aug 19 10:26:10 2015 +0300

----------------------------------------------------------------------
 .../CachePartialUpdateCheckedException.java     | 11 ++++---
 ...teAtomicCacheEntryProcessorNodeJoinTest.java | 32 ++++++++++++++++++++
 .../IgniteCacheEntryProcessorNodeJoinTest.java  | 25 ++++++++-------
 .../testsuites/IgniteCacheTestSuite2.java       |  3 ++
 4 files changed, 55 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b3fed85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java
index c2259df..fc846f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java
@@ -47,8 +47,9 @@ public class CachePartialUpdateCheckedException extends IgniteCheckedException {
      * Gets collection of failed keys.
      * @return Collection of failed keys.
      */
-    public <K> Collection<K> failedKeys() {
-        return (Collection<K>)failedKeys;
+    @SuppressWarnings("unchecked")
+    public synchronized <K> Collection<K> failedKeys() {
+        return new HashSet<>((Collection<K>)failedKeys);
     }
 
     /**
@@ -56,7 +57,7 @@ public class CachePartialUpdateCheckedException extends IgniteCheckedException {
      * @param err Error.
      * @param topVer Topology version for failed update.
      */
-    public void add(Collection<?> failedKeys, Throwable err, AffinityTopologyVersion topVer) {
+    public synchronized void add(Collection<?> failedKeys, Throwable err, AffinityTopologyVersion topVer) {
         if (topVer != null) {
             AffinityTopologyVersion topVer0 = this.topVer;
 
@@ -72,7 +73,7 @@ public class CachePartialUpdateCheckedException extends IgniteCheckedException {
     /**
      * @return Topology version.
      */
-    public AffinityTopologyVersion topologyVersion() {
+    public synchronized AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
@@ -80,7 +81,7 @@ public class CachePartialUpdateCheckedException extends IgniteCheckedException {
      * @param failedKeys Failed keys.
      * @param err Error.
      */
-    public void add(Collection<?> failedKeys, Throwable err) {
+    public synchronized void add(Collection<?> failedKeys, Throwable err) {
         add(failedKeys, err, null);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b3fed85/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAtomicCacheEntryProcessorNodeJoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAtomicCacheEntryProcessorNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAtomicCacheEntryProcessorNodeJoinTest.java
new file mode 100644
index 0000000..af87a7d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAtomicCacheEntryProcessorNodeJoinTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+
+/**
+ *
+ */
+public class IgniteAtomicCacheEntryProcessorNodeJoinTest extends IgniteCacheEntryProcessorNodeJoinTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b3fed85/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
index 94bfd8f..955a792 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -51,9 +52,6 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
     /** Number of increment iterations. */
     private static final int NUM_SETS = 50;
 
-    /** Helper for excluding stopped node from iteration logic. */
-    private AtomicReferenceArray<Ignite> grids;
-
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -61,7 +59,7 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
         CacheConfiguration cache = new CacheConfiguration();
 
         cache.setCacheMode(PARTITIONED);
-        cache.setAtomicityMode(TRANSACTIONAL);
+        cache.setAtomicityMode(atomicityMode());
         cache.setWriteSynchronizationMode(FULL_SYNC);
         cache.setBackups(1);
         cache.setRebalanceMode(SYNC);
@@ -83,21 +81,21 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
         return cfg;
     }
 
+    /**
+     * @return Atomicity mode.
+     */
+    protected CacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL;
+    }
+
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         startGrids(GRID_CNT);
-
-        grids = new AtomicReferenceArray<>(GRID_CNT);
-
-        for (int i = 0; i < GRID_CNT; i++)
-            grids.set(i, grid(i));
     }
 
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         stopAllGrids();
-
-        grids = null;
     }
 
     /**
@@ -115,6 +113,7 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
     }
 
     /**
+     * @param invokeAll If {@code true} tests invokeAll operation.
      * @throws Exception If failed.
      */
     private void checkEntryProcessorNodeJoin(boolean invokeAll) throws Exception {
@@ -163,6 +162,7 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
     }
 
     /**
+     * @param invokeAll If {@code true} tests invokeAll operation.
      * @throws Exception If failed.
      */
     private void checkIncrement(boolean invokeAll) throws Exception {
@@ -201,6 +201,9 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
         /** */
         private String val;
 
+        /**
+         * @param val Value.
+         */
         private Processor(String val) {
             this.val = val;
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b3fed85/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index dcbab07..ec50399 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -140,6 +140,9 @@ public class IgniteCacheTestSuite2 extends TestSuite {
         suite.addTest(new TestSuite(IgniteCacheClientNodeChangingTopologyTest.class));
         suite.addTest(new TestSuite(IgniteCacheClientNodeConcurrentStart.class));
 
+        suite.addTest(new TestSuite(IgniteCacheEntryProcessorNodeJoinTest.class));
+        suite.addTest(new TestSuite(IgniteAtomicCacheEntryProcessorNodeJoinTest.class));
+
         return suite;
     }
 }


[4/8] incubator-ignite git commit: IGNITE-1265 - EntryProcessorTest when nodes joining topology.

Posted by sb...@apache.org.
IGNITE-1265 - EntryProcessorTest when nodes joining topology.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ccaa2b20
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ccaa2b20
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ccaa2b20

Branch: refs/heads/ignite-1.3.3-p3
Commit: ccaa2b20dab5438603471796b7155f309261a41f
Parents: 013d707
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Tue Aug 18 18:38:36 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Tue Aug 18 18:38:36 2015 -0700

----------------------------------------------------------------------
 .../IgniteCacheEntryProcessorNodeJoinTest.java  | 216 +++++++++++++++++++
 .../IgniteCacheEntryProcessorRestartTest.java   | 185 ----------------
 2 files changed, 216 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ccaa2b20/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
new file mode 100644
index 0000000..9c17ebd
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import javax.cache.processor.*;
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheRebalanceMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
+/**
+ * Tests cache in-place modification logic with iterative value increment.
+ */
+public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Number of nodes to test on. */
+    private static final int GRID_CNT = 2;
+
+    /** Number of increment iterations. */
+    private static final int NUM_SETS = 50;
+
+    /** Helper for excluding stopped node from iteration logic. */
+    private AtomicReferenceArray<Ignite> grids;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration cache = new CacheConfiguration();
+
+        cache.setCacheMode(PARTITIONED);
+        cache.setAtomicityMode(TRANSACTIONAL);
+        cache.setWriteSynchronizationMode(FULL_SYNC);
+        cache.setBackups(1);
+        cache.setRebalanceMode(SYNC);
+
+        cfg.setCacheConfiguration(cache);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+        commSpi.setSharedMemoryPort(-1);
+
+        cfg.setCommunicationSpi(commSpi);
+
+        cfg.setDiscoverySpi(disco);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGrids(GRID_CNT);
+
+        grids = new AtomicReferenceArray<>(GRID_CNT);
+
+        for (int i = 0; i < GRID_CNT; i++)
+            grids.set(i, grid(i));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        grids = null;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSingleEntryProcessorNodeJoin() throws Exception {
+        checkEntryProcessorNodeJoin(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAllEntryProcessorNodeJoin() throws Exception {
+        checkEntryProcessorNodeJoin(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkEntryProcessorNodeJoin(boolean invokeAll) throws Exception {
+        final AtomicReference<Throwable> error = new AtomicReference<>();
+        final int started = 6;
+
+        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+            @Override public void run() {
+                try {
+                    for (int i = 0; i < started; i++) {
+                        U.sleep(1_000);
+
+                        startGrid(GRID_CNT + i);
+                    }
+                }
+                catch (Exception e) {
+                    error.compareAndSet(null, e);
+                }
+            }
+        }, 1, "starter");
+
+        try {
+            checkIncrement(invokeAll);
+        }
+        finally {
+            fut.get(getTestTimeout());
+        }
+
+        // The starter thread only records a node start failure, so it has to be reported explicitly here.
+        assertNull("Failed to start node: " + error.get(), error.get());
+
+        for (int i = 0; i < NUM_SETS; i++) {
+            for (int g = 0; g < GRID_CNT + started; g++) {
+                Set<String> vals = ignite(g).<String, Set<String>>cache(null).get("set-" + i);
+
+                assertNotNull(vals);
+                assertEquals(100, vals.size());
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkIncrement(boolean invokeAll) throws Exception {
+        for (int k = 0; k < 100; k++) {
+            if (invokeAll) {
+                IgniteCache<String, Set<String>> cache = ignite(0).cache(null);
+
+                Map<String, Processor> procs = new LinkedHashMap<>();
+
+                for (int i = 0; i < NUM_SETS; i++) {
+                    String key = "set-" + i;
+                    String val = "value-" + k;
+
+                    // Queue the processor so that the invokeAll() call below receives a non-empty batch.
+                    procs.put(key, new Processor(val));
+                }
+
+                cache.invokeAll(procs);
+            }
+            else {
+                for (int i = 0; i < NUM_SETS; i++) {
+                    String key = "set-" + i;
+
+                    String val = "value-" + k;
+
+                    IgniteCache<String, Set<String>> cache = ignite(0).cache(null);
+
+                    cache.invoke(key, new Processor(val));
+                }
+            }
+        }
+    }
+
+    /** */
+    private static class Processor implements EntryProcessor<String, Set<String>, Void>, Serializable {
+        /** */
+        private String val;
+
+        private Processor(String val) {
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void process(MutableEntry<String, Set<String>> e, Object... args) {
+            Set<String> vals = e.getValue();
+
+            if (vals == null)
+                vals = new HashSet<>();
+
+            vals.add(val);
+
+            e.setValue(vals);
+
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ccaa2b20/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorRestartTest.java
deleted file mode 100644
index c027ee4..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorRestartTest.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.spi.communication.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.*;
-import org.apache.ignite.testframework.junits.common.*;
-
-import javax.cache.processor.*;
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.*;
-import static org.apache.ignite.cache.CacheMode.*;
-import static org.apache.ignite.cache.CacheRebalanceMode.*;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
-
-/**
- * Tests cache in-place modification logic with iterative value increment.
- */
-public class IgniteCacheEntryProcessorRestartTest extends GridCommonAbstractTest {
-    /** IP finder. */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
-    /** Number of nodes to test on. */
-    private static final int GRID_CNT = 2;
-
-    /** Number of increment iterations. */
-    private static final int NUM_SETS = 50;
-
-    /** Helper for excluding stopped node from iteration logic. */
-    private AtomicReferenceArray<Ignite> grids;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        CacheConfiguration cache = new CacheConfiguration();
-
-        cache.setCacheMode(PARTITIONED);
-        cache.setAtomicityMode(TRANSACTIONAL);
-        cache.setWriteSynchronizationMode(FULL_SYNC);
-        cache.setBackups(1);
-        cache.setRebalanceMode(SYNC);
-
-        cfg.setCacheConfiguration(cache);
-
-        TcpDiscoverySpi disco = new TcpDiscoverySpi();
-
-        disco.setIpFinder(IP_FINDER);
-
-        TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
-
-        commSpi.setSharedMemoryPort(-1);
-
-        cfg.setCommunicationSpi(commSpi);
-
-        cfg.setDiscoverySpi(disco);
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        startGrids(GRID_CNT);
-
-        grids = new AtomicReferenceArray<>(GRID_CNT);
-
-        for (int i = 0; i < GRID_CNT; i++)
-            grids.set(i, grid(i));
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        stopAllGrids();
-
-        grids = null;
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testEntryProcessorRestart() throws Exception {
-        final AtomicBoolean stop = new AtomicBoolean();
-        final AtomicReference<Throwable> error = new AtomicReference<>();
-        final int started = 6;
-
-        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
-            @Override public void run() {
-                try {
-                    for (int i = 0; i < started; i++) {
-                        U.sleep(1_000);
-
-                        startGrid(GRID_CNT + i);
-                    }
-                }
-                catch (Exception e) {
-                    error.compareAndSet(null, e);
-                }
-            }
-        }, 1, "starter");
-
-        try {
-            checkIncrement();
-        }
-        finally {
-            stop.set(true);
-
-            fut.get(getTestTimeout());
-        }
-
-        for (int i = 0; i < NUM_SETS; i++) {
-            for (int g = 0; g < GRID_CNT + started; g++) {
-                Set<String> vals = ignite(g).<String, Set<String>>cache(null).get("set-" + i);
-
-                assertNotNull(vals);
-                assertEquals(100, vals.size());
-            }
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void checkIncrement() throws Exception {
-        for (int k = 0; k < 100; k++) {
-            for (int i = 0; i < NUM_SETS; i++) {
-                String key = "set-" + i;
-
-                String val = "value-" + k;
-
-                IgniteCache<String, Set<String>> cache = ignite(0).cache(null);
-
-                cache.invoke(key, new Processor(val));
-            }
-        }
-    }
-
-    /** */
-    private static class Processor implements EntryProcessor<String, Set<String>, Void>, Serializable {
-        /** */
-        private String val;
-
-        private Processor(String val) {
-            this.val = val;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Void process(MutableEntry<String, Set<String>> e, Object... args) {
-            Set<String> vals = e.getValue();
-
-            if (vals == null)
-                vals = new HashSet<>();
-
-            vals.add(val);
-
-            e.setValue(vals);
-
-            return null;
-        }
-    }
-}


[7/8] incubator-ignite git commit: # ignite-1265 set topology version for mvcc candidate

Posted by sb...@apache.org.
# ignite-1265 set topology version for mvcc candidate


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b55365d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b55365d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b55365d3

Branch: refs/heads/ignite-1.3.3-p3
Commit: b55365d316febf834cde553fbb55b33d42194069
Parents: 8b3fed8
Author: sboikov <sb...@gridgain.com>
Authored: Wed Aug 19 13:30:06 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Aug 19 13:53:41 2015 +0300

----------------------------------------------------------------------
 .../distributed/GridDistributedCacheEntry.java  |  7 ++
 .../distributed/dht/GridDhtTxPrepareFuture.java |  7 +-
 .../distributed/near/GridNearCacheEntry.java    |  6 ++
 .../distributed/near/GridNearLockFuture.java    |  4 +-
 .../cache/IgniteCacheInvokeReadThroughTest.java |  2 +-
 .../near/GridCacheNearOnlyTopologySelfTest.java |  4 +-
 .../near/GridCacheNearTxForceKeyTest.java       | 76 ++++++++++++++++++++
 .../testsuites/IgniteCacheTestSuite2.java       |  1 +
 8 files changed, 100 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b55365d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
index bd72764..e007190 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.distributed;
 
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
@@ -68,6 +69,7 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
      *
      * @param threadId Owning thread ID.
      * @param ver Lock version.
+     * @param topVer Topology version.
      * @param timeout Timeout to acquire lock.
      * @param reenter Reentry flag.
      * @param tx Transaction flag.
@@ -78,6 +80,7 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
     @Nullable public GridCacheMvccCandidate addLocal(
         long threadId,
         GridCacheVersion ver,
+        AffinityTopologyVersion topVer,
         long timeout,
         boolean reenter,
         boolean tx,
@@ -105,6 +108,9 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
 
             cand = mvcc.addLocal(this, threadId, ver, timeout, reenter, tx, implicitSingle);
 
+            if (cand != null)
+                cand.topologyVersion(topVer);
+
             owner = mvcc.anyOwner();
 
             boolean emptyAfter = mvcc.isEmpty();
@@ -732,6 +738,7 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
             return addLocal(
                 tx.threadId(),
                 tx.xidVersion(),
+                tx.topologyVersion(),
                 timeout,
                 false,
                 true,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b55365d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index ad1023f..1539a2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -797,6 +797,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         return map;
     }
 
+    /**
+     * @param keysMap Keys to request.
+     * @return Keys request future.
+     */
     private IgniteInternalFuture<Object> forceRebalanceKeys(Map<Integer, Collection<KeyCacheObject>> keysMap) {
         if (F.isEmpty(keysMap))
             return null;
@@ -978,7 +982,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                         fut.onResult(e);
                     }
                     catch (IgniteCheckedException e) {
-                        fut.onResult(e);
+                        if (!cctx.kernalContext().isStopping())
+                            fut.onResult(e);
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b55365d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 9e8d76b..194c68a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -433,6 +433,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
     @Override public GridCacheMvccCandidate addLocal(
         long threadId,
         GridCacheVersion ver,
+        AffinityTopologyVersion topVer,
         long timeout,
         boolean reenter,
         boolean tx,
@@ -441,6 +442,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
             null,
             threadId,
             ver,
+            topVer,
             timeout,
             reenter,
             tx,
@@ -454,6 +456,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
      * @param dhtNodeId DHT node ID.
      * @param threadId Owning thread ID.
      * @param ver Lock version.
+     * @param topVer Topology version.
      * @param timeout Timeout to acquire lock.
      * @param reenter Reentry flag.
      * @param tx Transaction flag.
@@ -465,6 +468,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
         @Nullable UUID dhtNodeId,
         long threadId,
         GridCacheVersion ver,
+        AffinityTopologyVersion topVer,
         long timeout,
         boolean reenter,
         boolean tx,
@@ -513,6 +517,8 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
                 tx,
                 implicitSingle);
 
+            cand.topologyVersion(topVer);
+
             owner = mvcc.anyOwner();
 
             boolean emptyAfter = mvcc.isEmpty();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b55365d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 3d28018..b7e0d73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -307,6 +307,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
             dhtNodeId,
             threadId,
             lockVer,
+            topVer,
             timeout,
             !inTx(),
             inTx(),
@@ -319,9 +320,6 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
             txEntry.cached(entry);
         }
 
-        if (c != null)
-            c.topologyVersion(topVer);
-
         synchronized (mux) {
             entries.add(entry);
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b55365d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
index b72540d..10ab1ab 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
@@ -34,7 +34,7 @@ import static org.apache.ignite.cache.CacheMode.*;
 public class IgniteCacheInvokeReadThroughTest extends IgniteCacheAbstractTest {
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
-//        fail("https://issues.apache.org/jira/browse/IGNITE-114");
+        fail("https://issues.apache.org/jira/browse/IGNITE-114");
     }
 
     /** */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b55365d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyTopologySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyTopologySelfTest.java
index b6bc56e..d1d7c02 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyTopologySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyTopologySelfTest.java
@@ -198,7 +198,7 @@ public class GridCacheNearOnlyTopologySelfTest extends GridCommonAbstractTest {
             }
 
             // Test optimistic transaction.
-            GridTestUtils.assertThrows(log, new Callable<Object>() {
+            GridTestUtils.assertThrowsWithCause(new Callable<Object>() {
                 @Override public Object call() throws Exception {
                     try (Transaction tx = igniteNearOnly.transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) {
                         nearOnly.put("key", "val");
@@ -208,7 +208,7 @@ public class GridCacheNearOnlyTopologySelfTest extends GridCommonAbstractTest {
 
                     return null;
                 }
-            }, ClusterTopologyException.class, null);
+            }, ClusterTopologyCheckedException.class);
 
             // Test pessimistic transaction.
             GridTestUtils.assertThrowsWithCause(new Callable<Object>() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b55365d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java
new file mode 100644
index 0000000..44ef20d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheRebalanceMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
+/**
+ *
+ */
+public class GridCacheNearTxForceKeyTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setRebalanceMode(ASYNC);
+        ccfg.setRebalanceDelay(5000);
+        ccfg.setBackups(0);
+        ccfg.setNearConfiguration(new NearCacheConfiguration());
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /**
+     * Provokes a scenario where the primary node sends a force key request to the node that started the transaction.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNearTx() throws Exception {
+        Ignite ignite0 = startGrid(0);
+
+        IgniteCache<Integer, Integer> cache = ignite0.cache(null);
+
+        Ignite ignite1 = startGrid(1);
+
+        final Integer key = 2;
+
+        assertNull(cache.getAndPut(key, key));
+
+        assertTrue(ignite0.affinity(null).isPrimary(ignite1.cluster().localNode(), key));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b55365d3/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index ec50399..495719f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -142,6 +142,7 @@ public class IgniteCacheTestSuite2 extends TestSuite {
 
         suite.addTest(new TestSuite(IgniteCacheEntryProcessorNodeJoinTest.class));
         suite.addTest(new TestSuite(IgniteAtomicCacheEntryProcessorNodeJoinTest.class));
+        suite.addTest(new TestSuite(GridCacheNearTxForceKeyTest.class));
 
         return suite;
     }