You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/10/13 09:05:43 UTC
[10/21] ignite git commit: ignite-1607 WIP
ignite-1607 WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/206721a0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/206721a0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/206721a0
Branch: refs/heads/ignite-1607
Commit: 206721a0954521f3c84ccb52ce32d40ba021c9ce
Parents: caed865
Author: sboikov <sb...@gridgain.com>
Authored: Thu Oct 8 16:30:53 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Oct 9 13:10:01 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 199 ++--------
.../distributed/dht/GridDhtCacheAdapter.java | 6 +-
.../cache/distributed/dht/GridDhtGetFuture.java | 40 +-
.../cache/distributed/dht/GridDhtTxLocal.java | 2 +-
...arOptimisticSerializableTxPrepareFuture.java | 47 ++-
.../cache/distributed/near/GridNearTxLocal.java | 14 -
.../CacheSerializableTransactionsTest.java | 374 ++++++++++++++++++-
.../GridCacheConcurrentTxMultiNodeTest.java | 3 -
.../IgniteTxMultiThreadedAbstractTest.java | 2 +-
9 files changed, 465 insertions(+), 222 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/206721a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index ae987b7..bb15204 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1409,144 +1409,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param taskName Task name.
* @return Future.
*/
- public IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> reloadAllAsync0(
+ public final IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> reloadAllAsync0(
Collection<KeyCacheObject> keys,
boolean ret,
boolean skipVals,
@Nullable UUID subjId,
- String taskName)
- {
- final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
-
- if (!F.isEmpty(keys)) {
- final Map<KeyCacheObject, GridCacheVersion> keyVers = new HashMap();
-
- for (KeyCacheObject key : keys) {
- if (key == null)
- continue;
-
- // Skip primary or backup entries for near cache.
- if (ctx.isNear() && ctx.affinity().localNode(key, topVer))
- continue;
-
- while (true) {
- try {
- GridCacheEntryEx entry = entryExSafe(key, topVer);
-
- if (entry == null)
- break;
-
- GridCacheVersion ver = entry.version();
-
- keyVers.put(key, ver);
-
- break;
- }
- catch (GridCacheEntryRemovedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Got removed entry for reload (will retry): " + key);
- }
- catch (GridDhtInvalidPartitionException ignore) {
- if (log.isDebugEnabled())
- log.debug("Got invalid partition for key (will skip): " + key);
+ String taskName) {
+ assert false;
- break;
- }
- }
- }
-
- final Map<KeyCacheObject, CacheObject> map =
- ret ? U.<KeyCacheObject, CacheObject>newHashMap(keys.size()) : null;
-
- final Collection<KeyCacheObject> absentKeys = F.view(keyVers.keySet());
-
- final Collection<KeyCacheObject> loadedKeys = new GridConcurrentHashSet<>();
-
- IgniteInternalFuture<Object> readFut = readThroughAllAsync(absentKeys, true, skipVals, null,
- subjId, taskName, new CI2<KeyCacheObject, Object>() {
- /** Version for all loaded entries. */
- private GridCacheVersion nextVer = ctx.versions().next();
-
- /** {@inheritDoc} */
- @Override public void apply(KeyCacheObject key, Object val) {
- loadedKeys.add(key);
-
- GridCacheEntryEx entry = peekEx(key);
-
- if (entry != null) {
- try {
- GridCacheVersion curVer = keyVers.get(key);
-
- if (curVer != null) {
- boolean wasNew = entry.isNewLocked();
-
- entry.unswap();
-
- CacheObject cacheVal = ctx.toCacheObject(val);
-
- boolean set = entry.versionedValue(cacheVal, curVer, nextVer);
-
- ctx.evicts().touch(entry, topVer);
-
- if (map != null) {
- if (set || wasNew)
- map.put(key, cacheVal);
- else {
- CacheObject v = entry.peek(true, false, false, null);
-
- if (v != null)
- map.put(key, v);
- }
- }
-
- if (log.isDebugEnabled()) {
- log.debug("Set value loaded from store into entry [set=" + set + ", " +
- "curVer=" +
- curVer + ", newVer=" + nextVer + ", entry=" + entry + ']');
- }
- }
- else {
- if (log.isDebugEnabled()) {
- log.debug("Current version was not found (either entry was removed or " +
- "validation was not passed: " + entry);
- }
- }
- }
- catch (GridCacheEntryRemovedException ignore) {
- if (log.isDebugEnabled()) {
- log.debug("Got removed entry for reload (will not store reloaded entry) " +
- "[entry=" + entry + ']');
- }
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
- }
- });
-
- return readFut.chain(new CX1<IgniteInternalFuture<Object>, Map<KeyCacheObject, CacheObject>>() {
- @Override public Map<KeyCacheObject, CacheObject> applyx(IgniteInternalFuture<Object> e)
- throws IgniteCheckedException {
- // Touch all not loaded keys.
- for (KeyCacheObject key : absentKeys) {
- if (!loadedKeys.contains(key)) {
- GridCacheEntryEx entry = peekEx(key);
-
- if (entry != null)
- ctx.evicts().touch(entry, topVer);
- }
- }
-
- // Make sure there were no exceptions.
- e.get();
-
- return map;
- }
- });
- }
-
- return new GridFinishedFuture<>(Collections.<KeyCacheObject, CacheObject>emptyMap());
+ return new GridFinishedFuture<>();
}
/**
@@ -1763,7 +1634,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
expiry,
skipVals,
false,
- canRemap);
+ canRemap,
+ false);
}
/**
@@ -1778,7 +1650,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param keepCacheObjects Keep cache objects
* @return Future.
*/
- public <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(@Nullable final Collection<KeyCacheObject> keys,
+ public final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(@Nullable final Collection<KeyCacheObject> keys,
final boolean readThrough,
boolean checkTx,
@Nullable final UUID subjId,
@@ -1787,7 +1659,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
@Nullable IgniteCacheExpiryPolicy expiry,
final boolean skipVals,
final boolean keepCacheObjects,
- boolean canRemap
+ boolean canRemap,
+ final boolean needVer
) {
if (F.isEmpty(keys))
return new GridFinishedFuture<>(Collections.<K1, V1>emptyMap());
@@ -1822,20 +1695,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
GridCacheEntryEx entry = entryEx(key);
try {
- CacheObject val = entry.innerGet(null,
+ T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(null,
ctx.isSwapOrOffheapEnabled(),
- /*don't read-through*/false,
- /*fail-fast*/true,
/*unmarshal*/true,
/*update-metrics*/!skipVals,
/*event*/!skipVals,
- /*temporary*/false,
subjId,
null,
taskName,
expiry);
- if (val == null) {
+ if (res == null) {
GridCacheVersion ver = entry.version();
if (misses == null)
@@ -1844,7 +1714,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
misses.put(key, ver);
}
else {
- ctx.addResult(map, key, val, skipVals, keepCacheObjects, deserializePortable, true);
+ if (needVer) {
+ assert keepCacheObjects;
+
+ map.put((K1)key, (V1)new T2<>(res.get1(), res.get2()));
+ }
+ else {
+ ctx.addResult(map,
+ key,
+ res.get1(),
+ skipVals,
+ keepCacheObjects,
+ deserializePortable,
+ true);
+ }
if (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))
ctx.evicts().touch(entry, topVer);
@@ -1860,15 +1743,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (log.isDebugEnabled())
log.debug("Got removed entry in getAllAsync(..) method (will retry): " + key);
}
- catch (GridCacheFilterFailedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Filter validation failed for entry: " + entry);
-
- if (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))
- ctx.evicts().touch(entry, topVer);
-
- break; // While loop.
- }
}
}
@@ -1918,13 +1792,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
// Don't put key-value pair into result map if value is null.
if (val != null) {
- ctx.addResult(map,
- key,
- cacheVal,
- skipVals,
- keepCacheObjects,
- deserializePortable,
- false);
+ if (needVer) {
+ assert keepCacheObjects;
+
+ map.put((K1)key, (V1)new T2<>(cacheVal, set ? nextVer : ver));
+ }
+ else {
+ ctx.addResult(map,
+ key,
+ cacheVal,
+ skipVals,
+ keepCacheObjects,
+ deserializePortable,
+ false);
+ }
}
if (tx0 == null || (!tx0.implicit() &&
@@ -2017,6 +1898,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
}
else {
+ assert !needVer;
+
return asyncOp(tx, new AsyncOp<Map<K1, V1>>(keys) {
@Override public IgniteInternalFuture<Map<K1, V1>> op(IgniteTxLocalAdapter tx) {
return tx.getAllAsync(ctx, keys, null, deserializePortable, skipVals, false, !readThrough);
http://git-wip-us.apache.org/repos/asf/ignite/blob/206721a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 3ce9ee9..25e480c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.CI3;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -605,7 +606,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
* @param skipVals Skip values flag.
* @return Get future.
*/
- IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> getDhtAllAsync(
+ IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> getDhtAllAsync(
Collection<KeyCacheObject> keys,
boolean readThrough,
@Nullable UUID subjId,
@@ -623,7 +624,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
expiry,
skipVals,
/*keep cache objects*/true,
- canRemap);
+ canRemap,
+ /*need version*/true);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/206721a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index a67b1de..e8cafb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -17,11 +17,10 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.Iterator;
import java.util.LinkedHashMap;
-import java.util.LinkedList;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
@@ -45,6 +44,7 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.typedef.C2;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiClosure;
@@ -147,6 +147,8 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
assert reader != null;
assert !F.isEmpty(keys);
+ assert !reload;
+
this.reader = reader;
this.cctx = cctx;
this.msgId = msgId;
@@ -291,8 +293,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
return new GridFinishedFuture<Collection<GridCacheEntryInfo>>(
Collections.<GridCacheEntryInfo>emptyList());
- final Collection<GridCacheEntryInfo> infos = new LinkedList<>();
-
String taskName0 = cctx.kernalContext().job().currentTaskName();
if (taskName0 == null)
@@ -335,8 +335,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
txFut.add(f);
}
- infos.add(info);
-
break;
}
catch (IgniteCheckedException err) {
@@ -355,7 +353,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
if (txFut != null)
txFut.markInitialized();
- IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> fut;
+ IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut;
if (txFut == null || txFut.isDone()) {
if (reload && cctx.readThrough() && cctx.store().configured()) {
@@ -393,8 +391,8 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
// transactions to complete.
fut = new GridEmbeddedFuture<>(
txFut,
- new C2<Boolean, Exception, IgniteInternalFuture<Map<KeyCacheObject, CacheObject>>>() {
- @Override public IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> apply(Boolean b, Exception e) {
+ new C2<Boolean, Exception, IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>>>() {
+ @Override public IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> apply(Boolean b, Exception e) {
if (e != null)
throw new GridClosureException(e);
@@ -432,23 +430,29 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
}
return new GridEmbeddedFuture<>(
- new C2<Map<KeyCacheObject, CacheObject>, Exception, Collection<GridCacheEntryInfo>>() {
- @Override public Collection<GridCacheEntryInfo> apply(Map<KeyCacheObject, CacheObject> map, Exception e) {
+ new C2<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>, Exception, Collection<GridCacheEntryInfo>>() {
+ @Override public Collection<GridCacheEntryInfo> apply(Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>> map, Exception e) {
if (e != null) {
onDone(e);
return Collections.emptyList();
}
else {
- for (Iterator<GridCacheEntryInfo> it = infos.iterator(); it.hasNext();) {
- GridCacheEntryInfo info = it.next();
+ Collection<GridCacheEntryInfo> infos = new ArrayList<>(map.size());
- Object v = map.get(info.key());
+ for (Map.Entry<KeyCacheObject, T2<CacheObject, GridCacheVersion>> entry : map.entrySet()) {
+ T2<CacheObject, GridCacheVersion> val = entry.getValue();
- if (v == null)
- it.remove();
- else
- info.value(skipVals ? null : (CacheObject)v);
+ assert val != null;
+
+ GridCacheEntryInfo info = new GridCacheEntryInfo();
+
+ info.cacheId(cctx.cacheId());
+ info.key(entry.getKey());
+ info.value(skipVals ? null : val.get1());
+ info.version(val.get2());
+
+ infos.add(info);
}
return infos;
http://git-wip-us.apache.org/repos/asf/ignite/blob/206721a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 2071275..44f34aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -627,7 +627,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
if (finish(false) || state() == UNKNOWN)
fut.finish();
else
- fut.onError(new IgniteCheckedException("Failed to commit transaction: " +
+ fut.onError(new IgniteCheckedException("Failed to rollback transaction: " +
CU.txString(GridDhtTxLocal.this)));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/206721a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index e48601d..04c4851 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
import java.util.Collection;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -139,7 +140,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
- f.onResult(e);
+ f.onNodeLeft(e);
found = true;
}
@@ -165,7 +166,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
}
if (e instanceof IgniteTxOptimisticCheckedException && nodeId != null)
- tx.onOptimisticException(nodeId);
+ tx.removeMapping(nodeId);
err.compareAndSet(null, e);
}
@@ -519,14 +520,27 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
Collection<MiniFuture> futs = (Collection)futures();
- for (MiniFuture fut : futs) {
- if (remap && fut.rcvRes.get())
+ Iterator<MiniFuture> it = futs.iterator();
+
+ while (it.hasNext()) {
+ MiniFuture fut = it.next();
+
+ if (skipFuture(remap, fut))
continue;
IgniteCheckedException err = prepare(fut);
if (err != null) {
- onDone(err);
+ while (it.hasNext()) {
+ fut = it.next();
+
+ if (skipFuture(remap, fut))
+ continue;
+
+ tx.removeMapping(fut.mapping().node().id());
+
+ fut.onResult(err);
+ }
break;
}
@@ -536,10 +550,19 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
}
/**
+ * @param remap Remap flag.
+ * @param fut Future.
+ * @return {@code True} if the future should be skipped during remap.
+ */
+ private boolean skipFuture(boolean remap, MiniFuture fut) {
+ return remap && fut.rcvRes.get();
+ }
+
+ /**
* @param fut Mini future.
- * @return {@code False} if should stop mapping.
+ * @return Prepare error if any.
*/
- private IgniteCheckedException prepare(final MiniFuture fut) {
+ @Nullable private IgniteCheckedException prepare(final MiniFuture fut) {
GridDistributedTxMapping m = fut.mapping();
final ClusterNode n = m.node();
@@ -575,7 +598,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
tx.userPrepare();
}
catch (IgniteCheckedException e) {
- onError(m.node().id(), e);
+ fut.onResult(e);
return e;
}
@@ -605,7 +628,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
catch (ClusterTopologyCheckedException e) {
e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
- fut.onResult(e);
+ fut.onNodeLeft(e);
return e;
}
@@ -806,7 +829,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
*/
void onResult(Throwable e) {
if (rcvRes.compareAndSet(false, true)) {
- err.compareAndSet(null, e);
+ onError(m.node().id(), e);
if (log.isDebugEnabled())
log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
@@ -822,7 +845,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
/**
* @param e Node failure.
*/
- void onResult(ClusterTopologyCheckedException e) {
+ void onNodeLeft(ClusterTopologyCheckedException e) {
if (isDone())
return;
@@ -856,7 +879,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
assert cctx.kernalContext().clientNode();
assert m.clientFirst();
- tx.onClientRemap(m.node().id());
+ tx.removeMapping(m.node().id());
ClientRemapFuture remapFut = new ClientRemapFuture();
http://git-wip-us.apache.org/repos/asf/ignite/blob/206721a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 9207bd0..5b2d50c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -597,20 +597,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
}
/**
- * @param nodeId Primary node id.
- */
- void onOptimisticException(UUID nodeId) {
- mappings.remove(nodeId);
- }
-
- /**
- * @param nodeId Primary node id.
- */
- void onClientRemap(UUID nodeId) {
- mappings.remove(nodeId);
- }
-
- /**
* @param nodeId Node ID to mark with explicit lock.
* @return {@code True} if mapping was found.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/206721a0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
index 9f74b9c..85c4f80 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
@@ -46,6 +46,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -77,7 +78,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
/** */
- private static final boolean FAST = true;
+ private static final boolean FAST = false;
/** */
private static Map<Integer, Integer> storeMap = new ConcurrentHashMap8<>();
@@ -95,6 +96,8 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
+ cfg.setPeerClassLoadingEnabled(false);
+
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
cfg.setClientMode(client);
@@ -266,8 +269,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
txAsync(cache, OPTIMISTIC, SERIALIZABLE,
new IgniteClosure<IgniteCache<Integer, Integer>, Void>() {
- @Override
- public Void apply(IgniteCache<Integer, Integer> cache) {
+ @Override public Void apply(IgniteCache<Integer, Integer> cache) {
cache.get(key);
return null;
@@ -1305,6 +1307,329 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testRollbackNearCache1() throws Exception {
+ rollbackNearCacheWrite(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRollbackNearCache2() throws Exception {
+ rollbackNearCacheWrite(false);
+ }
+
+ /**
+ * @param near If {@code true} locks entry using the same near cache.
+ * @throws Exception If failed.
+ */
+ private void rollbackNearCacheWrite(boolean near) throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ IgniteCache<Integer, Integer> cache0 =
+ ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false));
+
+ final String cacheName = cache0.getName();
+
+ try {
+ Ignite ignite = ignite(SRVS);
+
+ IgniteCache<Integer, Integer> cache = ignite.createNearCache(cacheName,
+ new NearCacheConfiguration<Integer, Integer>());
+
+ IgniteTransactions txs = ignite.transactions();
+
+ Integer key1 = primaryKey(ignite(0).cache(cacheName));
+ Integer key2 = primaryKey(ignite(1).cache(cacheName));
+ Integer key3 = primaryKey(ignite(2).cache(cacheName));
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ IgniteInternalFuture<?> fut = null;
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ cache.put(key1, key1);
+ cache.put(key2, key2);
+ cache.put(key3, key3);
+
+ fut = lockKey(latch, near ? cache : cache0, key2);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ latch.countDown();
+
+ assert fut != null;
+
+ fut.get();
+
+ checkValue(key1, null, cacheName);
+ checkValue(key2, 1, cacheName);
+ checkValue(key3, null, cacheName);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ cache.put(key1, key1);
+ cache.put(key2, key2);
+ cache.put(key3, key3);
+
+ tx.commit();
+ }
+
+ checkValue(key1, key1, cacheName);
+ checkValue(key2, key2, cacheName);
+ checkValue(key3, key3, cacheName);
+ }
+ finally {
+ ignite0.destroyCache(cacheName);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRollbackNearCache3() throws Exception {
+ rollbackNearCacheRead(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRollbackNearCache4() throws Exception {
+ rollbackNearCacheRead(false);
+ }
+
+ /**
+ * @param near If {@code true} updates entry using the same near cache.
+ * @throws Exception If failed.
+ */
+ private void rollbackNearCacheRead(boolean near) throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ IgniteCache<Integer, Integer> cache0 =
+ ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false));
+
+ final String cacheName = cache0.getName();
+
+ try {
+ Ignite ignite = ignite(SRVS);
+
+ IgniteCache<Integer, Integer> cache = ignite.createNearCache(cacheName,
+ new NearCacheConfiguration<Integer, Integer>());
+
+ IgniteTransactions txs = ignite.transactions();
+
+ Integer key1 = primaryKey(ignite(0).cache(cacheName));
+ Integer key2 = primaryKey(ignite(1).cache(cacheName));
+ Integer key3 = primaryKey(ignite(2).cache(cacheName));
+
+ cache0.put(key1, -1);
+ cache0.put(key2, -1);
+ cache0.put(key3, -1);
+
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ cache.get(key1);
+ cache.get(key2);
+ cache.get(key3);
+
+ updateKey(near ? cache : cache0, key2, -2);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ log.info("Expected exception: " + e);
+ }
+
+ checkValue(key1, -1, cacheName);
+ checkValue(key2, -2, cacheName);
+ checkValue(key3, -1, cacheName);
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ cache.put(key1, key1);
+ cache.put(key2, key2);
+ cache.put(key3, key3);
+
+ tx.commit();
+ }
+
+ checkValue(key1, key1, cacheName);
+ checkValue(key2, key2, cacheName);
+ checkValue(key3, key3, cacheName);
+ }
+ finally {
+ ignite0.destroyCache(cacheName);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAccountTx1() throws Exception {
+ accountTx(false, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void _testAccountTxNearCache() throws Exception {
+ accountTx(false, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAccountTx2() throws Exception {
+ accountTx(true, false);
+ }
+
+ /**
+ * @param getAll If {@code true} uses getAll/putAll in transaction.
+ * @param nearCache If {@code true} near cache is enabled.
+ * @throws Exception If failed.
+ */
+ private void accountTx(final boolean getAll, final boolean nearCache) throws Exception {
+ final Ignite ignite0 = ignite(0);
+
+ final String cacheName =
+ ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)).getName();
+
+ try {
+ final List<Ignite> clients = clients();
+
+ final int ACCOUNTS = 100;
+ final int VAL_PER_ACCOUNT = 10_000;
+
+ IgniteCache<Integer, Account> cache = ignite0.cache(cacheName);
+
+ for (int i = 0; i < ACCOUNTS; i++)
+ cache.put(i, new Account(VAL_PER_ACCOUNT));
+
+ final AtomicInteger idx = new AtomicInteger();
+
+ final int THREADS = 20;
+
+ final long stopTime = System.currentTimeMillis() + 10_000;
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ int nodeIdx = idx.getAndIncrement() % clients.size();
+
+ Ignite node = clients.get(nodeIdx);
+
+ log.info("Tx thread: " + node.name());
+
+ final IgniteTransactions txs = node.transactions();
+
+ final IgniteCache<Integer, Account> cache =
+ nearCache ? node.createNearCache(cacheName, new NearCacheConfiguration<Integer, Account>()) :
+ node.<Integer, Account>cache(cacheName);
+
+ assertNotNull(cache);
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ while (U.currentTimeMillis() < stopTime) {
+ int id1 = rnd.nextInt(ACCOUNTS);
+
+ int id2 = rnd.nextInt(ACCOUNTS);
+
+ while (id2 == id1)
+ id2 = rnd.nextInt(ACCOUNTS);
+
+ try {
+ while (true) {
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ if (getAll) {
+ Map<Integer, Account> map = cache.getAll(F.asSet(id1, id2));
+
+ Account a1 = cache.get(id1);
+ Account a2 = cache.get(id2);
+
+ assertNotNull(a1);
+ assertNotNull(a2);
+
+ if (a1.value() > 0) {
+ a1 = new Account(a1.value() - 1);
+ a2 = new Account(a2.value() + 1);
+ }
+
+ map.put(id1, a1);
+ map.put(id2, a2);
+
+ cache.putAll(map);
+ }
+ else {
+ Account a1 = cache.get(id1);
+ Account a2 = cache.get(id2);
+
+ assertNotNull(a1);
+ assertNotNull(a2);
+
+ if (a1.value() > 0) {
+ a1 = new Account(a1.value() - 1);
+ a2 = new Account(a2.value() + 1);
+ }
+
+ cache.put(id1, a1);
+ cache.put(id2, a2);
+ }
+
+ tx.commit();
+ }
+
+ break;
+ }
+ catch (TransactionOptimisticException ignore) {
+ // Retry.
+ }
+ }
+ }
+ catch (Throwable e) {
+ log.error("Unexpected error: " + e, e);
+
+ throw e;
+ }
+ }
+
+ return null;
+ }
+ }, THREADS, "tx-thread");
+
+ fut.get(30_000);
+
+ int sum = 0;
+
+ for (int i = 0; i < ACCOUNTS; i++) {
+ Account a = cache.get(i);
+
+ assertNotNull(a);
+ assertTrue(a.value() >= 0);
+
+ log.info("Account: " + a.value());
+
+ sum += a.value();
+ }
+
+ assertEquals(ACCOUNTS * VAL_PER_ACCOUNT, sum);
+ }
+ finally {
+ ignite0.destroyCache(cacheName);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testConcurrentUpdateNoDeadlock() throws Exception {
concurrentUpdateNoDeadlock(Collections.singletonList(ignite(0)), 10, false); // updateNodes = [ignite(0)], threads = 10, restart = false.
}
@@ -1319,14 +1644,14 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- public void testConcurrentUpdateNoDeadlockClients() throws Exception {
+ public void testConcurrentUpdateNoDeadlockFromClients() throws Exception {
concurrentUpdateNoDeadlock(clients(), 20, false);
}
/**
* @throws Exception If failed.
*/
- public void testConcurrentUpdateNoDeadlockClientsNodeRestart() throws Exception {
+ public void testConcurrentUpdateNoDeadlockFromClientsNodeRestart() throws Exception {
concurrentUpdateNoDeadlock(clients(), 20, true);
}
@@ -1356,12 +1681,15 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
private void concurrentUpdateNoDeadlock(final List<Ignite> updateNodes,
int threads,
final boolean restart) throws Exception {
+ if (FAST)
+ return;
+
assert updateNodes.size() > 0;
- final Ignite ignite0 = ignite(0);
+ final Ignite srv = ignite(1);
final String cacheName =
- ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)).getName();
+ srv.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)).getName();
try {
final int KEYS = 100;
@@ -1389,12 +1717,10 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
});
}
- int ITERS = FAST ? 1 : 10;
-
- for (int i = 0; i < ITERS; i++) {
+ for (int i = 0; i < 10; i++) {
log.info("Iteration: " + i);
- final long stopTime = U.currentTimeMillis() + (FAST ? 1000 : 10_000);
+ final long stopTime = U.currentTimeMillis() + 10_000;
final AtomicInteger idx = new AtomicInteger();
@@ -1454,7 +1780,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
updateFut.get(60, SECONDS);
- IgniteCache<Integer, Integer> cache = ignite(1).cache(cacheName);
+ IgniteCache<Integer, Integer> cache = srv.cache(cacheName);
for (int key = 0; key < KEYS; key++) {
Integer val = cache.get(key);
@@ -1474,7 +1800,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}
finally {
- destroyCache(ignite(1), cacheName);
+ destroyCache(srv, cacheName);
}
}
@@ -1735,4 +2061,26 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
return val;
}
}
+
+ /**
+ * Test value object wrapping a single immutable {@code int}.
+ */
+ static class Account {
+ /** Wrapped value; assigned once in the constructor. */
+ private final int val;
+
+ /**
+ * @param val Value to store.
+ */
+ public Account(int val) {
+ this.val = val;
+ }
+
+ /**
+ * @return Value passed to the constructor.
+ */
+ public int value() {
+ return val;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/206721a0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java
index 67bc08c..1ef77f2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java
@@ -162,9 +162,6 @@ public class GridCacheConcurrentTxMultiNodeTest extends GridCommonAbstractTest {
c.setPeerClassLoadingEnabled(false);
- // Enable tracing.
-// Logger.getLogger("org.apache.ignite.kernal.processors.cache.GridCacheDgcManager.trace").setLevel(Level.DEBUG);
-
return c;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/206721a0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java
index 7a1a0b9..191feb6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java
@@ -225,7 +225,7 @@ public abstract class IgniteTxMultiThreadedAbstractTest extends IgniteTxAbstract
final int ITERATIONS = 100;
- for (int key0 = 0; key0 < 20; key0++) {
+ for (int key0 = 100_000; key0 < 100_000 + 20; key0++) {
final int key = key0;
cache.put(key, 0L);