You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/08/24 04:41:01 UTC
[3/4] ignite git commit: Merge branches 'ignite-843' and 'master' of
https://git-wip-us.apache.org/repos/asf/ignite into ignite-843
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 221b230..eb7c78f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -155,7 +155,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
@Nullable UUID subjId,
String taskName,
final boolean deserializePortable,
- final boolean skipVals
+ final boolean skipVals,
+ boolean canRemap
) {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -183,7 +184,9 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
});
}
- AffinityTopologyVersion topVer = tx == null ? ctx.affinity().affinityTopologyVersion() : tx.topologyVersion();
+ AffinityTopologyVersion topVer = tx == null ?
+ (canRemap ? ctx.affinity().affinityTopologyVersion() : ctx.shared().exchange().readyAffinityVersion()) :
+ tx.topologyVersion();
subjId = ctx.subjectIdPerCall(subjId, opCtx);
@@ -197,7 +200,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
taskName,
deserializePortable,
skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null),
- skipVals);
+ skipVals,
+ canRemap);
}
/** {@inheritDoc} */
@@ -226,7 +230,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
* @param skipVals Skip values flag.
* @return Loaded values.
*/
- public IgniteInternalFuture<Map<K, V>> loadAsync(@Nullable Collection<KeyCacheObject> keys,
+ public IgniteInternalFuture<Map<K, V>> loadAsync(
+ @Nullable Collection<KeyCacheObject> keys,
boolean readThrough,
boolean reload,
boolean forcePrimary,
@@ -235,7 +240,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
String taskName,
boolean deserializePortable,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
- boolean skipVals
+ boolean skipVals,
+ boolean canRemap
) {
if (keys == null || keys.isEmpty())
return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
@@ -340,7 +346,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
taskName,
deserializePortable,
expiryPlc,
- skipVals);
+ skipVals,
+ canRemap);
fut.init();
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index c784948..90ca8df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -1125,8 +1125,12 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
* @return Topology exception with user-friendly message.
*/
private ClusterTopologyCheckedException newTopologyException(@Nullable Throwable nested, UUID nodeId) {
- return new ClusterTopologyCheckedException("Failed to acquire lock for keys (primary node left grid, " +
- "retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested);
+ ClusterTopologyCheckedException topEx = new ClusterTopologyCheckedException("Failed to acquire lock for keys " +
+ "(primary node left grid, retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested);
+
+ topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer.get()));
+
+ return topEx;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index cbf6b40..1a90de9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -54,6 +54,10 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityTopologyVersion>
implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture {
/** */
+ private static final int DUMP_PENDING_OBJECTS_THRESHOLD =
+ IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD, 10);
+
+ /** */
private static final long serialVersionUID = 0L;
/** Dummy flag. */
@@ -711,6 +715,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (log.isDebugEnabled())
log.debug("Before waiting for partition release future: " + this);
+ int dumpedObjects = 0;
+
while (true) {
try {
partReleaseFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS);
@@ -719,7 +725,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
catch (IgniteFutureTimeoutCheckedException ignored) {
// Print pending transactions and locks that might have led to hang.
- dumpPendingObjects();
+ if (dumpedObjects < DUMP_PENDING_OBJECTS_THRESHOLD) {
+ dumpPendingObjects();
+
+ dumpedObjects++;
+ }
}
}
@@ -731,6 +741,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
IgniteInternalFuture<?> locksFut = cctx.mvcc().finishLocks(exchId.topologyVersion());
+ dumpedObjects = 0;
+
while (true) {
try {
locksFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS);
@@ -738,22 +750,26 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
break;
}
catch (IgniteFutureTimeoutCheckedException ignored) {
- U.warn(log, "Failed to wait for locks release future. " +
- "Dumping pending objects that might be the cause: " + cctx.localNodeId());
+ if (dumpedObjects < DUMP_PENDING_OBJECTS_THRESHOLD) {
+ U.warn(log, "Failed to wait for locks release future. " +
+ "Dumping pending objects that might be the cause: " + cctx.localNodeId());
- U.warn(log, "Locked keys:");
+ U.warn(log, "Locked keys:");
- for (IgniteTxKey key : cctx.mvcc().lockedKeys())
- U.warn(log, "Locked key: " + key);
+ for (IgniteTxKey key : cctx.mvcc().lockedKeys())
+ U.warn(log, "Locked key: " + key);
- for (IgniteTxKey key : cctx.mvcc().nearLockedKeys())
- U.warn(log, "Locked near key: " + key);
+ for (IgniteTxKey key : cctx.mvcc().nearLockedKeys())
+ U.warn(log, "Locked near key: " + key);
- Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> locks =
- cctx.mvcc().unfinishedLocks(exchId.topologyVersion());
+ Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> locks =
+ cctx.mvcc().unfinishedLocks(exchId.topologyVersion());
- for (Map.Entry<IgniteTxKey, Collection<GridCacheMvccCandidate>> e : locks.entrySet())
- U.warn(log, "Awaited locked entry [key=" + e.getKey() + ", mvcc=" + e.getValue() + ']');
+ for (Map.Entry<IgniteTxKey, Collection<GridCacheMvccCandidate>> e : locks.entrySet())
+ U.warn(log, "Awaited locked entry [key=" + e.getKey() + ", mvcc=" + e.getValue() + ']');
+
+ dumpedObjects++;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 041f83a..2bf5365 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -364,7 +364,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
@Nullable UUID subjId,
String taskName,
boolean deserializePortable,
- boolean skipVals
+ boolean skipVals,
+ boolean canRemap
) {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -387,7 +388,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
deserializePortable,
skipVals ? null : opCtx != null ? opCtx.expiry() : null,
skipVals,
- opCtx != null && opCtx.skipStore());
+ opCtx != null && opCtx.skipStore(),
+ canRemap);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 79b7c1a..845be38 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -203,13 +203,14 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
return (IgniteInternalFuture)loadAsync(tx,
keys,
reload,
- false,
+ /*force primary*/false,
subjId,
taskName,
- true,
- null,
+ /*deserialize portable*/true,
+ /*expiry policy*/null,
skipVals,
- /*skip store*/false);
+ /*skip store*/false,
+ /*can remap*/true);
}
/**
@@ -234,7 +235,8 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
boolean deserializePortable,
@Nullable ExpiryPolicy expiryPlc,
boolean skipVal,
- boolean skipStore
+ boolean skipStore,
+ boolean canRemap
) {
if (F.isEmpty(keys))
return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
@@ -253,7 +255,8 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
taskName,
deserializePortable,
expiry,
- skipVal);
+ skipVal,
+ canRemap);
// init() will register future for responses if future has remote mappings.
fut.init();
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 9e8d76b..6f4f15e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -333,7 +333,9 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
true,
null,
false,
- /*skip store*/false).get().get(keyValue(false));
+ /*skip store*/false,
+ /*can remap*/true
+ ).get().get(keyValue(false));
}
/**
@@ -433,6 +435,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
@Override public GridCacheMvccCandidate addLocal(
long threadId,
GridCacheVersion ver,
+ AffinityTopologyVersion topVer,
long timeout,
boolean reenter,
boolean tx,
@@ -441,6 +444,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
null,
threadId,
ver,
+ topVer,
timeout,
reenter,
tx,
@@ -454,6 +458,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
* @param dhtNodeId DHT node ID.
* @param threadId Owning thread ID.
* @param ver Lock version.
+ * @param topVer Topology version.
* @param timeout Timeout to acquire lock.
* @param reenter Reentry flag.
* @param tx Transaction flag.
@@ -465,6 +470,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
@Nullable UUID dhtNodeId,
long threadId,
GridCacheVersion ver,
+ AffinityTopologyVersion topVer,
long timeout,
boolean reenter,
boolean tx,
@@ -513,6 +519,8 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
tx,
implicitSingle);
+ cand.topologyVersion(topVer);
+
owner = mvcc.anyOwner();
boolean emptyAfter = mvcc.isEmpty();
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index 2017654..951fddf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -62,7 +62,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
private static final int MAX_REMAP_CNT = getInteger(IGNITE_NEAR_GET_MAX_REMAPS, DFLT_MAX_REMAP_CNT);
/** Context. */
- private GridCacheContext<K, V> cctx;
+ private final GridCacheContext<K, V> cctx;
/** Keys. */
private Collection<KeyCacheObject> keys;
@@ -106,6 +106,9 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
/** Expiry policy. */
private IgniteCacheExpiryPolicy expiryPlc;
+ /** Flag indicating whether get can be remapped to a newer topology version ({@code false} means get is done on a locked topology version). */
+ private final boolean canRemap;
+
/**
* @param cctx Context.
* @param keys Keys.
@@ -131,7 +134,8 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
String taskName,
boolean deserializePortable,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
- boolean skipVals
+ boolean skipVals,
+ boolean canRemap
) {
super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
@@ -148,6 +152,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
this.deserializePortable = deserializePortable;
this.expiryPlc = expiryPlc;
this.skipVals = skipVals;
+ this.canRemap = canRemap;
futId = IgniteUuid.randomUuid();
@@ -161,7 +166,9 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
* Initializes future.
*/
public void init() {
- AffinityTopologyVersion topVer = tx == null ? cctx.affinity().affinityTopologyVersion() : tx.topologyVersion();
+ AffinityTopologyVersion topVer = tx == null ?
+ (canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion()) :
+ tx.topologyVersion();
map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
@@ -327,7 +334,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
remapKeys.add(key);
}
- AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion());
+ AffinityTopologyVersion updTopVer = cctx.discovery().topologyVersionEx();
assert updTopVer.compareTo(topVer) > 0 : "Got invalid partitions for local node but topology version did " +
"not change [topVer=" + topVer + ", updTopVer=" + updTopVer +
@@ -435,7 +442,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
taskName,
expiryPlc);
- ClusterNode primary = null;
+ ClusterNode affNode = null;
if (v == null && allowLocRead && cctx.affinityNode()) {
GridDhtCacheAdapter<K, V> dht = cache().dht();
@@ -472,16 +479,16 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
near.metrics0().onRead(true);
}
else {
- primary = cctx.affinity().primary(key, topVer);
+ affNode = affinityNode(key, topVer);
- if (primary == null) {
+ if (affNode == null) {
onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
"(all partition nodes left the grid)."));
return savedVers;
}
- if (!primary.isLocal() && cctx.cache().configuration().isStatisticsEnabled() && !skipVals)
+ if (!affNode.isLocal() && cctx.cache().configuration().isStatisticsEnabled() && !skipVals)
near.metrics0().onRead(false);
}
}
@@ -507,10 +514,10 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
}
else {
- if (primary == null) {
- primary = cctx.affinity().primary(key, topVer);
+ if (affNode == null) {
+ affNode = affinityNode(key, topVer);
- if (primary == null) {
+ if (affNode == null) {
onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
"(all partition nodes left the grid)."));
@@ -527,13 +534,13 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
savedVers.put(key, nearEntry == null ? null : nearEntry.dhtVersion());
- LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(primary);
+ LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(affNode);
if (keys != null && keys.containsKey(key)) {
if (remapCnt.incrementAndGet() > MAX_REMAP_CNT) {
onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " +
MAX_REMAP_CNT + " attempts (key got remapped to the same node) " +
- "[key=" + key + ", node=" + U.toShortString(primary) + ", mappings=" + mapped + ']'));
+ "[key=" + key + ", node=" + U.toShortString(affNode) + ", mappings=" + mapped + ']'));
return savedVers;
}
@@ -545,10 +552,10 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
if (!addRdr && tx.readCommitted() && !tx.writeSet().contains(cctx.txKey(key)))
addRdr = true;
- LinkedHashMap<KeyCacheObject, Boolean> old = mappings.get(primary);
+ LinkedHashMap<KeyCacheObject, Boolean> old = mappings.get(affNode);
if (old == null)
- mappings.put(primary, old = new LinkedHashMap<>(3, 1f));
+ mappings.put(affNode, old = new LinkedHashMap<>(3, 1f));
old.put(key, addRdr);
}
@@ -579,6 +586,28 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
}
/**
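+ * Picks node for get request: first alive affinity node if remap is disallowed, primary node otherwise.
+ *
+ * @param key Key to get.
+ * @param topVer Topology version.
+ * @return Affinity node to get key from, or {@code null} if none is available.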
+ */
+ private ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) {
+ if (!canRemap) {
+ List<ClusterNode> affNodes = cctx.affinity().nodes(key, topVer);
+
+ for (ClusterNode node : affNodes) {
+ if (cctx.discovery().alive(node))
+ return node;
+ }
+
+ return null;
+ }
+ else
+ return cctx.affinity().primary(key, topVer);
+ }
+
+ /**
* @return Near cache.
*/
private GridNearCacheAdapter<K, V> cache() {
@@ -752,37 +781,46 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
if (log.isDebugEnabled())
log.debug("Remote node left grid while sending or waiting for reply (will retry): " + this);
- final AffinityTopologyVersion updTopVer =
- new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
+ // Try getting value from alive nodes.
+ if (!canRemap) {
+ // Remap on the same topology version.
+ map(keys.keySet(), F.t(node, keys), topVer);
- final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this,
- cctx.kernalContext().config().getNetworkTimeout(),
- updTopVer,
- e);
-
- cctx.affinity().affinityReadyFuture(updTopVer).listen(
- new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
- if (timeout.finish()) {
- cctx.kernalContext().timeout().removeTimeoutObject(timeout);
-
- try {
- fut.get();
-
- // Remap.
- map(keys.keySet(), F.t(node, keys), updTopVer);
-
- onDone(Collections.<K, V>emptyMap());
- }
- catch (IgniteCheckedException e) {
- GridNearGetFuture.this.onDone(e);
+ onDone(Collections.<K, V>emptyMap());
+ }
+ else {
+ final AffinityTopologyVersion updTopVer =
+ new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
+
+ final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this,
+ cctx.kernalContext().config().getNetworkTimeout(),
+ updTopVer,
+ e);
+
+ cctx.affinity().affinityReadyFuture(updTopVer).listen(
+ new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+ if (timeout.finish()) {
+ cctx.kernalContext().timeout().removeTimeoutObject(timeout);
+
+ try {
+ fut.get();
+
+ // Remap.
+ map(keys.keySet(), F.t(node, keys), updTopVer);
+
+ onDone(Collections.<K, V>emptyMap());
+ }
+ catch (IgniteCheckedException e) {
+ GridNearGetFuture.this.onDone(e);
+ }
}
}
}
- }
- );
+ );
- cctx.kernalContext().timeout().addTimeoutObject(timeout);
+ cctx.kernalContext().timeout().addTimeoutObject(timeout);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 3d28018..2815194 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -307,6 +307,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
dhtNodeId,
threadId,
lockVer,
+ topVer,
timeout,
!inTx(),
inTx(),
@@ -319,9 +320,6 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
txEntry.cached(entry);
}
- if (c != null)
- c.topologyVersion(topVer);
-
synchronized (mux) {
entries.add(entry);
}
@@ -1234,8 +1232,12 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
* @return Topology exception with user-friendly message.
*/
private ClusterTopologyCheckedException newTopologyException(@Nullable Throwable nested, UUID nodeId) {
- return new ClusterTopologyCheckedException("Failed to acquire lock for keys (primary node left grid, " +
- "retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested);
+ ClusterTopologyCheckedException topEx = new ClusterTopologyCheckedException("Failed to acquire lock for keys " +
+ "(primary node left grid, retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested);
+
+ topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer.get()));
+
+ return topEx;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index e71dd62..8e66cb6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -101,7 +101,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
MiniFuture f = (MiniFuture) fut;
if (f.node().id().equals(nodeId)) {
- f.onResult(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId));
+ ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Remote node left grid: " +
+ nodeId);
+
+ e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
+
+ f.onResult(e);
found = true;
}
@@ -563,8 +568,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
try {
cctx.io().send(n, req, tx.ioPolicy());
}
+ catch (ClusterTopologyCheckedException e) {
+ e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
+
+ fut.onResult(e);
+ }
catch (IgniteCheckedException e) {
- // Fail the whole thing.
fut.onResult(e);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index f51b4b8..9ce5bd5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -72,7 +72,12 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
MiniFuture f = (MiniFuture)fut;
if (f.node().id().equals(nodeId)) {
- f.onNodeLeft(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId));
+ ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Remote node left grid: " +
+ nodeId);
+
+ e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
+
+ f.onNodeLeft(e);
found = true;
}
@@ -224,6 +229,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
cctx.io().send(node, req, tx.ioPolicy());
}
catch (ClusterTopologyCheckedException e) {
+ e.retryReadyFuture(cctx.nextAffinityReadyFuture(topVer));
+
fut.onNodeLeft(e);
}
catch (IgniteCheckedException e) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 696acfb..a1f1383 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -101,7 +101,8 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
@Nullable UUID subjId,
String taskName,
final boolean deserializePortable,
- final boolean skipVals
+ final boolean skipVals,
+ boolean canRemap
) {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -142,7 +143,8 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
deserializePortable,
skipVals ? null : opCtx != null ? opCtx.expiry() : null,
skipVals,
- skipStore);
+ skipStore,
+ canRemap);
}
/**
@@ -172,7 +174,8 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
tx.resolveTaskName(),
deserializePortable,
expiryPlc,
- skipVals);
+ skipVals,
+ /*can remap*/true);
// init() will register future for responses if it has remote mappings.
fut.init();
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index cb391e4..5ff7345 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -313,7 +313,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
});
}
else if (cacheCtx.isColocated()) {
- return cacheCtx.colocated().loadAsync(keys,
+ return cacheCtx.colocated().loadAsync(
+ keys,
readThrough,
/*reload*/false,
/*force primary*/false,
@@ -322,7 +323,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
resolveTaskName(),
deserializePortable,
accessPolicy(cacheCtx, keys),
- skipVals).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() {
+ skipVals,
+ /*can remap*/true
+ ).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() {
@Override public Boolean apply(IgniteInternalFuture<Map<Object, Object>> f) {
try {
Map<Object, Object> map = f.get();
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index b418500..b24c62d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -101,7 +101,6 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
* @param miniId Mini future ID.
* @param dhtVer DHT version.
* @param writeVer Write version.
- * @param invalidParts Invalid partitions.
* @param retVal Return value.
* @param err Error.
* @param clientRemapVer Not {@code null} if client node should remap transaction.
@@ -112,7 +111,6 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
IgniteUuid miniId,
GridCacheVersion dhtVer,
GridCacheVersion writeVer,
- Collection<Integer> invalidParts,
GridCacheReturn retVal,
Throwable err,
AffinityTopologyVersion clientRemapVer
@@ -127,7 +125,6 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
this.miniId = miniId;
this.dhtVer = dhtVer;
this.writeVer = writeVer;
- this.invalidParts = invalidParts;
this.retVal = retVal;
this.clientRemapVer = clientRemapVer;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
index ea59f1f..6c04761 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
@@ -261,7 +261,9 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
// Allow next lock in the thread to proceed.
if (!cand.used()) {
- GridLocalCacheEntry e = (GridLocalCacheEntry)cctx.cache().peekEx(cand.key());
+ GridCacheContext cctx0 = cand.parent().context();
+
+ GridLocalCacheEntry e = (GridLocalCacheEntry)cctx0.cache().peekEx(cand.key());
// At this point candidate may have been removed and entry destroyed,
// so we check for null.
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 8dd3276..3dc5946 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -458,7 +458,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
@Nullable UUID subjId,
final String taskName,
final boolean deserializePortable,
- final boolean skipVals
+ final boolean skipVals,
+ boolean canRemap
) {
A.notNull(keys, "keys");
@@ -570,8 +571,18 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
if (success || !storeEnabled)
return vals;
- return getAllAsync(keys, opCtx == null || !opCtx.skipStore(), null, false, subjId, taskName, deserializePortable,
- false, expiry, skipVals).get();
+ return getAllAsync(
+ keys,
+ opCtx == null || !opCtx.skipStore(),
+ null,
+ false,
+ subjId,
+ taskName,
+ deserializePortable,
+ /*force primary*/false,
+ expiry,
+ skipVals,
+ /*can remap*/true).get();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index 4f21ad7..4e43d97 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -302,7 +302,7 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
/**
* @return Invalid partitions.
*/
- public Set<Integer> invalidPartitions();
+ public Map<Integer, Set<Integer>> invalidPartitions();
/**
* Gets owned version for near remote transaction.
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index e9fdd22..b8beb15 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -162,7 +162,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
private AtomicBoolean preparing = new AtomicBoolean();
/** */
- private Set<Integer> invalidParts = new GridLeanSet<>();
+ private Map<Integer, Set<Integer>> invalidParts = new HashMap<>(3);
/**
* Transaction state. Note that state is not protected, as we want to
@@ -671,16 +671,25 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
}
/** {@inheritDoc} */
- @Override public Set<Integer> invalidPartitions() {
+ @Override public Map<Integer, Set<Integer>> invalidPartitions() {
return invalidParts;
}
/** {@inheritDoc} */
@Override public void addInvalidPartition(GridCacheContext<?, ?> cacheCtx, int part) {
- invalidParts.add(part);
+ Set<Integer> parts = invalidParts.get(cacheCtx.cacheId());
+
+ if (parts == null) {
+ parts = new GridLeanSet<>();
+
+ invalidParts.put(cacheCtx.cacheId(), parts);
+ }
+
+ parts.add(part);
if (log.isDebugEnabled())
- log.debug("Added invalid partition for transaction [part=" + part + ", tx=" + this + ']');
+ log.debug("Added invalid partition for transaction [cache=" + cacheCtx.name() + ", part=" + part +
+ ", tx=" + this + ']');
}
/** {@inheritDoc} */
@@ -1779,7 +1788,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
}
/** {@inheritDoc} */
- @Override public Set<Integer> invalidPartitions() {
+ @Override public Map<Integer, Set<Integer>> invalidPartitions() {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 3c792f6..e9070a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -79,6 +79,10 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
@GridDirectTransient
private Collection<T2<EntryProcessor<Object, Object, Object>, Object[]>> entryProcessorsCol;
+ /** Transient field for calculated entry processor value. */
+ @GridDirectTransient
+ private CacheObject entryProcessorCalcVal;
+
/** Transform closure bytes. */
@GridToStringExclude
private byte[] transformClosBytes;
@@ -787,6 +791,20 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
return expiryPlc;
}
+ /**
+ * @return Entry processor calculated value.
+ */
+ public CacheObject entryProcessorCalculatedValue() {
+ return entryProcessorCalcVal;
+ }
+
+ /**
+ * @param entryProcessorCalcVal Entry processor calculated value.
+ */
+ public void entryProcessorCalculatedValue(CacheObject entryProcessorCalcVal) {
+ this.entryProcessorCalcVal = entryProcessorCalcVal;
+ }
+
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index e481e25..227cb34 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -290,7 +290,6 @@ public class IgniteTxHandler {
req.version(),
null,
null,
- null,
top.topologyVersion());
try {
@@ -803,7 +802,7 @@ public class IgniteTxHandler {
res.nearEvicted(nearTx.evicted());
if (dhtTx != null && !F.isEmpty(dhtTx.invalidPartitions()))
- res.invalidPartitions(dhtTx.invalidPartitions());
+ res.invalidPartitionsByCacheId(dhtTx.invalidPartitions());
if (req.onePhaseCommit()) {
assert req.last();
@@ -1154,7 +1153,7 @@ public class IgniteTxHandler {
if (req.last())
tx.state(PREPARED);
- res.invalidPartitions(tx.invalidPartitions());
+ res.invalidPartitionsByCacheId(tx.invalidPartitions());
if (tx.empty() && req.last()) {
tx.rollback();
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index b354fed..a32e7b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -2216,8 +2216,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
missedForLoad.add(cacheKey);
}
else {
- assert !transform;
- assert txEntry.op() != TRANSFORM;
+ assert !implicit() || !transform : this;
+ assert txEntry.op() != TRANSFORM : txEntry;
if (retval)
ret.set(cacheCtx, null, true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 78b09e6..0f43b8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -651,6 +651,11 @@ public class GridServiceProcessor extends GridProcessorAdapter {
private void reassign(GridServiceDeployment dep, long topVer) throws IgniteCheckedException {
ServiceConfiguration cfg = dep.configuration();
+ Object nodeFilter = cfg.getNodeFilter();
+
+ if (nodeFilter != null)
+ ctx.resource().injectGeneric(nodeFilter);
+
int totalCnt = cfg.getTotalCount();
int maxPerNodeCnt = cfg.getMaxPerNodeCount();
String cacheName = cfg.getCacheName();
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index f8c4c7e..c9be652 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.mxbean.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.transactions.*;
+import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.io.*;
import org.apache.ignite.internal.util.ipc.shmem.*;
import org.apache.ignite.internal.util.lang.*;
@@ -580,7 +581,14 @@ public abstract class IgniteUtils {
m.put(ClusterTopologyCheckedException.class, new C1<IgniteCheckedException, IgniteException>() {
@Override public IgniteException apply(IgniteCheckedException e) {
- return new ClusterTopologyException(e.getMessage(), e);
+ ClusterTopologyException topEx = new ClusterTopologyException(e.getMessage(), e);
+
+ ClusterTopologyCheckedException checked = (ClusterTopologyCheckedException)e;
+
+ if (checked.retryReadyFuture() != null)
+ topEx.retryReadyFuture(new IgniteFutureImpl<>(checked.retryReadyFuture()));
+
+ return topEx;
}
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index f3bcab0..5c43ad9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -4085,6 +4085,20 @@ public class GridFunc {
* @param val Value to find.
* @return {@code True} if array contains given value.
*/
+ public static boolean contains(Integer[] arr, int val) {
+ for (Integer el : arr) {
+ if (el == val)
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * @param arr Array.
+ * @param val Value to find.
+ * @return {@code True} if array contains given value.
+ */
@SuppressWarnings("ForLoopReplaceableByForEach")
public static boolean contains(long[] arr, long val) {
for (int i = 0; i < arr.length; i++) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
index b93c547..2a07879 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
@@ -99,6 +99,9 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
/** Local address */
private String locAddr;
+ /** Time to live. */
+ private Integer ttl;
+
/** */
@GridToStringExclude
private Collection<AddressSender> addrSnds;
@@ -223,6 +226,32 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
return locAddr;
}
+ /**
+ * Set the default time-to-live for multicast packets sent out on this
+ * IP finder in order to control the scope of the multicast.
+ * <p>
+ * The TTL has to be in the range {@code 0 <= TTL <= 255}.
+ * <p>
+ * If TTL is {@code 0}, packets are not transmitted on the network,
+ * but may be delivered locally.
+ *
+ * @param ttl Time to live.
+ */
+ @IgniteSpiConfiguration(optional = true)
+ public void setTimeToLive(int ttl) {
+ this.ttl = ttl;
+ }
+
+ /**
+ * Gets the time-to-live for multicast packets sent out on this IP finder.
+ * The value is unboxed from a nullable field, so call only after {@code setTimeToLive(int)}.
+ *
+ * @return Time to live.
+ */
+ public int getTimeToLive() {
+ return ttl;
+ }
+
/** {@inheritDoc} */
@Override public void initializeLocalAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException {
// If IGNITE_OVERRIDE_MCAST_GRP system property is set, use its value to override multicast group from
@@ -245,6 +274,9 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
throw new IgniteSpiException("Invalid number of address request attempts, " +
"value greater than zero is expected: " + addrReqAttempts);
+ if (ttl != null && (ttl < 0 || ttl > 255))
+ throw new IgniteSpiException("Time-to-live value is out of 0 <= TTL <= 255 range: " + ttl);
+
if (F.isEmpty(getRegisteredAddresses()))
U.warn(log, "TcpDiscoveryMulticastIpFinder has no pre-configured addresses " +
"(it is recommended in production to specify at least one address in " +
@@ -453,6 +485,9 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
sock.setSoTimeout(resWaitTime);
+ if (ttl != null)
+ sock.setTimeToLive(ttl);
+
reqPckt.setData(MSG_ADDR_REQ_DATA);
try {
@@ -722,6 +757,9 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
sock.joinGroup(mcastGrp);
+ if (ttl != null)
+ sock.setTimeToLive(ttl);
+
return sock;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/config/store/jdbc/ignite-type-metadata.xml
----------------------------------------------------------------------
diff --git a/modules/core/src/test/config/store/jdbc/ignite-type-metadata.xml b/modules/core/src/test/config/store/jdbc/ignite-type-metadata.xml
index 91d77cd..3e3d6e0 100644
--- a/modules/core/src/test/config/store/jdbc/ignite-type-metadata.xml
+++ b/modules/core/src/test/config/store/jdbc/ignite-type-metadata.xml
@@ -180,6 +180,14 @@
<property name="javaName" value="name"/>
<property name="javaType" value="java.lang.String"/>
</bean>
+ <bean class="org.apache.ignite.cache.CacheTypeFieldMetadata">
+ <property name="databaseName" value="salary"/>
+ <property name="databaseType">
+ <util:constant static-field="java.sql.Types.INTEGER"/>
+ </property>
+ <property name="javaName" value="salary"/>
+ <property name="javaType" value="java.lang.Integer"/>
+ </bean>
</list>
</property>
</bean>
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java
index 182d3bc..68a77dc 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java
@@ -183,12 +183,24 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache
// No-op.
}
- stmt.executeUpdate("CREATE TABLE IF NOT EXISTS String_Entries (key varchar(100) not null, val varchar(100), PRIMARY KEY(key))");
- stmt.executeUpdate("CREATE TABLE IF NOT EXISTS UUID_Entries (key binary(16) not null, val binary(16), PRIMARY KEY(key))");
- stmt.executeUpdate("CREATE TABLE IF NOT EXISTS Timestamp_Entries (key timestamp not null, val integer, PRIMARY KEY(key))");
- stmt.executeUpdate("CREATE TABLE IF NOT EXISTS Organization (id integer not null, name varchar(50), city varchar(50), PRIMARY KEY(id))");
- stmt.executeUpdate("CREATE TABLE IF NOT EXISTS Person (id integer not null, org_id integer, name varchar(50), PRIMARY KEY(id))");
- stmt.executeUpdate("CREATE TABLE IF NOT EXISTS Person_Complex (id integer not null, org_id integer not null, city_id integer not null, name varchar(50), PRIMARY KEY(id))");
+ stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " +
+ "String_Entries (key varchar(100) not null, val varchar(100), PRIMARY KEY(key))");
+
+ stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " +
+ "UUID_Entries (key binary(16) not null, val binary(16), PRIMARY KEY(key))");
+
+ stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " +
+ "Timestamp_Entries (key timestamp not null, val integer, PRIMARY KEY(key))");
+
+ stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " +
+ "Organization (id integer not null, name varchar(50), city varchar(50), PRIMARY KEY(id))");
+
+ stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " +
+ "Person (id integer not null, org_id integer, name varchar(50), PRIMARY KEY(id))");
+
+ stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " +
+ "Person_Complex (id integer not null, org_id integer not null, city_id integer not null, " +
+ "name varchar(50), salary integer, PRIMARY KEY(id))");
conn.commit();
@@ -238,7 +250,7 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache
U.closeQuiet(prnStmt);
- PreparedStatement prnComplexStmt = conn.prepareStatement("INSERT INTO Person_Complex(id, org_id, city_id, name) VALUES (?, ?, ?, ?)");
+ PreparedStatement prnComplexStmt = conn.prepareStatement("INSERT INTO Person_Complex(id, org_id, city_id, name, salary) VALUES (?, ?, ?, ?, ?)");
for (int i = 0; i < PERSON_CNT; i++) {
prnComplexStmt.setInt(1, i);
@@ -246,6 +258,11 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache
prnComplexStmt.setInt(3, i % 100);
prnComplexStmt.setString(4, "name" + i);
+ if (i > 0)
+ prnComplexStmt.setInt(5, 1000 + i * 500);
+ else // Add person with null salary
+ prnComplexStmt.setNull(5, java.sql.Types.INTEGER);
+
prnComplexStmt.addBatch();
}
@@ -274,7 +291,7 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache
assert key.getId() == val.getId();
assert key.getOrgId() == val.getOrgId();
- assert ("name" + key.getId()).equals(val.getName());
+ assertEquals("name" + key.getId(), val.getName());
prnComplexKeys.add((PersonComplexKey)k);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java
index eac7669..9483545 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java
@@ -190,7 +190,7 @@ public abstract class CacheJdbcStoreAbstractMultithreadedSelfTest<T extends Cach
if (rnd.nextBoolean())
cache.put(new OrganizationKey(id), new Organization(id, "Name" + id, "City" + id));
else
- cache.put(new PersonKey(id), new Person(id, rnd.nextInt(), "Name" + id));
+ cache.put(new PersonKey(id), new Person(id, rnd.nextInt(), "Name" + id, 1));
}
return null;
@@ -209,7 +209,7 @@ public abstract class CacheJdbcStoreAbstractMultithreadedSelfTest<T extends Cach
if (rnd.nextBoolean())
cache.putIfAbsent(new OrganizationKey(id), new Organization(id, "Name" + id, "City" + id));
else
- cache.putIfAbsent(new PersonKey(id), new Person(id, rnd.nextInt(), "Name" + id));
+ cache.putIfAbsent(new PersonKey(id), new Person(id, rnd.nextInt(), "Name" + id, i));
}
return null;
@@ -248,7 +248,7 @@ public abstract class CacheJdbcStoreAbstractMultithreadedSelfTest<T extends Cach
if (rnd.nextBoolean())
map.put(new OrganizationKey(id), new Organization(id, "Name" + id, "City" + id));
else
- map.put(new PersonKey(id), new Person(id, rnd.nextInt(), "Name" + id));
+ map.put(new PersonKey(id), new Person(id, rnd.nextInt(), "Name" + id, 1));
}
IgniteCache<Object, Object> cache = jcache();
@@ -273,17 +273,17 @@ public abstract class CacheJdbcStoreAbstractMultithreadedSelfTest<T extends Cach
IgniteCache<PersonKey, Person> cache = jcache();
try (Transaction tx = grid().transactions().txStart()) {
- cache.put(new PersonKey(1), new Person(1, rnd.nextInt(), "Name" + 1));
- cache.put(new PersonKey(2), new Person(2, rnd.nextInt(), "Name" + 2));
- cache.put(new PersonKey(3), new Person(3, rnd.nextInt(), "Name" + 3));
+ cache.put(new PersonKey(1), new Person(1, rnd.nextInt(), "Name" + 1, 1));
+ cache.put(new PersonKey(2), new Person(2, rnd.nextInt(), "Name" + 2, 2));
+ cache.put(new PersonKey(3), new Person(3, rnd.nextInt(), "Name" + 3, 3));
cache.get(new PersonKey(1));
cache.get(new PersonKey(4));
Map<PersonKey, Person> map = U.newHashMap(2);
- map.put(new PersonKey(5), new Person(5, rnd.nextInt(), "Name" + 5));
- map.put(new PersonKey(6), new Person(6, rnd.nextInt(), "Name" + 6));
+ map.put(new PersonKey(5), new Person(5, rnd.nextInt(), "Name" + 5, 5));
+ map.put(new PersonKey(6), new Person(6, rnd.nextInt(), "Name" + 6, 6));
cache.putAll(map);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/Person.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/Person.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/Person.java
index 1c4b9a7..95c83b9 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/Person.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/Person.java
@@ -37,6 +37,9 @@ public class Person implements Serializable {
/** Value for name. */
private String name;
+ /** Value for salary. */
+ private Integer salary;
+
/**
* Empty constructor.
*/
@@ -50,11 +53,13 @@ public class Person implements Serializable {
public Person(
Integer id,
Integer orgId,
- String name
+ String name,
+ Integer salary
) {
this.id = id;
this.orgId = orgId;
this.name = name;
+ this.salary = salary;
}
/**
@@ -111,6 +116,25 @@ public class Person implements Serializable {
this.name = name;
}
+
+ /**
+ * Gets salary.
+ *
+ * @return Value for salary.
+ */
+ public Integer getSalary() {
+ return salary;
+ }
+
+ /**
+ * Sets salary.
+ *
+ * @param salary New value for salary.
+ */
+ public void setSalary(Integer salary) {
+ this.salary = salary;
+ }
+
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java
new file mode 100644
index 0000000..e5e6d72
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.fair.*;
+import org.apache.ignite.cache.affinity.rendezvous.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.transactions.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.processor.*;
+import java.io.*;
+import java.util.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+
+/**
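+ * Runs random transactional cache operations, optionally spanning two caches, and checks the resulting data.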
+ */
+public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest {
+ /** IP finder set on the discovery SPI of every grid in this test. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Name of the first test cache. */
+ private static final String CACHE1 = "cache1";
+
+ /** Name of the second test cache. */
+ private static final String CACHE2 = "cache2";
+
+ /** Total number of grids; the one with index {@code GRID_CNT - 1} is started in client mode. */
+ private static final int GRID_CNT = 5;
+
+ /** Number of distinct keys; random keys are drawn from {@code [0, KEY_RANGE)}. */
+ private static final int KEY_RANGE = 1000;
+
+ /** {@inheritDoc} Sets the common IP finder and switches the last grid to client mode. */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ if (gridName.equals(getTestGridName(GRID_CNT - 1))) // Only the grid with the highest index.
+ cfg.setClientMode(true);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} Starts {@code GRID_CNT - 1} grids first, then the grid that is configured as a client. */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGridsMultiThreaded(GRID_CNT - 1);
+
+ startGrid(GRID_CNT - 1); // This index gets client mode in getConfiguration().
+ }
+
+ /** {@inheritDoc} Calls {@code stopAllGrids()} after the parent clean-up. */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxOperations() throws Exception {
+ txOperations(PARTITIONED, FULL_SYNC, false, false); // One cache per tx, rendezvous affinity.
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCrossCacheTxOperations() throws Exception {
+ txOperations(PARTITIONED, FULL_SYNC, true, false); // Both caches in each tx, rendezvous affinity.
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCrossCacheTxOperationsPrimarySync() throws Exception {
+ txOperations(PARTITIONED, PRIMARY_SYNC, true, false); // Both caches in each tx, PRIMARY_SYNC writes.
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void _testCrossCacheTxOperationsFairAffinity() throws Exception { // NOTE(review): the underscore prefix presumably disables this test - confirm.
+ txOperations(PARTITIONED, FULL_SYNC, true, true); // Both caches in each tx, FairAffinityFunction.
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCrossCacheTxOperationsReplicated() throws Exception {
+ txOperations(REPLICATED, FULL_SYNC, true, false); // Both caches in each tx, replicated caches.
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCrossCacheTxOperationsReplicatedPrimarySync() throws Exception {
+ txOperations(REPLICATED, PRIMARY_SYNC, true, false); // Both caches in each tx, replicated, PRIMARY_SYNC writes.
+ }
+
+ /**
+ * @param name Cache name.
+ * @param cacheMode Cache mode.
+ * @param writeSync Write synchronization mode.
+ * @param fairAff If {@code true} uses {@link FairAffinityFunction}, otherwise {@link RendezvousAffinityFunction}.
+ * @return New {@code TRANSACTIONAL} cache configuration; one backup is set for {@code PARTITIONED} mode only.
+ */
+ private CacheConfiguration cacheConfiguration(String name,
+ CacheMode cacheMode,
+ CacheWriteSynchronizationMode writeSync,
+ boolean fairAff) {
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setName(name);
+ ccfg.setCacheMode(cacheMode);
+ ccfg.setAtomicityMode(TRANSACTIONAL);
+ ccfg.setWriteSynchronizationMode(writeSync);
+
+ if (cacheMode == PARTITIONED) // Backups are configured for partitioned caches only.
+ ccfg.setBackups(1);
+
+ ccfg.setAffinity(fairAff ? new FairAffinityFunction() : new RendezvousAffinityFunction());
+
+ return ccfg;
+ }
+
+ /**
+ * @param cacheMode Cache mode used for both test caches.
+ * @param writeSync Write synchronization mode used for both test caches.
+ * @param crossCacheTx If {@code true} uses cross cache transaction.
+ * @param fairAff If {@code true} uses {@link FairAffinityFunction}, otherwise {@link RendezvousAffinityFunction}.
+ * @throws Exception If failed.
+ */
+ private void txOperations(CacheMode cacheMode,
+ CacheWriteSynchronizationMode writeSync,
+ boolean crossCacheTx,
+ boolean fairAff) throws Exception {
+ Ignite ignite = ignite(0);
+
+ try {
+ ignite.createCache(cacheConfiguration(CACHE1, cacheMode, writeSync, fairAff));
+ ignite.createCache(cacheConfiguration(CACHE2, cacheMode, writeSync, fairAff));
+
+ txOperations(PESSIMISTIC, REPEATABLE_READ, crossCacheTx, false); // Transactions started on grid 0.
+ txOperations(PESSIMISTIC, REPEATABLE_READ, crossCacheTx, true); // Transactions started on the client grid.
+
+ txOperations(OPTIMISTIC, REPEATABLE_READ, crossCacheTx, false);
+ txOperations(OPTIMISTIC, REPEATABLE_READ, crossCacheTx, true);
+ }
+ finally {
+ ignite.destroyCache(CACHE1); // Every test method creates caches under the same two names.
+ ignite.destroyCache(CACHE2);
+ }
+ }
+
+ /**
+ * @param concurrency Transaction concurrency.
+ * @param isolation Transaction isolation.
+ * @param crossCacheTx If {@code true} uses cross cache transaction.
+ * @param client If {@code true} uses client node (grid {@code GRID_CNT - 1}), otherwise grid 0.
+ * @throws Exception If failed.
+ */
+ private void txOperations(TransactionConcurrency concurrency,
+ TransactionIsolation isolation,
+ boolean crossCacheTx,
+ boolean client) throws Exception {
+ final Map<TestKey, TestValue> expData1 = new HashMap<>();
+ final Map<TestKey, TestValue> expData2 = new HashMap<>();
+
+ Ignite ignite = client ? ignite(GRID_CNT - 1) : ignite(0);
+
+ assertEquals(client, (boolean)ignite.configuration().isClientMode());
+
+ final List<IgniteCache<TestKey, TestValue>> caches1 = new ArrayList<>();
+ final List<IgniteCache<TestKey, TestValue>> caches2 = new ArrayList<>();
+
+ for (int i = 0; i < GRID_CNT; i++) { // Cache proxies from every grid, used for the final check.
+ caches1.add(ignite(i).<TestKey, TestValue>cache(CACHE1));
+ caches2.add(ignite(i).<TestKey, TestValue>cache(CACHE2));
+ }
+
+ IgniteCache<TestKey, TestValue> cache1 = ignite.cache(CACHE1);
+ IgniteCache<TestKey, TestValue> cache2 = ignite.cache(CACHE2);
+
+ assertNotNull(cache1);
+ assertNotNull(cache2);
+ assertNotSame(cache1, cache2);
+
+ try {
+ Random rnd = new Random();
+
+ long seed = System.currentTimeMillis(); // Time-based seed; it is logged right below.
+
+ rnd.setSeed(seed);
+
+ log.info("Test tx operations [concurrency=" + concurrency +
+ ", isolation=" + isolation +
+ ", client=" + client +
+ ", seed=" + seed + ']');
+
+ IgniteTransactions txs = ignite.transactions();
+
+ final List<TestKey> keys = new ArrayList<>();
+
+ for (int i = 0; i < KEY_RANGE; i++) // Every key that key(Random) can produce.
+ keys.add(new TestKey(i));
+
+ CacheConfiguration ccfg = cache1.getConfiguration(CacheConfiguration.class);
+
+ boolean fullSync = ccfg.getWriteSynchronizationMode() == FULL_SYNC;
+ boolean optimistic = concurrency == OPTIMISTIC;
+
+ boolean checkData = fullSync && !optimistic; // Per-operation results are asserted only for pessimistic FULL_SYNC runs.
+
+ for (int i = 0; i < 10_000; i++) {
+ if (i % 100 == 0)
+ log.info("Iteration: " + i);
+
+ boolean rollback = i % 10 == 0; // Every tenth transaction is rolled back.
+
+ try (Transaction tx = txs.txStart(concurrency, isolation)) {
+ cacheOperation(expData1, rnd, cache1, checkData, rollback);
+
+ if (crossCacheTx)
+ cacheOperation(expData2, rnd, cache2, checkData, rollback);
+
+ if (rollback)
+ tx.rollback();
+ else
+ tx.commit();
+ }
+ }
+
+ if (fullSync) { // The final state is compared on every grid only with FULL_SYNC.
+ checkData(caches1, keys, expData1);
+ checkData(caches2, keys, expData2);
+
+ cache1.removeAll();
+ cache2.removeAll();
+
+ checkData(caches1, keys, new HashMap<TestKey, TestValue>()); // After removeAll() no key may remain.
+ checkData(caches2, keys, new HashMap<TestKey, TestValue>());
+ }
+ }
+ finally {
+ cache1.removeAll(); // Clear both caches before the next run, even after a failure.
+ cache2.removeAll();
+ }
+ }
+
+ /**
+ * @param caches Caches to read from; each one is checked against the same expected data.
+ * @param keys Keys to look up in every cache.
+ * @param expData Expected data; a key missing from this map is expected to have no value in the cache.
+ */
+ private void checkData(List<IgniteCache<TestKey, TestValue>> caches,
+ List<TestKey> keys, Map<TestKey, TestValue> expData) {
+ for (IgniteCache<TestKey, TestValue> cache : caches) {
+ for (TestKey key : keys) {
+ TestValue val = cache.get(key);
+ TestValue expVal = expData.get(key);
+
+ assertEquals(expVal, val);
+ }
+ }
+ }
+
+ /**
+ * @param expData Expected cache data; updated to mirror the operation unless {@code willRollback} is set.
+ * @param rnd Random used to pick the key, the new value and the operation.
+ * @param cache Cache.
+ * @param checkData If {@code true} checks the operation result against {@code expData}.
+ * @param willRollback {@code True} if will rollback transaction.
+ */
+ private void cacheOperation(
+ Map<TestKey, TestValue> expData,
+ Random rnd,
+ IgniteCache<TestKey, TestValue> cache,
+ boolean checkData,
+ boolean willRollback) {
+ TestKey key = key(rnd);
+ TestValue val = new TestValue(rnd.nextLong());
+
+ switch (rnd.nextInt(8)) { // One of the eight operations below.
+ case 0: { // put
+ cache.put(key, val);
+
+ if (!willRollback)
+ expData.put(key, val);
+
+ break;
+ }
+
+ case 1: { // getAndPut
+ TestValue oldVal = cache.getAndPut(key, val);
+
+ TestValue expOld = expData.get(key);
+
+ if (checkData)
+ assertEquals(expOld, oldVal);
+
+ if (!willRollback)
+ expData.put(key, val);
+
+ break;
+ }
+
+ case 2: { // remove
+ boolean rmv = cache.remove(key);
+
+ if (checkData)
+ assertEquals(expData.containsKey(key), rmv);
+
+ if (!willRollback)
+ expData.remove(key);
+
+ break;
+ }
+
+ case 3: { // getAndRemove
+ TestValue oldVal = cache.getAndRemove(key);
+
+ TestValue expOld = expData.get(key);
+
+ if (checkData)
+ assertEquals(expOld, oldVal);
+
+ if (!willRollback)
+ expData.remove(key);
+
+ break;
+ }
+
+ case 4: { // putIfAbsent
+ boolean put = cache.putIfAbsent(key, val);
+
+ boolean expPut = !expData.containsKey(key);
+
+ if (checkData)
+ assertEquals(expPut, put);
+
+ if (expPut && !willRollback) // The expected data changes only if the key was absent.
+ expData.put(key, val);
+
+ break;
+ }
+
+ case 5: { // invoke that sets a new value and returns the old one
+ TestValue oldVal = cache.invoke(key, new TestEntryProcessor(val.value()));
+ TestValue expOld = expData.get(key);
+
+ if (checkData)
+ assertEquals(expOld, oldVal);
+
+ if (!willRollback)
+ expData.put(key, val);
+
+ break;
+ }
+
+ case 6: { // invoke with a null value: the processor only returns the current value
+ TestValue oldVal = cache.invoke(key, new TestEntryProcessor(null));
+ TestValue expOld = expData.get(key);
+
+ if (checkData)
+ assertEquals(expOld, oldVal);
+
+ break;
+ }
+
+ case 7: { // get
+ TestValue oldVal = cache.get(key);
+ TestValue expOld = expData.get(key);
+
+ if (checkData)
+ assertEquals(expOld, oldVal);
+
+ break;
+ }
+
+ default:
+ assert false; // Not reachable: nextInt(8) returns 0..7.
+ }
+ }
+
+ /**
+ * @param rnd Random.
+ * @return New key with a random value in {@code [0, KEY_RANGE)}.
+ */
+ private TestKey key(Random rnd) {
+ return new TestKey(rnd.nextInt(KEY_RANGE));
+ }
+
+ /**
+ * Serializable cache key wrapping a single {@code long}; equality and hash code are based on that value.
+ */
+ private static class TestKey implements Serializable {
+ /** Key value. */
+ private long key;
+
+ /**
+ * @param key Key.
+ */
+ public TestKey(long key) {
+ this.key = key;
+ }
+
+ /**
+ * @return Key.
+ */
+ public long key() {
+ return key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ TestKey other = (TestKey)o;
+
+ return key == other.key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return (int)(key ^ (key >>> 32)); // XOR of the high and low 32 bits of the key.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TestKey.class, this);
+ }
+ }
+
+ /** Serializable cache value wrapping a single {@code long}; equality and hash code are based on it. */
+ private static class TestValue implements Serializable {
+ /** Wrapped value. */
+ private long val;
+
+ /** @param val Value. */
+ public TestValue(long val) {
+ this.val = val;
+ }
+
+ /** @return Value. */
+ public long value() {
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ TestValue other = (TestValue)o;
+
+ return val == other.val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ // Must accompany equals(): equal values have to produce equal hash codes.
+ return (int)(val ^ (val >>> 32));
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TestValue.class, this);
+ }
+ }
+
+ /**
+ * Entry processor that returns the entry's current value and overwrites it when a non-null value was supplied.
+ */
+ private static class TestEntryProcessor implements CacheEntryProcessor<TestKey, TestValue, TestValue> {
+ /** Value to set, or {@code null} to leave the entry unchanged. */
+ private Long val;
+
+ /**
+ * @param val Value to set, or {@code null} to leave the entry unchanged.
+ */
+ public TestEntryProcessor(@Nullable Long val) {
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public TestValue process(MutableEntry<TestKey, TestValue> e, Object... args) {
+ TestValue old = e.getValue(); // Read before any update so the previous value is returned.
+
+ if (val != null)
+ e.setValue(new TestValue(val));
+
+ return old;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAtomicCacheEntryProcessorNodeJoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAtomicCacheEntryProcessorNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAtomicCacheEntryProcessorNodeJoinTest.java
new file mode 100644
index 0000000..af87a7d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAtomicCacheEntryProcessorNodeJoinTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+
+/**
+ * Variant of {@link IgniteCacheEntryProcessorNodeJoinTest} that reports {@code ATOMIC} as its atomicity mode.
+ */
+public class IgniteAtomicCacheEntryProcessorNodeJoinTest extends IgniteCacheEntryProcessorNodeJoinTest {
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return ATOMIC;
+ }
+}