You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/19 14:21:15 UTC
[1/8] incubator-ignite git commit: IGNITE-1265 - Properly handle invalid partitions in DHT prepare response.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-1.3.3-p3 ac670f923 -> 7d1a550dc
IGNITE-1265 - Properly handle invalid partitions in DHT prepare response.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7a43dde7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7a43dde7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7a43dde7
Branch: refs/heads/ignite-1.3.3-p3
Commit: 7a43dde77b47478e6b02bbab9d81ad70a2299c51
Parents: 5faffb9
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Tue Aug 18 10:35:59 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Tue Aug 18 10:35:59 2015 -0700
----------------------------------------------------------------------
.../processors/cache/GridCacheIoManager.java | 1 -
.../processors/cache/GridCacheUtils.java | 20 +++++++++++++++++
.../distributed/dht/GridDhtTxPrepareFuture.java | 23 +++++++++++++++++++-
.../dht/GridDhtTxPrepareResponse.java | 17 +++++++++++++++
.../near/GridNearTxPrepareResponse.java | 3 ---
.../cache/transactions/IgniteInternalTx.java | 2 +-
.../cache/transactions/IgniteTxAdapter.java | 19 +++++++++++-----
.../cache/transactions/IgniteTxHandler.java | 5 ++---
.../ignite/internal/util/lang/GridFunc.java | 14 ++++++++++++
9 files changed, 90 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 29e3551..c128aa6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -493,7 +493,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
req.version(),
null,
null,
- null,
null);
res.error(req.classError());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 41e3896..1e3cd67 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1681,6 +1681,26 @@ public class GridCacheUtils {
}
/**
+ * @param partsMap Cache ID to partition IDs collection map.
+ * @return Cache ID to partition ID array map.
+ */
+ public static Map<Integer, Integer[]> convertInvalidPartitions(Map<Integer, Set<Integer>> partsMap) {
+ Map<Integer, Integer[]> res = new HashMap<>(partsMap.size());
+
+ for (Map.Entry<Integer, Set<Integer>> entry : partsMap.entrySet()) {
+ Set<Integer> parts = entry.getValue();
+
+ Integer[] partsArray = new Integer[parts.size()];
+
+ partsArray = parts.toArray(partsArray);
+
+ res.put(entry.getKey(), partsArray);
+ }
+
+ return res;
+ }
+
+ /**
* Stops store session listeners.
*
* @param ctx Kernal context.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index af0fbdf..27de8cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -574,13 +574,14 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
// Send reply back to originating near node.
Throwable prepErr = err.get();
+ assert F.isEmpty(tx.invalidPartitions());
+
GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
tx.nearXidVersion(),
tx.colocated() ? tx.xid() : tx.nearFutureId(),
nearMiniId == null ? tx.xid() : nearMiniId,
tx.xidVersion(),
tx.writeVersion(),
- tx.invalidPartitions(),
ret,
prepErr,
null);
@@ -1194,6 +1195,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
}
// Process invalid partitions (no need to remap).
+ // Keep this loop for backward compatibility.
if (!F.isEmpty(res.invalidPartitions())) {
for (Iterator<IgniteTxEntry> it = dhtMapping.entries().iterator(); it.hasNext();) {
IgniteTxEntry entry = it.next();
@@ -1206,6 +1208,25 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
", tx=" + tx + ", dhtMapping=" + dhtMapping + ']');
}
}
+ }
+
+ // Process invalid partitions (no need to remap).
+ if (!F.isEmpty(res.invalidPartitionsByCacheId())) {
+ Map<Integer, Integer[]> invalidPartsMap = res.invalidPartitionsByCacheId();
+
+ for (Iterator<IgniteTxEntry> it = dhtMapping.entries().iterator(); it.hasNext();) {
+ IgniteTxEntry entry = it.next();
+
+ Integer[] invalidParts = invalidPartsMap.get(entry.cacheId());
+
+ if (F.contains(invalidParts, entry.cached().partition())) {
+ it.remove();
+
+ if (log.isDebugEnabled())
+ log.debug("Removed mapping for entry from dht mapping [key=" + entry.key() +
+ ", tx=" + tx + ", dhtMapping=" + dhtMapping + ']');
+ }
+ }
if (dhtMapping.empty()) {
dhtMap.remove(nodeId);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
index 753c117..cc85628 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
@@ -55,6 +55,9 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
@GridDirectCollection(int.class)
private Collection<Integer> invalidParts;
+ /** Invalid partitions by cache ID. */
+ private Map<Integer, Integer[]> invalidPartsByCacheId;
+
/** Preload entries. */
@GridDirectCollection(GridCacheEntryInfo.class)
private List<GridCacheEntryInfo> preloadEntries;
@@ -140,6 +143,20 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
}
/**
+ * @return Map from cacheId to an array of invalid partitions.
+ */
+ public Map<Integer, Integer[]> invalidPartitionsByCacheId() {
+ return invalidPartsByCacheId;
+ }
+
+ /**
+ * @param invalidPartsByCacheId Map from cache ID to an array of invalid partitions.
+ */
+ public void invalidPartitionsByCacheId(Map<Integer, Set<Integer>> invalidPartsByCacheId) {
+ this.invalidPartsByCacheId = CU.convertInvalidPartitions(invalidPartsByCacheId);
+ }
+
+ /**
* Gets preload entries found on backup node.
*
* @return Collection of entry infos need to be preloaded.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index b418500..b24c62d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -101,7 +101,6 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
* @param miniId Mini future ID.
* @param dhtVer DHT version.
* @param writeVer Write version.
- * @param invalidParts Invalid partitions.
* @param retVal Return value.
* @param err Error.
* @param clientRemapVer Not {@code null} if client node should remap transaction.
@@ -112,7 +111,6 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
IgniteUuid miniId,
GridCacheVersion dhtVer,
GridCacheVersion writeVer,
- Collection<Integer> invalidParts,
GridCacheReturn retVal,
Throwable err,
AffinityTopologyVersion clientRemapVer
@@ -127,7 +125,6 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
this.miniId = miniId;
this.dhtVer = dhtVer;
this.writeVer = writeVer;
- this.invalidParts = invalidParts;
this.retVal = retVal;
this.clientRemapVer = clientRemapVer;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index b16e950..f2f20dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -302,7 +302,7 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
/**
* @return Invalid partitions.
*/
- public Set<Integer> invalidPartitions();
+ public Map<Integer, Set<Integer>> invalidPartitions();
/**
* Gets owned version for near remote transaction.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 709c208..4fc6f0c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -162,7 +162,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
private AtomicBoolean preparing = new AtomicBoolean();
/** */
- private Set<Integer> invalidParts = new GridLeanSet<>();
+ private Map<Integer, Set<Integer>> invalidParts = new HashMap<>(3);
/**
* Transaction state. Note that state is not protected, as we want to
@@ -671,16 +671,25 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
}
/** {@inheritDoc} */
- @Override public Set<Integer> invalidPartitions() {
+ @Override public Map<Integer, Set<Integer>> invalidPartitions() {
return invalidParts;
}
/** {@inheritDoc} */
@Override public void addInvalidPartition(GridCacheContext<?, ?> cacheCtx, int part) {
- invalidParts.add(part);
+ Set<Integer> parts = invalidParts.get(cacheCtx.cacheId());
+
+ if (parts == null) {
+ parts = new GridLeanSet<>();
+
+ invalidParts.put(cacheCtx.cacheId(), parts);
+ }
+
+ parts.add(part);
if (log.isDebugEnabled())
- log.debug("Added invalid partition for transaction [part=" + part + ", tx=" + this + ']');
+ log.debug("Added invalid partition for transaction [cache=" + cacheCtx.name() + ", part=" + part +
+ ", tx=" + this + ']');
}
/** {@inheritDoc} */
@@ -1765,7 +1774,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
}
/** {@inheritDoc} */
- @Override public Set<Integer> invalidPartitions() {
+ @Override public Map<Integer, Set<Integer>> invalidPartitions() {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index e481e25..227cb34 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -290,7 +290,6 @@ public class IgniteTxHandler {
req.version(),
null,
null,
- null,
top.topologyVersion());
try {
@@ -803,7 +802,7 @@ public class IgniteTxHandler {
res.nearEvicted(nearTx.evicted());
if (dhtTx != null && !F.isEmpty(dhtTx.invalidPartitions()))
- res.invalidPartitions(dhtTx.invalidPartitions());
+ res.invalidPartitionsByCacheId(dhtTx.invalidPartitions());
if (req.onePhaseCommit()) {
assert req.last();
@@ -1154,7 +1153,7 @@ public class IgniteTxHandler {
if (req.last())
tx.state(PREPARED);
- res.invalidPartitions(tx.invalidPartitions());
+ res.invalidPartitionsByCacheId(tx.invalidPartitions());
if (tx.empty() && req.last()) {
tx.rollback();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7a43dde7/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index 6f544e0..8a354ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -4083,6 +4083,20 @@ public class GridFunc {
* @param val Value to find.
* @return {@code True} if array contains given value.
*/
+ public static boolean contains(Integer[] arr, int val) {
+ for (Integer el : arr) {
+ if (el == val)
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * @param arr Array.
+ * @param val Value to find.
+ * @return {@code True} if array contains given value.
+ */
@SuppressWarnings("ForLoopReplaceableByForEach")
public static boolean contains(long[] arr, long val) {
for (int i = 0; i < arr.length; i++) {
[2/8] incubator-ignite git commit: IGNITE-1265 - Rebuilt messages.
Posted by sb...@apache.org.
IGNITE-1265 - Rebuilt messages.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/574c6793
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/574c6793
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/574c6793
Branch: refs/heads/ignite-1.3.3-p3
Commit: 574c6793adb96caa614b9c0c540f4812cfc52ee5
Parents: 7a43dde
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Tue Aug 18 11:11:43 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Tue Aug 18 11:11:43 2015 -0700
----------------------------------------------------------------------
.../processors/cache/GridCacheUtils.java | 11 +++++---
.../distributed/dht/GridDhtTxPrepareFuture.java | 6 ++--
.../dht/GridDhtTxPrepareResponse.java | 29 +++++++++++++++-----
3 files changed, 32 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/574c6793/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 1e3cd67..80e0d69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1684,15 +1684,18 @@ public class GridCacheUtils {
* @param partsMap Cache ID to partition IDs collection map.
* @return Cache ID to partition ID array map.
*/
- public static Map<Integer, Integer[]> convertInvalidPartitions(Map<Integer, Set<Integer>> partsMap) {
- Map<Integer, Integer[]> res = new HashMap<>(partsMap.size());
+ public static Map<Integer, int[]> convertInvalidPartitions(Map<Integer, Set<Integer>> partsMap) {
+ Map<Integer, int[]> res = new HashMap<>(partsMap.size());
for (Map.Entry<Integer, Set<Integer>> entry : partsMap.entrySet()) {
Set<Integer> parts = entry.getValue();
- Integer[] partsArray = new Integer[parts.size()];
+ int[] partsArray = new int[parts.size()];
- partsArray = parts.toArray(partsArray);
+ int idx = 0;
+
+ for (Integer part : parts)
+ partsArray[idx++] = part;
res.put(entry.getKey(), partsArray);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/574c6793/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 27de8cf..2b7e1bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1212,14 +1212,14 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
// Process invalid partitions (no need to remap).
if (!F.isEmpty(res.invalidPartitionsByCacheId())) {
- Map<Integer, Integer[]> invalidPartsMap = res.invalidPartitionsByCacheId();
+ Map<Integer, int[]> invalidPartsMap = res.invalidPartitionsByCacheId();
for (Iterator<IgniteTxEntry> it = dhtMapping.entries().iterator(); it.hasNext();) {
IgniteTxEntry entry = it.next();
- Integer[] invalidParts = invalidPartsMap.get(entry.cacheId());
+ int[] invalidParts = invalidPartsMap.get(entry.cacheId());
- if (F.contains(invalidParts, entry.cached().partition())) {
+ if (invalidParts != null && F.contains(invalidParts, entry.cached().partition())) {
it.remove();
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/574c6793/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
index cc85628..bcf7f8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
@@ -56,7 +56,8 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
private Collection<Integer> invalidParts;
/** Invalid partitions by cache ID. */
- private Map<Integer, Integer[]> invalidPartsByCacheId;
+ @GridDirectMap(keyType = Integer.class, valueType = int[].class)
+ private Map<Integer, int[]> invalidPartsByCacheId;
/** Preload entries. */
@GridDirectCollection(GridCacheEntryInfo.class)
@@ -145,7 +146,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
/**
* @return Map from cacheId to an array of invalid partitions.
*/
- public Map<Integer, Integer[]> invalidPartitionsByCacheId() {
+ public Map<Integer, int[]> invalidPartitionsByCacheId() {
return invalidPartsByCacheId;
}
@@ -255,18 +256,24 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
writer.incrementState();
case 10:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ if (!writer.writeMap("invalidPartsByCacheId", invalidPartsByCacheId, MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR))
return false;
writer.incrementState();
case 11:
- if (!writer.writeCollection("nearEvicted", nearEvicted, MessageCollectionItemType.MSG))
+ if (!writer.writeIgniteUuid("miniId", miniId))
return false;
writer.incrementState();
case 12:
+ if (!writer.writeCollection("nearEvicted", nearEvicted, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 13:
if (!writer.writeCollection("preloadEntries", preloadEntries, MessageCollectionItemType.MSG))
return false;
@@ -305,7 +312,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
reader.incrementState();
case 10:
- miniId = reader.readIgniteUuid("miniId");
+ invalidPartsByCacheId = reader.readMap("invalidPartsByCacheId", MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR, false);
if (!reader.isLastRead())
return false;
@@ -313,7 +320,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
reader.incrementState();
case 11:
- nearEvicted = reader.readCollection("nearEvicted", MessageCollectionItemType.MSG);
+ miniId = reader.readIgniteUuid("miniId");
if (!reader.isLastRead())
return false;
@@ -321,6 +328,14 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
reader.incrementState();
case 12:
+ nearEvicted = reader.readCollection("nearEvicted", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 13:
preloadEntries = reader.readCollection("preloadEntries", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -340,6 +355,6 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 13;
+ return 14;
}
}
[3/8] incubator-ignite git commit: IGNITE-1265 - EntryProcessorTest when nodes joining topology.
Posted by sb...@apache.org.
IGNITE-1265 - EntryProcessorTest when nodes joining topology.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/013d7075
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/013d7075
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/013d7075
Branch: refs/heads/ignite-1.3.3-p3
Commit: 013d7075853d4728739f3dfb2647ce6001d723c8
Parents: 574c679
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Tue Aug 18 18:18:43 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Tue Aug 18 18:18:43 2015 -0700
----------------------------------------------------------------------
.../IgniteCacheEntryProcessorRestartTest.java | 185 +++++++++++++++++++
1 file changed, 185 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/013d7075/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorRestartTest.java
new file mode 100644
index 0000000..c027ee4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorRestartTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import javax.cache.processor.*;
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheRebalanceMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
+/**
+ * Tests cache in-place modification logic with iterative value increment.
+ */
+public class IgniteCacheEntryProcessorRestartTest extends GridCommonAbstractTest {
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Number of nodes to test on. */
+ private static final int GRID_CNT = 2;
+
+ /** Number of increment iterations. */
+ private static final int NUM_SETS = 50;
+
+ /** Helper for excluding stopped node from iteration logic. */
+ private AtomicReferenceArray<Ignite> grids;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ CacheConfiguration cache = new CacheConfiguration();
+
+ cache.setCacheMode(PARTITIONED);
+ cache.setAtomicityMode(TRANSACTIONAL);
+ cache.setWriteSynchronizationMode(FULL_SYNC);
+ cache.setBackups(1);
+ cache.setRebalanceMode(SYNC);
+
+ cfg.setCacheConfiguration(cache);
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(IP_FINDER);
+
+ TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+ commSpi.setSharedMemoryPort(-1);
+
+ cfg.setCommunicationSpi(commSpi);
+
+ cfg.setDiscoverySpi(disco);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ startGrids(GRID_CNT);
+
+ grids = new AtomicReferenceArray<>(GRID_CNT);
+
+ for (int i = 0; i < GRID_CNT; i++)
+ grids.set(i, grid(i));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ grids = null;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testEntryProcessorRestart() throws Exception {
+ final AtomicBoolean stop = new AtomicBoolean();
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+ final int started = 6;
+
+ IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+ @Override public void run() {
+ try {
+ for (int i = 0; i < started; i++) {
+ U.sleep(1_000);
+
+ startGrid(GRID_CNT + i);
+ }
+ }
+ catch (Exception e) {
+ error.compareAndSet(null, e);
+ }
+ }
+ }, 1, "starter");
+
+ try {
+ checkIncrement();
+ }
+ finally {
+ stop.set(true);
+
+ fut.get(getTestTimeout());
+ }
+
+ for (int i = 0; i < NUM_SETS; i++) {
+ for (int g = 0; g < GRID_CNT + started; g++) {
+ Set<String> vals = ignite(g).<String, Set<String>>cache(null).get("set-" + i);
+
+ assertNotNull(vals);
+ assertEquals(100, vals.size());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void checkIncrement() throws Exception {
+ for (int k = 0; k < 100; k++) {
+ for (int i = 0; i < NUM_SETS; i++) {
+ String key = "set-" + i;
+
+ String val = "value-" + k;
+
+ IgniteCache<String, Set<String>> cache = ignite(0).cache(null);
+
+ cache.invoke(key, new Processor(val));
+ }
+ }
+ }
+
+ /** */
+ private static class Processor implements EntryProcessor<String, Set<String>, Void>, Serializable {
+ /** */
+ private String val;
+
+ private Processor(String val) {
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void process(MutableEntry<String, Set<String>> e, Object... args) {
+ Set<String> vals = e.getValue();
+
+ if (vals == null)
+ vals = new HashSet<>();
+
+ vals.add(val);
+
+ e.setValue(vals);
+
+ return null;
+ }
+ }
+}
[5/8] incubator-ignite git commit: IGNITE-1265 - Entry processor must always have the correct cache value.
Posted by sb...@apache.org.
IGNITE-1265 - Entry processor must always have the correct cache value.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5065a1ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5065a1ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5065a1ec
Branch: refs/heads/ignite-1.3.3-p3
Commit: 5065a1eccb3d71b2573d37bb6ff2c78a1bbc107c
Parents: ccaa2b2
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Tue Aug 18 19:35:50 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Tue Aug 18 19:35:50 2015 -0700
----------------------------------------------------------------------
.../dht/GridClientPartitionTopology.java | 20 +++
.../dht/GridDhtPartitionTopology.java | 7 +
.../dht/GridDhtPartitionTopologyImpl.java | 20 +++
.../cache/distributed/dht/GridDhtTxLocal.java | 4 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 136 +++++++++++++++++--
.../cache/transactions/IgniteTxEntry.java | 18 +++
.../IgniteCacheEntryProcessorNodeJoinTest.java | 54 ++++----
.../cache/IgniteCacheInvokeReadThroughTest.java | 2 +-
8 files changed, 223 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index c3f3e7f..531678e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -331,6 +331,26 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
+ @Override public GridDhtPartitionState partitionState(UUID nodeId, int part) {
+ lock.readLock().lock();
+
+ try {
+ GridDhtPartitionMap partMap = node2part.get(nodeId);
+
+ if (partMap != null) {
+ GridDhtPartitionState state = partMap.get(part);
+
+ return state == null ? EVICTED : state;
+ }
+
+ return EVICTED;
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public Collection<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
lock.readLock().lock();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index c551fb3..7b08510 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -129,6 +129,13 @@ public interface GridDhtPartitionTopology {
public GridDhtPartitionMap localPartitionMap();
/**
+ * @param nodeId Node ID.
+ * @param part Partition.
+ * @return Partition state.
+ */
+ public GridDhtPartitionState partitionState(UUID nodeId, int part);
+
+ /**
* @return Current update sequence.
*/
public long updateSequence();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index de7f876..f356138 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -614,6 +614,26 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
+ @Override public GridDhtPartitionState partitionState(UUID nodeId, int part) {
+ lock.readLock().lock();
+
+ try {
+ GridDhtPartitionMap partMap = node2part.get(nodeId);
+
+ if (partMap != null) {
+ GridDhtPartitionState state = partMap.get(part);
+
+ return state == null ? EVICTED : state;
+ }
+
+ return EVICTED;
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public Collection<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 6a72c89..7da6e07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -363,8 +363,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
* @return Future that will be completed when locks are acquired.
*/
public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync(
- @Nullable Iterable<IgniteTxEntry> reads,
- @Nullable Iterable<IgniteTxEntry> writes,
+ @Nullable Collection<IgniteTxEntry> reads,
+ @Nullable Collection<IgniteTxEntry> writes,
Map<IgniteTxKey, GridCacheVersion> verMap,
long msgId,
IgniteUuid nearMiniId,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 2b7e1bc..ad1023f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -135,6 +135,9 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
/** Keys that should be locked. */
private GridConcurrentHashSet<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
+ /** Force keys future for correct transforms. */
+ private IgniteInternalFuture<?> forceKeysFut;
+
/** Locks ready flag. */
private volatile boolean locksReady;
@@ -291,7 +294,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
boolean hasFilters = !F.isEmptyOrNulls(txEntry.filters()) && !F.isAlwaysTrue(txEntry.filters());
- if (hasFilters || retVal || txEntry.op() == GridCacheOperation.DELETE) {
+ if (hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM) {
cached.unswap(retVal);
boolean readThrough = (retVal || hasFilters) &&
@@ -312,7 +315,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
null,
null);
- if (retVal) {
+ if (retVal || txEntry.op() == TRANSFORM) {
if (!F.isEmpty(txEntry.entryProcessors())) {
invoke = true;
@@ -339,6 +342,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
}
}
+ txEntry.entryProcessorCalculatedValue(val);
+
if (err != null || procRes != null)
ret.addEntryProcessResult(txEntry.context(), key, null, procRes, err);
else
@@ -362,7 +367,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
ret.success(false);
}
else
- ret.success(txEntry.op() != GridCacheOperation.DELETE || cached.hasValue());
+ ret.success(txEntry.op() != DELETE || cached.hasValue());
}
}
catch (IgniteCheckedException e) {
@@ -466,7 +471,25 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
*/
private boolean mapIfLocked() {
if (checkLocks()) {
- prepare0();
+ if (!mapped.compareAndSet(false, true))
+ return false;
+
+ if (forceKeysFut == null || (forceKeysFut.isDone() && forceKeysFut.error() == null))
+ prepare0();
+ else {
+ forceKeysFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> f) {
+ try {
+ f.get();
+
+ prepare0();
+ }
+ catch (IgniteCheckedException e) {
+ onError(e);
+ }
+ }
+ });
+ }
return true;
}
@@ -709,7 +732,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
* @param writes Write entries.
* @param txNodes Transaction nodes mapping.
*/
- public void prepare(Iterable<IgniteTxEntry> reads, Iterable<IgniteTxEntry> writes,
+ @SuppressWarnings("TypeMayBeWeakened")
+ public void prepare(Collection<IgniteTxEntry> reads, Collection<IgniteTxEntry> writes,
Map<UUID, Collection<UUID>> txNodes) {
if (tx.empty()) {
tx.setRollbackOnly();
@@ -721,6 +745,15 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
this.writes = writes;
this.txNodes = txNodes;
+ if (!F.isEmpty(writes)) {
+ Map<Integer, Collection<KeyCacheObject>> forceKeys = null;
+
+ for (IgniteTxEntry entry : writes)
+ forceKeys = checkNeedRebalanceKeys(entry, forceKeys);
+
+ forceKeysFut = forceRebalanceKeys(forceKeys);
+ }
+
readyLocks();
mapIfLocked();
@@ -735,12 +768,75 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
}
/**
+ * Checks if this transaction needs previous value for the given tx entry. Will use passed in map to store
+ * required key or will create new map if passed in map is {@code null}.
*
+ * @param e TX entry.
+ * @param map Map with needed preload keys.
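+ * @return The passed-in map, or a newly created map if {@code null} was passed in and this entry's key had to be stored (may be {@code null} if no key was stored).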
*/
- private void prepare0() {
- if (!mapped.compareAndSet(false, true))
- return;
+ private Map<Integer, Collection<KeyCacheObject>> checkNeedRebalanceKeys(
+ IgniteTxEntry e,
+ Map<Integer, Collection<KeyCacheObject>> map
+ ) {
+ if (retVal || !F.isEmpty(e.entryProcessors())) {
+ if (map == null)
+ map = new HashMap<>();
+
+ Collection<KeyCacheObject> keys = map.get(e.cacheId());
+
+ if (keys == null) {
+ keys = new ArrayList<>();
+
+ map.put(e.cacheId(), keys);
+ }
+
+ keys.add(e.key());
+ }
+
+ return map;
+ }
+
+ private IgniteInternalFuture<Object> forceRebalanceKeys(Map<Integer, Collection<KeyCacheObject>> keysMap) {
+ if (F.isEmpty(keysMap))
+ return null;
+
+ GridCompoundFuture<Object, Object> compFut = null;
+ IgniteInternalFuture<Object> lastForceFut = null;
+ for (Map.Entry<Integer, Collection<KeyCacheObject>> entry : keysMap.entrySet()) {
+ if (lastForceFut != null && compFut == null) {
+ compFut = new GridCompoundFuture();
+
+ compFut.add(lastForceFut);
+ }
+
+ int cacheId = entry.getKey();
+
+ Collection<KeyCacheObject> keys = entry.getValue();
+
+ lastForceFut = cctx.cacheContext(cacheId).preloader().request(keys, tx.topologyVersion());
+
+ if (compFut != null)
+ compFut.add(lastForceFut);
+ }
+
+ if (compFut != null) {
+ compFut.markInitialized();
+
+ return compFut;
+ }
+ else {
+ assert lastForceFut != null;
+
+ return lastForceFut;
+ }
+ }
+
+ /**
+ *
+ */
+ private void prepare0() {
try {
// We are holding transaction-level locks for entries here, so we can get next write version.
onEntriesLocked();
@@ -957,7 +1053,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
private boolean map(
IgniteTxEntry entry,
Map<UUID, GridDistributedTxMapping> futDhtMap,
- Map<UUID, GridDistributedTxMapping> futNearMap) {
+ Map<UUID, GridDistributedTxMapping> futNearMap
+ ) {
if (entry.cached().isLocal())
return false;
@@ -1024,14 +1121,31 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
* @param locMap Exclude map.
* @return {@code True} if mapped.
*/
- private boolean map(IgniteTxEntry entry, Iterable<ClusterNode> nodes,
- Map<UUID, GridDistributedTxMapping> globalMap, Map<UUID, GridDistributedTxMapping> locMap) {
+ private boolean map(
+ IgniteTxEntry entry,
+ Iterable<ClusterNode> nodes,
+ Map<UUID, GridDistributedTxMapping> globalMap,
+ Map<UUID, GridDistributedTxMapping> locMap
+ ) {
boolean ret = false;
if (nodes != null) {
for (ClusterNode n : nodes) {
GridDistributedTxMapping global = globalMap.get(n.id());
+ if (!F.isEmpty(entry.entryProcessors())) {
+ GridDhtPartitionState state = entry.context().topology().partitionState(n.id(),
+ entry.cached().partition());
+
+ if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) {
+ CacheObject procVal = entry.entryProcessorCalculatedValue();
+
+ entry.op(procVal == null ? DELETE : UPDATE);
+ entry.value(procVal, true, false);
+ entry.entryProcessors(null);
+ }
+ }
+
if (global == null)
globalMap.put(n.id(), global = new GridDistributedTxMapping(n));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 247d350..7890831 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -79,6 +79,10 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
@GridDirectTransient
private Collection<T2<EntryProcessor<Object, Object, Object>, Object[]>> entryProcessorsCol;
+ /** Transient field for calculated entry processor value. */
+ @GridDirectTransient
+ private CacheObject entryProcessorCalcVal;
+
/** Transform closure bytes. */
@GridToStringExclude
private byte[] transformClosBytes;
@@ -775,6 +779,20 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
return expiryPlc;
}
+ /**
+ * @return Entry processor calculated value.
+ */
+ public CacheObject entryProcessorCalculatedValue() {
+ return entryProcessorCalcVal;
+ }
+
+ /**
+ * @param entryProcessorCalcVal Entry processor calculated value.
+ */
+ public void entryProcessorCalculatedValue(CacheObject entryProcessorCalcVal) {
+ this.entryProcessorCalcVal = entryProcessorCalcVal;
+ }
+
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
index 9c17ebd..94bfd8f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
@@ -122,38 +122,44 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
final AtomicReference<Throwable> error = new AtomicReference<>();
final int started = 6;
- IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
- @Override public void run() {
- try {
- for (int i = 0; i < started; i++) {
- U.sleep(1_000);
-
- startGrid(GRID_CNT + i);
+ try {
+ IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+ @Override public void run() {
+ try {
+ for (int i = 0; i < started; i++) {
+ U.sleep(1_000);
+
+ startGrid(GRID_CNT + i);
+ }
+ }
+ catch (Exception e) {
+ error.compareAndSet(null, e);
}
}
- catch (Exception e) {
- error.compareAndSet(null, e);
- }
- }
- }, 1, "starter");
+ }, 1, "starter");
- try {
- checkIncrement(invokeAll);
- }
- finally {
- stop.set(true);
+ try {
+ checkIncrement(invokeAll);
+ }
+ finally {
+ stop.set(true);
- fut.get(getTestTimeout());
- }
+ fut.get(getTestTimeout());
+ }
- for (int i = 0; i < NUM_SETS; i++) {
- for (int g = 0; g < GRID_CNT + started; g++) {
- Set<String> vals = ignite(g).<String, Set<String>>cache(null).get("set-" + i);
+ for (int i = 0; i < NUM_SETS; i++) {
+ for (int g = 0; g < GRID_CNT + started; g++) {
+ Set<String> vals = ignite(g).<String, Set<String>>cache(null).get("set-" + i);
- assertNotNull(vals);
- assertEquals(100, vals.size());
+ assertNotNull(vals);
+ assertEquals(100, vals.size());
+ }
}
}
+ finally {
+ for (int i = 0; i < started; i++)
+ stopGrid(GRID_CNT + i);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
index 10ab1ab..b72540d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
@@ -34,7 +34,7 @@ import static org.apache.ignite.cache.CacheMode.*;
public class IgniteCacheInvokeReadThroughTest extends IgniteCacheAbstractTest {
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-114");
+// fail("https://issues.apache.org/jira/browse/IGNITE-114");
}
/** */
[8/8] incubator-ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-1265' into ignite-1.3.3-p3
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-1265' into ignite-1.3.3-p3
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7d1a550d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7d1a550d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7d1a550d
Branch: refs/heads/ignite-1.3.3-p3
Commit: 7d1a550dcac96dc2028aaeab5632c9d35f99489b
Parents: ac670f9 b55365d
Author: sboikov <sb...@gridgain.com>
Authored: Wed Aug 19 15:20:13 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Aug 19 15:20:13 2015 +0300
----------------------------------------------------------------------
.../CachePartialUpdateCheckedException.java | 11 +-
.../processors/cache/GridCacheIoManager.java | 1 -
.../processors/cache/GridCacheUtils.java | 23 ++
.../distributed/GridDistributedCacheEntry.java | 7 +
.../dht/GridClientPartitionTopology.java | 20 ++
.../dht/GridDhtPartitionTopology.java | 7 +
.../dht/GridDhtPartitionTopologyImpl.java | 20 ++
.../cache/distributed/dht/GridDhtTxLocal.java | 4 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 166 ++++++++++++--
.../dht/GridDhtTxPrepareResponse.java | 42 +++-
.../distributed/near/GridNearCacheEntry.java | 6 +
.../distributed/near/GridNearLockFuture.java | 4 +-
.../near/GridNearTxPrepareResponse.java | 3 -
.../cache/transactions/IgniteInternalTx.java | 2 +-
.../cache/transactions/IgniteTxAdapter.java | 19 +-
.../cache/transactions/IgniteTxEntry.java | 18 ++
.../cache/transactions/IgniteTxHandler.java | 5 +-
.../ignite/internal/util/lang/GridFunc.java | 14 ++
...teAtomicCacheEntryProcessorNodeJoinTest.java | 32 +++
.../IgniteCacheEntryProcessorNodeJoinTest.java | 225 +++++++++++++++++++
.../near/GridCacheNearOnlyTopologySelfTest.java | 4 +-
.../near/GridCacheNearTxForceKeyTest.java | 76 +++++++
.../testsuites/IgniteCacheTestSuite2.java | 4 +
23 files changed, 670 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
[6/8] incubator-ignite git commit: # ignite-1265
Posted by sb...@apache.org.
# ignite-1265
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8b3fed85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8b3fed85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8b3fed85
Branch: refs/heads/ignite-1.3.3-p3
Commit: 8b3fed850ccc4527a6593c5ec4e596ca6c08b61f
Parents: 5065a1e
Author: sboikov <sb...@gridgain.com>
Authored: Wed Aug 19 09:32:54 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Aug 19 10:26:10 2015 +0300
----------------------------------------------------------------------
.../CachePartialUpdateCheckedException.java | 11 ++++---
...teAtomicCacheEntryProcessorNodeJoinTest.java | 32 ++++++++++++++++++++
.../IgniteCacheEntryProcessorNodeJoinTest.java | 25 ++++++++-------
.../testsuites/IgniteCacheTestSuite2.java | 3 ++
4 files changed, 55 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b3fed85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java
index c2259df..fc846f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java
@@ -47,8 +47,9 @@ public class CachePartialUpdateCheckedException extends IgniteCheckedException {
* Gets collection of failed keys.
* @return Collection of failed keys.
*/
- public <K> Collection<K> failedKeys() {
- return (Collection<K>)failedKeys;
+ @SuppressWarnings("unchecked")
+ public synchronized <K> Collection<K> failedKeys() {
+ return new HashSet<>((Collection<K>)failedKeys);
}
/**
@@ -56,7 +57,7 @@ public class CachePartialUpdateCheckedException extends IgniteCheckedException {
* @param err Error.
* @param topVer Topology version for failed update.
*/
- public void add(Collection<?> failedKeys, Throwable err, AffinityTopologyVersion topVer) {
+ public synchronized void add(Collection<?> failedKeys, Throwable err, AffinityTopologyVersion topVer) {
if (topVer != null) {
AffinityTopologyVersion topVer0 = this.topVer;
@@ -72,7 +73,7 @@ public class CachePartialUpdateCheckedException extends IgniteCheckedException {
/**
* @return Topology version.
*/
- public AffinityTopologyVersion topologyVersion() {
+ public synchronized AffinityTopologyVersion topologyVersion() {
return topVer;
}
@@ -80,7 +81,7 @@ public class CachePartialUpdateCheckedException extends IgniteCheckedException {
* @param failedKeys Failed keys.
* @param err Error.
*/
- public void add(Collection<?> failedKeys, Throwable err) {
+ public synchronized void add(Collection<?> failedKeys, Throwable err) {
add(failedKeys, err, null);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b3fed85/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAtomicCacheEntryProcessorNodeJoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAtomicCacheEntryProcessorNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAtomicCacheEntryProcessorNodeJoinTest.java
new file mode 100644
index 0000000..af87a7d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAtomicCacheEntryProcessorNodeJoinTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+
+/**
+ *
+ */
+public class IgniteAtomicCacheEntryProcessorNodeJoinTest extends IgniteCacheEntryProcessorNodeJoinTest {
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return ATOMIC;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b3fed85/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
index 94bfd8f..955a792 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.typedef.internal.*;
@@ -51,9 +52,6 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
/** Number of increment iterations. */
private static final int NUM_SETS = 50;
- /** Helper for excluding stopped node from iteration logic. */
- private AtomicReferenceArray<Ignite> grids;
-
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -61,7 +59,7 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
CacheConfiguration cache = new CacheConfiguration();
cache.setCacheMode(PARTITIONED);
- cache.setAtomicityMode(TRANSACTIONAL);
+ cache.setAtomicityMode(atomicityMode());
cache.setWriteSynchronizationMode(FULL_SYNC);
cache.setBackups(1);
cache.setRebalanceMode(SYNC);
@@ -83,21 +81,21 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
return cfg;
}
+ /**
+ * @return Atomicity mode.
+ */
+ protected CacheAtomicityMode atomicityMode() {
+ return TRANSACTIONAL;
+ }
+
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
startGrids(GRID_CNT);
-
- grids = new AtomicReferenceArray<>(GRID_CNT);
-
- for (int i = 0; i < GRID_CNT; i++)
- grids.set(i, grid(i));
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
-
- grids = null;
}
/**
@@ -115,6 +113,7 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
}
/**
+ * @param invokeAll If {@code true} tests invokeAll operation.
* @throws Exception If failed.
*/
private void checkEntryProcessorNodeJoin(boolean invokeAll) throws Exception {
@@ -163,6 +162,7 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
}
/**
+ * @param invokeAll If {@code true} tests invokeAll operation.
* @throws Exception If failed.
*/
private void checkIncrement(boolean invokeAll) throws Exception {
@@ -201,6 +201,9 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
/** */
private String val;
+ /**
+ * @param val Value.
+ */
private Processor(String val) {
this.val = val;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b3fed85/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index dcbab07..ec50399 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -140,6 +140,9 @@ public class IgniteCacheTestSuite2 extends TestSuite {
suite.addTest(new TestSuite(IgniteCacheClientNodeChangingTopologyTest.class));
suite.addTest(new TestSuite(IgniteCacheClientNodeConcurrentStart.class));
+ suite.addTest(new TestSuite(IgniteCacheEntryProcessorNodeJoinTest.class));
+ suite.addTest(new TestSuite(IgniteAtomicCacheEntryProcessorNodeJoinTest.class));
+
return suite;
}
}
[4/8] incubator-ignite git commit: IGNITE-1265 - EntryProcessorTest
when nodes joining topology.
Posted by sb...@apache.org.
IGNITE-1265 - EntryProcessorTest when nodes joining topology.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ccaa2b20
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ccaa2b20
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ccaa2b20
Branch: refs/heads/ignite-1.3.3-p3
Commit: ccaa2b20dab5438603471796b7155f309261a41f
Parents: 013d707
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Tue Aug 18 18:38:36 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Tue Aug 18 18:38:36 2015 -0700
----------------------------------------------------------------------
.../IgniteCacheEntryProcessorNodeJoinTest.java | 216 +++++++++++++++++++
.../IgniteCacheEntryProcessorRestartTest.java | 185 ----------------
2 files changed, 216 insertions(+), 185 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ccaa2b20/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
new file mode 100644
index 0000000..9c17ebd
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import javax.cache.processor.*;
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheRebalanceMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
+/**
+ * Tests cache in-place modification logic with iterative value increment.
+ */
+public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTest {
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Number of nodes to test on. */
+ private static final int GRID_CNT = 2;
+
+ /** Number of increment iterations. */
+ private static final int NUM_SETS = 50;
+
+ /** Helper for excluding stopped node from iteration logic. */
+ private AtomicReferenceArray<Ignite> grids;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ CacheConfiguration cache = new CacheConfiguration();
+
+ cache.setCacheMode(PARTITIONED);
+ cache.setAtomicityMode(TRANSACTIONAL);
+ cache.setWriteSynchronizationMode(FULL_SYNC);
+ cache.setBackups(1);
+ cache.setRebalanceMode(SYNC);
+
+ cfg.setCacheConfiguration(cache);
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(IP_FINDER);
+
+ TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+ commSpi.setSharedMemoryPort(-1);
+
+ cfg.setCommunicationSpi(commSpi);
+
+ cfg.setDiscoverySpi(disco);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ startGrids(GRID_CNT);
+
+ grids = new AtomicReferenceArray<>(GRID_CNT);
+
+ for (int i = 0; i < GRID_CNT; i++)
+ grids.set(i, grid(i));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ grids = null;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSingleEntryProcessorNodeJoin() throws Exception {
+ checkEntryProcessorNodeJoin(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAllEntryProcessorNodeJoin() throws Exception {
+ checkEntryProcessorNodeJoin(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void checkEntryProcessorNodeJoin(boolean invokeAll) throws Exception {
+ final AtomicBoolean stop = new AtomicBoolean();
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+ final int started = 6;
+
+ IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+ @Override public void run() {
+ try {
+ for (int i = 0; i < started; i++) {
+ U.sleep(1_000);
+
+ startGrid(GRID_CNT + i);
+ }
+ }
+ catch (Exception e) {
+ error.compareAndSet(null, e);
+ }
+ }
+ }, 1, "starter");
+
+ try {
+ checkIncrement(invokeAll);
+ }
+ finally {
+ stop.set(true);
+
+ fut.get(getTestTimeout());
+ }
+
+ for (int i = 0; i < NUM_SETS; i++) {
+ for (int g = 0; g < GRID_CNT + started; g++) {
+ Set<String> vals = ignite(g).<String, Set<String>>cache(null).get("set-" + i);
+
+ assertNotNull(vals);
+ assertEquals(100, vals.size());
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void checkIncrement(boolean invokeAll) throws Exception {
+ for (int k = 0; k < 100; k++) {
+ if (invokeAll) {
+ IgniteCache<String, Set<String>> cache = ignite(0).cache(null);
+
+ Map<String, Processor> procs = new LinkedHashMap<>();
+
+ for (int i = 0; i < NUM_SETS; i++) {
+ String key = "set-" + i;
+
+ String val = "value-" + k;
+
+ cache.invoke(key, new Processor(val));
+ }
+
+ cache.invokeAll(procs);
+ }
+ else {
+ for (int i = 0; i < NUM_SETS; i++) {
+ String key = "set-" + i;
+
+ String val = "value-" + k;
+
+ IgniteCache<String, Set<String>> cache = ignite(0).cache(null);
+
+ cache.invoke(key, new Processor(val));
+ }
+ }
+ }
+ }
+
+ /** */
+ private static class Processor implements EntryProcessor<String, Set<String>, Void>, Serializable {
+ /** */
+ private String val;
+
+ private Processor(String val) {
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void process(MutableEntry<String, Set<String>> e, Object... args) {
+ Set<String> vals = e.getValue();
+
+ if (vals == null)
+ vals = new HashSet<>();
+
+ vals.add(val);
+
+ e.setValue(vals);
+
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ccaa2b20/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorRestartTest.java
deleted file mode 100644
index c027ee4..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorRestartTest.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.spi.communication.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.*;
-import org.apache.ignite.testframework.junits.common.*;
-
-import javax.cache.processor.*;
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.*;
-import static org.apache.ignite.cache.CacheMode.*;
-import static org.apache.ignite.cache.CacheRebalanceMode.*;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
-
-/**
- * Tests cache in-place modification logic with iterative value increment.
- */
-public class IgniteCacheEntryProcessorRestartTest extends GridCommonAbstractTest {
- /** IP finder. */
- private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
- /** Number of nodes to test on. */
- private static final int GRID_CNT = 2;
-
- /** Number of increment iterations. */
- private static final int NUM_SETS = 50;
-
- /** Helper for excluding stopped node from iteration logic. */
- private AtomicReferenceArray<Ignite> grids;
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- CacheConfiguration cache = new CacheConfiguration();
-
- cache.setCacheMode(PARTITIONED);
- cache.setAtomicityMode(TRANSACTIONAL);
- cache.setWriteSynchronizationMode(FULL_SYNC);
- cache.setBackups(1);
- cache.setRebalanceMode(SYNC);
-
- cfg.setCacheConfiguration(cache);
-
- TcpDiscoverySpi disco = new TcpDiscoverySpi();
-
- disco.setIpFinder(IP_FINDER);
-
- TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
-
- commSpi.setSharedMemoryPort(-1);
-
- cfg.setCommunicationSpi(commSpi);
-
- cfg.setDiscoverySpi(disco);
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- startGrids(GRID_CNT);
-
- grids = new AtomicReferenceArray<>(GRID_CNT);
-
- for (int i = 0; i < GRID_CNT; i++)
- grids.set(i, grid(i));
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- stopAllGrids();
-
- grids = null;
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testEntryProcessorRestart() throws Exception {
- final AtomicBoolean stop = new AtomicBoolean();
- final AtomicReference<Throwable> error = new AtomicReference<>();
- final int started = 6;
-
- IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
- @Override public void run() {
- try {
- for (int i = 0; i < started; i++) {
- U.sleep(1_000);
-
- startGrid(GRID_CNT + i);
- }
- }
- catch (Exception e) {
- error.compareAndSet(null, e);
- }
- }
- }, 1, "starter");
-
- try {
- checkIncrement();
- }
- finally {
- stop.set(true);
-
- fut.get(getTestTimeout());
- }
-
- for (int i = 0; i < NUM_SETS; i++) {
- for (int g = 0; g < GRID_CNT + started; g++) {
- Set<String> vals = ignite(g).<String, Set<String>>cache(null).get("set-" + i);
-
- assertNotNull(vals);
- assertEquals(100, vals.size());
- }
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- private void checkIncrement() throws Exception {
- for (int k = 0; k < 100; k++) {
- for (int i = 0; i < NUM_SETS; i++) {
- String key = "set-" + i;
-
- String val = "value-" + k;
-
- IgniteCache<String, Set<String>> cache = ignite(0).cache(null);
-
- cache.invoke(key, new Processor(val));
- }
- }
- }
-
- /** */
- private static class Processor implements EntryProcessor<String, Set<String>, Void>, Serializable {
- /** */
- private String val;
-
- private Processor(String val) {
- this.val = val;
- }
-
- /** {@inheritDoc} */
- @Override public Void process(MutableEntry<String, Set<String>> e, Object... args) {
- Set<String> vals = e.getValue();
-
- if (vals == null)
- vals = new HashSet<>();
-
- vals.add(val);
-
- e.setValue(vals);
-
- return null;
- }
- }
-}
[7/8] incubator-ignite git commit: # ignite-1265 set topology version
for mvcc candidate
Posted by sb...@apache.org.
# ignite-1265 set topology version for mvcc candidate
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b55365d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b55365d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b55365d3
Branch: refs/heads/ignite-1.3.3-p3
Commit: b55365d316febf834cde553fbb55b33d42194069
Parents: 8b3fed8
Author: sboikov <sb...@gridgain.com>
Authored: Wed Aug 19 13:30:06 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Aug 19 13:53:41 2015 +0300
----------------------------------------------------------------------
.../distributed/GridDistributedCacheEntry.java | 7 ++
.../distributed/dht/GridDhtTxPrepareFuture.java | 7 +-
.../distributed/near/GridNearCacheEntry.java | 6 ++
.../distributed/near/GridNearLockFuture.java | 4 +-
.../cache/IgniteCacheInvokeReadThroughTest.java | 2 +-
.../near/GridCacheNearOnlyTopologySelfTest.java | 4 +-
.../near/GridCacheNearTxForceKeyTest.java | 76 ++++++++++++++++++++
.../testsuites/IgniteCacheTestSuite2.java | 1 +
8 files changed, 100 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b55365d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
index bd72764..e007190 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache.distributed;
+import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.processors.cache.version.*;
@@ -68,6 +69,7 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
*
* @param threadId Owning thread ID.
* @param ver Lock version.
+ * @param topVer Topology version.
* @param timeout Timeout to acquire lock.
* @param reenter Reentry flag.
* @param tx Transaction flag.
@@ -78,6 +80,7 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
@Nullable public GridCacheMvccCandidate addLocal(
long threadId,
GridCacheVersion ver,
+ AffinityTopologyVersion topVer,
long timeout,
boolean reenter,
boolean tx,
@@ -105,6 +108,9 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
cand = mvcc.addLocal(this, threadId, ver, timeout, reenter, tx, implicitSingle);
+ if (cand != null)
+ cand.topologyVersion(topVer);
+
owner = mvcc.anyOwner();
boolean emptyAfter = mvcc.isEmpty();
@@ -732,6 +738,7 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
return addLocal(
tx.threadId(),
tx.xidVersion(),
+ tx.topologyVersion(),
timeout,
false,
true,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b55365d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index ad1023f..1539a2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -797,6 +797,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
return map;
}
+ /**
+ * @param keysMap Keys to request.
+ * @return Keys request future.
+ */
private IgniteInternalFuture<Object> forceRebalanceKeys(Map<Integer, Collection<KeyCacheObject>> keysMap) {
if (F.isEmpty(keysMap))
return null;
@@ -978,7 +982,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
fut.onResult(e);
}
catch (IgniteCheckedException e) {
- fut.onResult(e);
+ if (!cctx.kernalContext().isStopping())
+ fut.onResult(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b55365d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 9e8d76b..194c68a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -433,6 +433,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
@Override public GridCacheMvccCandidate addLocal(
long threadId,
GridCacheVersion ver,
+ AffinityTopologyVersion topVer,
long timeout,
boolean reenter,
boolean tx,
@@ -441,6 +442,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
null,
threadId,
ver,
+ topVer,
timeout,
reenter,
tx,
@@ -454,6 +456,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
* @param dhtNodeId DHT node ID.
* @param threadId Owning thread ID.
* @param ver Lock version.
+ * @param topVer Topology version.
* @param timeout Timeout to acquire lock.
* @param reenter Reentry flag.
* @param tx Transaction flag.
@@ -465,6 +468,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
@Nullable UUID dhtNodeId,
long threadId,
GridCacheVersion ver,
+ AffinityTopologyVersion topVer,
long timeout,
boolean reenter,
boolean tx,
@@ -513,6 +517,8 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
tx,
implicitSingle);
+ cand.topologyVersion(topVer);
+
owner = mvcc.anyOwner();
boolean emptyAfter = mvcc.isEmpty();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b55365d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 3d28018..b7e0d73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -307,6 +307,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
dhtNodeId,
threadId,
lockVer,
+ topVer,
timeout,
!inTx(),
inTx(),
@@ -319,9 +320,6 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
txEntry.cached(entry);
}
- if (c != null)
- c.topologyVersion(topVer);
-
synchronized (mux) {
entries.add(entry);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b55365d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
index b72540d..10ab1ab 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
@@ -34,7 +34,7 @@ import static org.apache.ignite.cache.CacheMode.*;
public class IgniteCacheInvokeReadThroughTest extends IgniteCacheAbstractTest {
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
-// fail("https://issues.apache.org/jira/browse/IGNITE-114");
+ fail("https://issues.apache.org/jira/browse/IGNITE-114");
}
/** */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b55365d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyTopologySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyTopologySelfTest.java
index b6bc56e..d1d7c02 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyTopologySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyTopologySelfTest.java
@@ -198,7 +198,7 @@ public class GridCacheNearOnlyTopologySelfTest extends GridCommonAbstractTest {
}
// Test optimistic transaction.
- GridTestUtils.assertThrows(log, new Callable<Object>() {
+ GridTestUtils.assertThrowsWithCause(new Callable<Object>() {
@Override public Object call() throws Exception {
try (Transaction tx = igniteNearOnly.transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) {
nearOnly.put("key", "val");
@@ -208,7 +208,7 @@ public class GridCacheNearOnlyTopologySelfTest extends GridCommonAbstractTest {
return null;
}
- }, ClusterTopologyException.class, null);
+ }, ClusterTopologyCheckedException.class);
// Test pessimistic transaction.
GridTestUtils.assertThrowsWithCause(new Callable<Object>() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b55365d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java
new file mode 100644
index 0000000..44ef20d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheRebalanceMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
+/**
+ * Checks a transactional update through a near cache when the key's primary node has just joined
+ * and rebalancing to it is delayed.
+ */
+public class GridCacheNearTxForceKeyTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        // NOTE(review): the delayed asynchronous rebalance presumably keeps the joined node's partitions
+        // unloaded during the update, which is what triggers the force key request - confirm.
+        ccfg.setRebalanceMode(ASYNC);
+        ccfg.setRebalanceDelay(5000);
+        ccfg.setBackups(0);
+        ccfg.setNearConfiguration(new NearCacheConfiguration());
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        // Nodes are started inside the test method, so stop them here to avoid leaking them into other tests.
+        stopAllGrids();
+    }
+
+    /**
+     * Provokes the scenario where the primary node sends a force key request to the node that started
+     * the transaction.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNearTx() throws Exception {
+        Ignite ignite0 = startGrid(0);
+
+        IgniteCache<Integer, Integer> cache = ignite0.cache(null);
+
+        Ignite ignite1 = startGrid(1);
+
+        final Integer key = 2;
+
+        assertNull(cache.getAndPut(key, key));
+
+        // The scenario is only reproduced if the second node is primary for the key.
+        assertTrue(ignite0.affinity(null).isPrimary(ignite1.cluster().localNode(), key));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b55365d3/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index ec50399..495719f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -142,6 +142,7 @@ public class IgniteCacheTestSuite2 extends TestSuite {
suite.addTest(new TestSuite(IgniteCacheEntryProcessorNodeJoinTest.class));
suite.addTest(new TestSuite(IgniteAtomicCacheEntryProcessorNodeJoinTest.class));
+ suite.addTest(new TestSuite(GridCacheNearTxForceKeyTest.class));
return suite;
}