You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/11/30 14:42:21 UTC
[5/7] ignite git commit: ignite-4285
ignite-4285
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5ead6d9c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5ead6d9c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5ead6d9c
Branch: refs/heads/ignite-4285
Commit: 5ead6d9c5b51dfb645fe2bcb19a81e042a511835
Parents: ed32c83
Author: sboikov <sb...@gridgain.com>
Authored: Wed Nov 30 15:03:45 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Nov 30 16:55:12 2016 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMapEntry.java | 99 ++++++-
.../distributed/GridDistributedCacheEntry.java | 82 +-----
.../cache/local/GridLocalCacheEntry.java | 105 ++-----
.../CacheSerializableTransactionsTest.java | 288 ++++++++++++++++++-
.../loadtests/hashmap/GridHashMapLoadTest.java | 5 +
5 files changed, 390 insertions(+), 189 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ead6d9c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 0c30174..31baeda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -79,9 +79,11 @@ import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_EXPIRED;
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_LOCKED;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED;
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_UNLOCKED;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
@@ -4316,7 +4318,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
/** {@inheritDoc} */
- @Override public GridCacheBatchSwapEntry evictInBatchInternal(GridCacheVersion obsoleteVer)
+ @Override public final GridCacheBatchSwapEntry evictInBatchInternal(GridCacheVersion obsoleteVer)
throws IgniteCheckedException {
assert Thread.holdsLock(this);
assert cctx.isSwapOrOffheapEnabled();
@@ -4383,7 +4385,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
* @param filter Entry filter.
* @return {@code True} if entry is visitable.
*/
- public boolean visitable(CacheEntryPredicate[] filter) {
+ public final boolean visitable(CacheEntryPredicate[] filter) {
boolean rmv = false;
try {
@@ -4438,7 +4440,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
/** {@inheritDoc} */
- @Override public boolean deleted() {
+ @Override public final boolean deleted() {
if (!cctx.deferredDelete())
return false;
@@ -4448,7 +4450,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
/** {@inheritDoc} */
- @Override public synchronized boolean obsoleteOrDeleted() {
+ @Override public final synchronized boolean obsoleteOrDeleted() {
return obsoleteVersionExtras() != null ||
(cctx.deferredDelete() && (deletedUnlocked() || !hasValueUnlocked()));
}
@@ -4457,7 +4459,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
* @return {@code True} if deleted.
*/
@SuppressWarnings("SimplifiableIfStatement")
- protected boolean deletedUnlocked() {
+ protected final boolean deletedUnlocked() {
assert Thread.holdsLock(this);
if (!cctx.deferredDelete())
@@ -4469,7 +4471,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
/**
* @param deleted {@code True} if deleted.
*/
- protected void deletedUnlocked(boolean deleted) {
+ protected final void deletedUnlocked(boolean deleted) {
assert Thread.holdsLock(this);
assert cctx.deferredDelete();
@@ -4506,7 +4508,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
/**
* @return MVCC.
*/
- @Nullable protected GridCacheMvcc mvccExtras() {
+ @Nullable protected final GridCacheMvcc mvccExtras() {
return extras != null ? extras.mvcc() : null;
}
@@ -4514,7 +4516,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
* @return All MVCC local and non near candidates.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- @Nullable public synchronized List<GridCacheMvccCandidate> mvccAllLocal() {
+ @Nullable public final synchronized List<GridCacheMvccCandidate> mvccAllLocal() {
GridCacheMvcc mvcc = extras != null ? extras.mvcc() : null;
if (mvcc == null)
@@ -4540,21 +4542,22 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
/**
* @param mvcc MVCC.
*/
- protected void mvccExtras(@Nullable GridCacheMvcc mvcc) {
+ protected final void mvccExtras(@Nullable GridCacheMvcc mvcc) {
extras = (extras != null) ? extras.mvcc(mvcc) : mvcc != null ? new GridCacheMvccEntryExtras(mvcc) : null;
}
/**
* @return Obsolete version.
*/
- @Nullable protected GridCacheVersion obsoleteVersionExtras() {
+ @Nullable protected final GridCacheVersion obsoleteVersionExtras() {
return extras != null ? extras.obsoleteVersion() : null;
}
/**
* @param obsoleteVer Obsolete version.
+ * @param ext Extras.
*/
- protected void obsoleteVersionExtras(@Nullable GridCacheVersion obsoleteVer, GridCacheObsoleteEntryExtras ext) {
+ private void obsoleteVersionExtras(@Nullable GridCacheVersion obsoleteVer, GridCacheObsoleteEntryExtras ext) {
extras = (extras != null) ?
extras.obsoleteVersion(obsoleteVer) :
obsoleteVer != null ?
@@ -4563,6 +4566,80 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
/**
+ * @param prevOwners Previous owners.
+ * @param owners Current owners.
+ * @param val Entry value.
+ */
+ protected final void checkOwnerChanged(@Nullable CacheLockCandidates prevOwners,
+ @Nullable CacheLockCandidates owners,
+ CacheObject val) {
+ assert !Thread.holdsLock(this);
+
+ if (prevOwners != null && owners == null) {
+ cctx.mvcc().callback().onOwnerChanged(this, null);
+
+ if (cctx.events().isRecordable(EVT_CACHE_OBJECT_UNLOCKED)) {
+ boolean hasVal = hasValue();
+
+ GridCacheMvccCandidate cand = prevOwners.candidate(0);
+
+ cctx.events().addEvent(partition(),
+ key,
+ cand.nodeId(),
+ cand,
+ EVT_CACHE_OBJECT_UNLOCKED,
+ val,
+ hasVal,
+ val,
+ hasVal,
+ null,
+ null,
+ null,
+ true);
+ }
+ }
+
+ if (owners != null) {
+ for (int i = 0; i < owners.size(); i++) {
+ GridCacheMvccCandidate owner = owners.candidate(i);
+
+ boolean locked = prevOwners == null || !prevOwners.hasCandidate(owner.version());
+
+ if (locked) {
+ cctx.mvcc().callback().onOwnerChanged(this, owner);
+
+ if (owner.local())
+ checkThreadChain(owner);
+
+ if (cctx.events().isRecordable(EVT_CACHE_OBJECT_LOCKED)) {
+ boolean hasVal = hasValue();
+
+ // Event notification.
+ cctx.events().addEvent(partition(),
+ key,
+ owner.nodeId(),
+ owner,
+ EVT_CACHE_OBJECT_LOCKED,
+ val,
+ hasVal,
+ val,
+ hasVal,
+ null,
+ null,
+ null,
+ true);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * @param owner Starting candidate in the chain.
+ */
+ protected abstract void checkThreadChain(GridCacheMvccCandidate owner);
+
+ /**
* Updates metrics.
*
* @param op Operation.
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ead6d9c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
index 2a11c01..6f8d0b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
@@ -688,86 +688,8 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
}
}
- /**
- * @param prevOwners Previous owners.
- * @param owners Current owners.
- * @param val Entry value.
- */
- protected final void checkOwnerChanged(@Nullable CacheLockCandidates prevOwners,
- @Nullable CacheLockCandidates owners,
- CacheObject val) {
- assert !Thread.holdsLock(this);
-
- if (prevOwners != null) {
- for (int i = 0; i < prevOwners.size(); i++) {
- GridCacheMvccCandidate cand = prevOwners.candidate(i);
-
- boolean unlocked = owners == null || !owners.hasCandidate(cand.version());
-
- if (unlocked) {
- cctx.mvcc().callback().onOwnerChanged(this, null);
-
- if (cctx.events().isRecordable(EVT_CACHE_OBJECT_UNLOCKED)) {
- boolean hasVal = hasValue();
-
- cctx.events().addEvent(partition(),
- key,
- cand.nodeId(),
- cand,
- EVT_CACHE_OBJECT_UNLOCKED,
- val,
- hasVal,
- val, hasVal,
- null,
- null,
- null,
- true);
- }
-
- break;
- }
- }
- }
-
- if (owners != null) {
- for (int i = 0; i < owners.size(); i++) {
- GridCacheMvccCandidate owner = owners.candidate(i);
-
- boolean locked = prevOwners == null || !prevOwners.hasCandidate(owner.version());
-
- if (locked) {
- cctx.mvcc().callback().onOwnerChanged(this, owner);
-
- if (owner.local())
- checkThreadChain(owner);
-
- if (cctx.events().isRecordable(EVT_CACHE_OBJECT_LOCKED)) {
- boolean hasVal = hasValue();
-
- // Event notification.
- cctx.events().addEvent(partition(),
- key,
- owner.nodeId(),
- owner,
- EVT_CACHE_OBJECT_LOCKED,
- val,
- hasVal,
- val,
- hasVal,
- null,
- null,
- null,
- true);
- }
- }
- }
- }
- }
-
- /**
- * @param owner Starting candidate in the chain.
- */
- protected void checkThreadChain(GridCacheMvccCandidate owner) {
+ /** {@inheritDoc} */
+ @Override final protected void checkThreadChain(GridCacheMvccCandidate owner) {
assert !Thread.holdsLock(this);
assert owner != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ead6d9c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
index 9d45084..b298819 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
@@ -30,9 +30,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_LOCKED;
-import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_UNLOCKED;
-
/**
* Cache entry for local caches.
*/
@@ -89,6 +86,7 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
) throws GridCacheEntryRemovedException {
assert serReadVer == null || serOrder != null;
+ CacheObject val;
GridCacheMvccCandidate cand;
CacheLockCandidates prev;
CacheLockCandidates owner;
@@ -130,12 +128,14 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
if (mvcc.isEmpty())
mvccExtras(null);
+
+ val = this.val;
}
if (cand != null && !cand.reentry())
cctx.mvcc().addNext(cctx, cand);
- checkOwnerChanged(prev, owner);
+ checkOwnerChanged(prev, owner, val);
return cand;
}
@@ -144,6 +144,7 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
* @param cand Candidate.
*/
void readyLocal(GridCacheMvccCandidate cand) {
+ CacheObject val;
CacheLockCandidates prev = null;
CacheLockCandidates owner = null;
@@ -158,9 +159,11 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
if (mvcc.isEmpty())
mvccExtras(null);
}
+
+ val = this.val;
}
- checkOwnerChanged(prev, owner);
+ checkOwnerChanged(prev, owner, val);
}
/** {@inheritDoc} */
@@ -195,6 +198,7 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
* Rechecks if lock should be reassigned.
*/
public void recheck() {
+ CacheObject val;
CacheLockCandidates prev = null;
CacheLockCandidates owner = null;
@@ -209,88 +213,15 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
if (mvcc.isEmpty())
mvccExtras(null);
}
- }
-
- checkOwnerChanged(prev, owner);
- }
-
- /**
- * @param prevOwners Previous owners.
- * @param owners Current owners.
- */
- private void checkOwnerChanged(@Nullable CacheLockCandidates prevOwners, @Nullable CacheLockCandidates owners) {
- assert !Thread.holdsLock(this);
-
- if (prevOwners != null) {
- for (int i = 0; i < prevOwners.size(); i++) {
- GridCacheMvccCandidate cand = prevOwners.candidate(i);
-
- boolean unlocked = owners == null || !owners.hasCandidate(cand.version());
-
- if (unlocked) {
- cctx.mvcc().callback().onOwnerChanged(this, null);
-
- if (cctx.events().isRecordable(EVT_CACHE_OBJECT_UNLOCKED)) {
- boolean hasVal = hasValue();
-
- cctx.events().addEvent(partition(),
- key,
- cand.nodeId(),
- cand,
- EVT_CACHE_OBJECT_UNLOCKED,
- val,
- hasVal,
- val,
- hasVal,
- null,
- null,
- null,
- true);
- }
- break;
- }
- }
+ val = this.val;
}
- if (owners != null) {
- for (int i = 0; i < owners.size(); i++) {
- GridCacheMvccCandidate owner = owners.candidate(i);
-
- boolean locked = prevOwners == null || !prevOwners.hasCandidate(owner.version());
-
- if (locked) {
- cctx.mvcc().callback().onOwnerChanged(this, owner);
-
- checkThreadChain(owner);
-
- if (cctx.events().isRecordable(EVT_CACHE_OBJECT_LOCKED)) {
- boolean hasVal = hasValue();
-
- // Event notification.
- cctx.events().addEvent(partition(),
- key,
- owner.nodeId(),
- owner,
- EVT_CACHE_OBJECT_LOCKED,
- val,
- hasVal,
- val,
- hasVal,
- null,
- null,
- null,
- true);
- }
- }
- }
- }
+ checkOwnerChanged(prev, owner, val);
}
- /**
- * @param owner Starting candidate in the chain.
- */
- private void checkThreadChain(GridCacheMvccCandidate owner) {
+ /** {@inheritDoc} */
+ @Override protected void checkThreadChain(GridCacheMvccCandidate owner) {
assert !Thread.holdsLock(this);
assert owner != null;
@@ -340,6 +271,7 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
* @param threadId Thread ID.
*/
private void releaseLocal(long threadId) {
+ CacheObject val;
CacheLockCandidates prev = null;
CacheLockCandidates owner = null;
@@ -356,6 +288,8 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
else
owner = mvcc.allOwners();
}
+
+ val = this.val;
}
if (prev != null) {
@@ -366,11 +300,12 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
}
}
- checkOwnerChanged(prev, owner);
+ checkOwnerChanged(prev, owner, val);
}
/** {@inheritDoc} */
@Override public boolean removeLock(GridCacheVersion ver) throws GridCacheEntryRemovedException {
+ CacheObject val;
CacheLockCandidates prev = null;
CacheLockCandidates owner = null;
@@ -396,12 +331,14 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
else
owner = mvcc.allOwners();
}
+
+ val = this.val;
}
if (doomed != null)
checkThreadChain(doomed);
- checkOwnerChanged(prev, owner);
+ checkOwnerChanged(prev, owner, val);
return doomed != null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ead6d9c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
index 4986c8a..91e069a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
@@ -925,6 +925,71 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testTxConflictReadWrite3() throws Exception {
+ Ignite ignite0 = ignite(0);
+
+ final IgniteTransactions txs = ignite0.transactions();
+
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ logCacheInfo(ccfg);
+
+ IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+ List<Integer> readKeys = new ArrayList<>();
+ List<Integer> writeKeys = new ArrayList<>();
+
+ readKeys.add(primaryKey(cache));
+ writeKeys.add(primaryKeys(cache, 1, 1000_0000).get(0));
+
+ if (ccfg.getBackups() > 0) {
+ readKeys.add(backupKey(cache));
+ writeKeys.add(backupKeys(cache, 1, 1000_0000).get(0));
+ }
+
+ if (ccfg.getCacheMode() == PARTITIONED) {
+ readKeys.add(nearKey(cache));
+ writeKeys.add(nearKeys(cache, 1, 1000_0000).get(0));
+ }
+
+ try {
+ for (Integer readKey : readKeys) {
+ for (Integer writeKey : writeKeys) {
+ try {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ cache.get(readKey);
+
+ cache.put(writeKey, writeKey);
+
+ updateKey(cache, readKey, 0);
+
+ tx.commit();
+ }
+
+ fail();
+ }
+ catch (TransactionOptimisticException e) {
+ // Expected exception.
+ }
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ cache.get(readKey);
+
+ cache.put(writeKey, writeKey);
+
+ tx.commit();
+ }
+ }
+ }
+ }
+ finally {
+ destroyCache(ccfg.getName());
+ }
+ }
+ }
+
+ /**
* @throws Exception If failed
*/
public void testTxConflictGetAndPut1() throws Exception {
@@ -2471,53 +2536,70 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
ignite0.createCache(ccfg);
+ CacheConfiguration<Integer, Integer> readCacheCcfg = new CacheConfiguration<>(ccfg);
+
+ readCacheCcfg.setName(ccfg.getName() + "-read");
+
+ ignite0.createCache(readCacheCcfg);
+
try {
- checkNoReadLockConflict(ignite(0), ccfg.getName(), entry, putKey);
+ checkNoReadLockConflict(ignite(0), ccfg.getName(), ccfg.getName(), entry, putKey);
+
+ checkNoReadLockConflict(ignite(1), ccfg.getName(), ccfg.getName(), entry, putKey);
+
+ checkNoReadLockConflict(ignite(SRVS), ccfg.getName(), ccfg.getName(), entry, putKey);
+
+ checkNoReadLockConflict(ignite(0), readCacheCcfg.getName(), ccfg.getName(), entry, putKey);
- checkNoReadLockConflict(ignite(1), ccfg.getName(), entry, putKey);
+ checkNoReadLockConflict(ignite(1), readCacheCcfg.getName(), ccfg.getName(), entry, putKey);
- checkNoReadLockConflict(ignite(SRVS), ccfg.getName(), entry, putKey);
+ checkNoReadLockConflict(ignite(SRVS), readCacheCcfg.getName(), ccfg.getName(), entry, putKey);
}
finally {
destroyCache(ccfg.getName());
+
+ destroyCache(readCacheCcfg.getName());
}
}
}
/**
* @param ignite Node.
- * @param cacheName Cache name.
+ * @param readCacheName Cache name for get.
+ * @param writeCacheName Cache name for put.
* @param entry If {@code true} then uses 'getEntry' to read value, otherwise uses 'get'.
* @param putKey Write key counter.
* @throws Exception If failed.
*/
private void checkNoReadLockConflict(final Ignite ignite,
- String cacheName,
+ String readCacheName,
+ String writeCacheName,
final boolean entry,
final AtomicInteger putKey) throws Exception
{
final int THREADS = 64;
- final IgniteCache<Integer, Integer> cache = ignite.cache(cacheName);
+ final IgniteCache<Integer, Integer> readCache = ignite.cache(readCacheName);
+ final IgniteCache<Integer, Integer> writeCache = ignite.cache(writeCacheName);
- List<Integer> readKeys = testKeys(cache);
+ List<Integer> readKeys = testKeys(readCache);
for (final Integer readKey : readKeys) {
final CyclicBarrier barrier = new CyclicBarrier(THREADS);
- cache.put(readKey, Integer.MIN_VALUE);
+ readCache.put(readKey, Integer.MIN_VALUE);
GridTestUtils.runMultiThreaded(new Callable<Void>() {
@Override public Void call() throws Exception {
try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
if (entry)
- cache.get(readKey);
+ readCache.get(readKey);
else
- cache.getEntry(readKey);
+ readCache.getEntry(readKey);
barrier.await();
- cache.put(putKey.incrementAndGet(), 0);
+ writeCache.put(putKey.incrementAndGet(), 0);
tx.commit();
}
@@ -2526,11 +2608,11 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
}
}, THREADS, "test-thread");
- assertEquals((Integer)Integer.MIN_VALUE, cache.get(readKey));
+ assertEquals((Integer)Integer.MIN_VALUE, readCache.get(readKey));
- cache.put(readKey, readKey);
+ readCache.put(readKey, readKey);
- assertEquals(readKey, cache.get(readKey));
+ assertEquals(readKey, readCache.get(readKey));
}
}
@@ -2774,6 +2856,184 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testReadWriteAccountTx() throws Exception {
+ final CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(PARTITIONED,
+ FULL_SYNC,
+ 1,
+ false,
+ false);
+
+ ignite(0).createCache(ccfg);
+
+ try {
+ final int ACCOUNTS = 50;
+ final int VAL_PER_ACCOUNT = 1000;
+
+ IgniteCache<Integer, Account> cache0 = ignite(0).cache(ccfg.getName());
+
+ final Set<Integer> keys = new HashSet<>();
+
+ for (int i = 0; i < ACCOUNTS; i++) {
+ cache0.put(i, new Account(VAL_PER_ACCOUNT));
+
+ keys.add(i);
+ }
+
+ final List<Ignite> clients = clients();
+
+ final AtomicBoolean stop = new AtomicBoolean();
+
+ final AtomicInteger idx = new AtomicInteger();
+
+ IgniteInternalFuture<?> readFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ try {
+ int threadIdx = idx.getAndIncrement();
+
+ int nodeIdx = threadIdx % (SRVS + CLIENTS);
+
+ Ignite node = ignite(nodeIdx);
+
+ IgniteCache<Integer, Account> cache = node.cache(ccfg.getName());
+
+ IgniteTransactions txs = node.transactions();
+
+ Integer putKey = ACCOUNTS + threadIdx;
+
+ while (!stop.get()) {
+ int sum;
+
+ while (true) {
+ sum = 0;
+
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Map<Integer, Account> data = cache.getAll(keys);
+
+ for (int i = 0; i < ACCOUNTS; i++) {
+ Account account = data.get(i);
+
+ assertNotNull(account);
+
+ sum += account.value();
+ }
+
+ cache.put(putKey, new Account(sum));
+
+ tx.commit();
+ }
+ catch (TransactionOptimisticException e) {
+ continue;
+ }
+
+ break;
+ }
+
+ assertEquals(ACCOUNTS * VAL_PER_ACCOUNT, sum);
+ }
+
+ return null;
+ }
+ catch (Throwable e) {
+ stop.set(true);
+
+ log.error("Unexpected error: " + e);
+
+ throw e;
+ }
+ }
+ }, (SRVS + CLIENTS) * 2, "update-thread");
+
+ IgniteInternalFuture<?> updateFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ try {
+ int nodeIdx = idx.getAndIncrement() % clients.size();
+
+ Ignite node = clients.get(nodeIdx);
+
+ IgniteCache<Integer, Account> cache = node.cache(ccfg.getName());
+
+ IgniteTransactions txs = node.transactions();
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ while (!stop.get()) {
+ int id1 = rnd.nextInt(ACCOUNTS);
+
+ int id2 = rnd.nextInt(ACCOUNTS);
+
+ while (id2 == id1)
+ id2 = rnd.nextInt(ACCOUNTS);
+
+ while (true) {
+ try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+ Account a1 = cache.get(id1);
+ Account a2 = cache.get(id2);
+
+ assertNotNull(a1);
+ assertNotNull(a2);
+
+ if (a1.value() > 0) {
+ a1 = new Account(a1.value() - 1);
+ a2 = new Account(a2.value() + 1);
+ }
+
+ cache.put(id1, a1);
+ cache.put(id2, a2);
+
+ tx.commit();
+ }
+ catch (TransactionOptimisticException e) {
+ continue;
+ }
+
+ break;
+ }
+ }
+
+ return null;
+ }
+ catch (Throwable e) {
+ stop.set(true);
+
+ log.error("Unexpected error: " + e);
+
+ throw e;
+ }
+ }
+ }, 2, "update-thread");
+
+ try {
+ U.sleep(15_000);
+ }
+ finally {
+ stop.set(true);
+ }
+
+ readFut.get();
+ updateFut.get();
+ int sum = 0;
+
+ for (int i = 0; i < ACCOUNTS; i++) {
+ Account a = cache0.get(i);
+
+ assertNotNull(a);
+ assertTrue(a.value() >= 0);
+
+ log.info("Account: " + a.value());
+
+ sum += a.value();
+ }
+
+ assertEquals(ACCOUNTS * VAL_PER_ACCOUNT, sum);
+ }
+ finally {
+ ignite(0).destroyCache(ccfg.getName());
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testNearCacheReaderUpdate() throws Exception {
Ignite ignite0 = ignite(0);
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ead6d9c/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java
index 13f29fe..5c12f84 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.testframework.junits.GridTestKernalContext;
@@ -90,6 +91,10 @@ public class GridHashMapLoadTest extends GridCommonAbstractTest {
return false;
}
+ @Override protected void checkThreadChain(GridCacheMvccCandidate owner) {
+ // No-op.
+ }
+
@Override public void txUnlock(IgniteInternalTx tx) {
// No-op.
}