You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/15 14:22:10 UTC
[08/12] ignite git commit: ignite-971 Fixed offheap to swap eviction,
added failover tests with swap/offheap,
added retries for tx 'check backup' rollback.
ignite-971 Fixed offheap to swap eviction, added failover tests with swap/offheap, added retries for tx 'check backup' rollback.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a7490a6e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a7490a6e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a7490a6e
Branch: refs/heads/master
Commit: a7490a6e48d4c9f1e85a3ae08f97d6c5ced7a71b
Parents: eb7d2b0
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 15 12:46:48 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 15 12:46:48 2015 +0300
----------------------------------------------------------------------
.../processors/cache/CacheMetricsImpl.java | 2 +-
.../processors/cache/GridCacheAdapter.java | 21 +-
.../processors/cache/GridCacheEntryEx.java | 13 ++
.../cache/GridCacheEvictionManager.java | 18 +-
.../processors/cache/GridCacheMapEntry.java | 52 ++++-
.../cache/GridCacheSwapEntryImpl.java | 24 +--
.../processors/cache/GridCacheSwapManager.java | 190 +++++++++++++++---
.../processors/cache/GridCacheUtils.java | 21 +-
.../distributed/dht/GridDhtLocalPartition.java | 7 +-
.../dht/GridDhtPartitionTopologyImpl.java | 9 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 3 +
.../near/GridNearTxFinishFuture.java | 32 ++-
.../cache/transactions/IgniteTxHandler.java | 9 +-
.../offheap/GridOffHeapProcessor.java | 25 ++-
.../util/offheap/GridOffHeapEvictListener.java | 5 +
.../internal/util/offheap/GridOffHeapMap.java | 13 +-
.../util/offheap/GridOffHeapMapFactory.java | 28 +--
.../util/offheap/GridOffHeapPartitionedMap.java | 12 ++
.../util/offheap/unsafe/GridUnsafeMap.java | 128 ++++++++----
.../unsafe/GridUnsafePartitionedMap.java | 9 +
.../cache/CacheSwapUnswapGetTest.java | 85 +++++++-
.../cache/GridCacheAbstractFullApiSelfTest.java | 7 +-
.../GridCacheAbstractRemoveFailureTest.java | 199 +++++++++++++------
.../cache/GridCacheMemoryModeSelfTest.java | 9 +-
.../cache/GridCachePutAllFailoverSelfTest.java | 60 ++++++
.../processors/cache/GridCacheTestEntryEx.java | 6 +
.../GridCacheAbstractNodeRestartSelfTest.java | 149 ++++++++++++--
.../GridCacheDhtAtomicRemoveFailureTest.java | 16 +-
.../dht/GridCacheDhtRemoveFailureTest.java | 16 +-
.../dht/GridCacheTxNodeFailureSelfTest.java | 2 +-
.../IgniteCacheCrossCacheTxFailoverTest.java | 53 +++--
.../IgniteCachePutRetryAbstractSelfTest.java | 166 ++++++++++++++--
.../dht/IgniteCachePutRetryAtomicSelfTest.java | 2 +
...gniteCachePutRetryTransactionalSelfTest.java | 50 +++--
...eAtomicInvalidPartitionHandlingSelfTest.java | 98 +++++++--
...tomicPrimaryWriteOrderRemoveFailureTest.java | 15 +-
.../GridCacheAtomicRemoveFailureTest.java | 15 +-
.../GridCacheAtomicNearRemoveFailureTest.java | 15 +-
...cPrimaryWriteOrderNearRemoveFailureTest.java | 15 +-
.../near/GridCacheNearRemoveFailureTest.java | 15 +-
.../GridCachePartitionedNodeRestartTest.java | 9 +-
...ePartitionedOptimisticTxNodeRestartTest.java | 9 +-
.../GridCacheReplicatedNodeRestartSelfTest.java | 8 +-
.../offheap/GridOffHeapMapAbstractSelfTest.java | 16 +-
.../GridOffHeapMapPerformanceAbstractTest.java | 4 +-
...idOffHeapPartitionedMapAbstractSelfTest.java | 20 ++
.../unsafe/GridUnsafeMapPerformanceTest.java | 2 +-
.../offheap/unsafe/GridUnsafeMapSelfTest.java | 2 +-
.../GridOffHeapMapPerformanceAbstractTest.java | 4 +-
.../unsafe/GridUnsafeMapPerformanceTest.java | 2 +-
.../ignite/testframework/GridTestUtils.java | 117 +++++++++++
.../IgniteCacheFailoverTestSuite.java | 6 -
.../IgniteCacheFailoverTestSuite3.java | 62 ++++++
.../testsuites/IgniteCacheRestartTestSuite.java | 14 +-
.../IgniteCacheRestartTestSuite2.java | 47 +++++
.../query/h2/opt/GridH2AbstractKeyValueRow.java | 31 ++-
.../cache/GridCacheOffHeapAndSwapSelfTest.java | 4 +
.../IgniteCacheQueryMultiThreadedSelfTest.java | 9 +-
...QueryOffheapEvictsMultiThreadedSelfTest.java | 5 -
...lientQueryReplicatedNodeRestartSelfTest.java | 8 +-
.../IgniteCacheQueryNodeRestartSelfTest.java | 4 +-
.../IgniteCacheQueryNodeRestartSelfTest2.java | 8 +-
62 files changed, 1584 insertions(+), 421 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index 1554e07..dfa0217 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -843,7 +843,7 @@ public class CacheMetricsImpl implements CacheMetrics {
offHeapEvicts.incrementAndGet();
if (delegate != null)
- delegate.onOffHeapRemove();
+ delegate.onOffHeapEvict();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 9329e94..1fc94ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -4096,21 +4096,22 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return t;
}
- catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException |
- IgniteTxRollbackCheckedException e) {
+ catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException e) {
throw e;
}
catch (IgniteCheckedException e) {
- try {
- tx.rollback();
+ if (!(e instanceof IgniteTxRollbackCheckedException)) {
+ try {
+ tx.rollback();
- e = new IgniteTxRollbackCheckedException("Transaction has been rolled back: " +
- tx.xid(), e);
- }
- catch (IgniteCheckedException | AssertionError | RuntimeException e1) {
- U.error(log, "Failed to rollback transaction (cache may contain stale locks): " + tx, e1);
+ e = new IgniteTxRollbackCheckedException("Transaction has been rolled back: " +
+ tx.xid(), e);
+ }
+ catch (IgniteCheckedException | AssertionError | RuntimeException e1) {
+ U.error(log, "Failed to rollback transaction (cache may contain stale locks): " + tx, e1);
- U.addLastCause(e, e1, log);
+ U.addLastCause(e, e1, log);
+ }
}
if (X.hasCause(e, ClusterTopologyCheckedException.class) && i != retries - 1) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 98e86ed..430590a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -867,6 +867,19 @@ public interface GridCacheEntryEx {
public void updateTtl(@Nullable GridCacheVersion ver, long ttl);
/**
+ * Tries to do offheap -> swap eviction.
+ *
+ * @param entry Serialized swap entry.
+ * @param evictVer Version when entry was selected for eviction.
+ * @param obsoleteVer Obsolete version.
+ * @throws IgniteCheckedException If failed.
+ * @throws GridCacheEntryRemovedException If entry was removed.
+ * @return {@code True} if entry was obsoleted and written to swap.
+ */
+ public boolean offheapSwapEvict(byte[] entry, GridCacheVersion evictVer, GridCacheVersion obsoleteVer)
+ throws IgniteCheckedException, GridCacheEntryRemovedException;
+
+ /**
* @return Value.
* @throws IgniteCheckedException If failed to read from swap storage.
* @throws GridCacheEntryRemovedException If entry was removed.
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
index f60c0eb..3e0e2f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
@@ -958,7 +958,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
List<GridCacheEntryEx> locked = new ArrayList<>(keys.size());
- Set<GridCacheEntryEx> notRemove = null;
+ Set<GridCacheEntryEx> notRmv = null;
Collection<GridCacheBatchSwapEntry> swapped = new ArrayList<>(keys.size());
@@ -990,10 +990,10 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
locked.add(entry);
if (entry.obsolete()) {
- if (notRemove == null)
- notRemove = new HashSet<>();
+ if (notRmv == null)
+ notRmv = new HashSet<>();
- notRemove.add(entry);
+ notRmv.add(entry);
continue;
}
@@ -1004,11 +1004,19 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
GridCacheBatchSwapEntry swapEntry = entry.evictInBatchInternal(obsoleteVer);
if (swapEntry != null) {
+ assert entry.obsolete() : entry;
+
swapped.add(swapEntry);
if (log.isDebugEnabled())
log.debug("Entry was evicted [entry=" + entry + ", localNode=" + cctx.nodeId() + ']');
}
+ else if (!entry.obsolete()) {
+ if (notRmv == null)
+ notRmv = new HashSet<>();
+
+ notRmv.add(entry);
+ }
}
// Batch write to swap.
@@ -1025,7 +1033,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
// Remove entries and fire events outside the locks.
for (GridCacheEntryEx entry : locked) {
- if (entry.obsolete() && (notRemove == null || !notRemove.contains(entry))) {
+ if (entry.obsolete() && (notRmv == null || !notRmv.contains(entry))) {
entry.onMarkedObsolete();
cache.removeEntry(entry);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index eb4d864..f2bb646 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -433,6 +433,41 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
/** {@inheritDoc} */
+ @Override public boolean offheapSwapEvict(byte[] entry, GridCacheVersion evictVer, GridCacheVersion obsoleteVer)
+ throws IgniteCheckedException, GridCacheEntryRemovedException {
+ assert cctx.swap().swapEnabled() && cctx.swap().offHeapEnabled() : this;
+
+ boolean obsolete;
+
+ synchronized (this) {
+ checkObsolete();
+
+ if (hasReaders() || !isStartVersion())
+ return false;
+
+ GridCacheMvcc mvcc = mvccExtras();
+
+ if (mvcc != null && !mvcc.isEmpty(obsoleteVer))
+ return false;
+
+ if (cctx.swap().offheapSwapEvict(key, entry, partition(), evictVer)) {
+ assert !hasValueUnlocked() : this;
+
+ obsolete = markObsolete0(obsoleteVer, false);
+
+ assert obsolete : this;
+ }
+ else
+ obsolete = false;
+ }
+
+ if (obsolete)
+ onMarkedObsolete();
+
+ return obsolete;
+ }
+
+ /** {@inheritDoc} */
@Override public CacheObject unswap() throws IgniteCheckedException, GridCacheEntryRemovedException {
return unswap(true);
}
@@ -536,7 +571,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
log.debug("Value did not change, skip write swap entry: " + this);
if (cctx.swap().offheapEvictionEnabled())
- cctx.swap().enableOffheapEviction(key());
+ cctx.swap().enableOffheapEviction(key(), partition());
return;
}
@@ -2988,7 +3023,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
synchronized (this) {
checkObsolete();
- if (isNew() || (!preload && deletedUnlocked())) {
+ if ((isNew() && !cctx.swap().containsKey(key, partition())) || (!preload && deletedUnlocked())) {
long expTime = expireTime < 0 ? CU.toExpireTime(ttl) : expireTime;
val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
@@ -3643,6 +3678,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
try {
if (F.isEmptyOrNulls(filter)) {
synchronized (this) {
+ if (obsoleteVersionExtras() != null)
+ return true;
+
CacheObject prev = saveValueForIndexUnlocked();
if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
@@ -3684,6 +3722,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
return false;
synchronized (this) {
+ if (obsoleteVersionExtras() != null)
+ return true;
+
if (!v.equals(ver))
// Version has changed since entry passed the filter. Do it again.
continue;
@@ -3768,6 +3809,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
try {
if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
if (!isStartVersion() && hasValueUnlocked()) {
+ if (cctx.offheapTiered() && hasOffHeapPointer()) {
+ if (cctx.swap().offheapEvictionEnabled())
+ cctx.swap().enableOffheapEviction(key(), partition());
+
+ return null;
+ }
+
IgniteUuid valClsLdrId = null;
IgniteUuid keyClsLdrId = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
index 81490a7..b7c66d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
@@ -126,9 +126,9 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
* @return Version.
*/
public static GridCacheVersion version(byte[] bytes) {
- int off = VERSION_OFFSET; // Skip ttl, expire time.
+ long off = BYTE_ARR_OFF + VERSION_OFFSET; // Skip ttl, expire time.
- boolean verEx = bytes[off++] != 0;
+ boolean verEx = UNSAFE.getByte(bytes, off++) != 0;
return U.readVersion(bytes, off, verEx);
}
@@ -157,26 +157,6 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
return new IgniteBiTuple<>(valBytes, type);
}
- /**
- * @param bytes Entry bytes.
- * @return Value bytes offset.
- */
- public static int valueOffset(byte[] bytes) {
- assert bytes.length > 40 : bytes.length;
-
- int off = VERSION_OFFSET; // Skip ttl, expire time.
-
- boolean verEx = bytes[off++] != 0;
-
- off += verEx ? VERSION_EX_SIZE : VERSION_SIZE;
-
- off += 5; // Byte array flag + array size.
-
- assert bytes.length >= off;
-
- return off;
- }
-
/** {@inheritDoc} */
@Override public byte[] valueBytes() {
if (valBytes != null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index 7fd6013..9b6381e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -34,6 +34,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.managers.swapspace.GridSwapSpaceManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionAware;
@@ -54,6 +55,7 @@ import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.swapspace.SwapKey;
@@ -101,8 +103,16 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
private final ReferenceQueue<Iterator<Map.Entry>> itQ = new ReferenceQueue<>();
/** Soft iterator set. */
- private final Collection<GridWeakIterator<Map.Entry>> itSet =
- new GridConcurrentHashSet<>();
+ private final Collection<GridWeakIterator<Map.Entry>> itSet = new GridConcurrentHashSet<>();
+
+ /** {@code True} if offheap to swap eviction is possible. */
+ private boolean offheapToSwapEvicts;
+
+ /** Values to be evicted from offheap to swap. */
+ private ThreadLocal<Collection<IgniteBiTuple<byte[], byte[]>>> offheapEvicts = new ThreadLocal<>();
+
+ /** First offheap eviction warning flag. */
+ private volatile boolean firstEvictWarn;
/**
* @param enabled Flag to indicate if swap is enabled.
@@ -127,9 +137,58 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
}
/**
+ *
+ */
+ public void unwindOffheapEvicts() {
+ if (!offheapToSwapEvicts)
+ return;
+
+ Collection<IgniteBiTuple<byte[], byte[]>> evicts = offheapEvicts.get();
+
+ if (evicts != null) {
+ GridCacheVersion obsoleteVer = cctx.versions().next();
+
+ for (IgniteBiTuple<byte[], byte[]> t : evicts) {
+ try {
+ byte[] kb = t.get1();
+ byte[] vb = t.get2();
+
+ GridCacheVersion evictVer = GridCacheSwapEntryImpl.version(vb);
+
+ KeyCacheObject key = cctx.toCacheKeyObject(kb);
+
+ while (true) {
+ GridCacheEntryEx entry = cctx.cache().entryEx(key);
+
+ try {
+ if (entry.offheapSwapEvict(vb, evictVer, obsoleteVer))
+ cctx.cache().removeEntry(entry);
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ // Retry.
+ }
+ }
+ }
+ catch (GridDhtInvalidPartitionException e) {
+ // Skip entry.
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to unmarshal off-heap entry", e);
+ }
+ }
+
+ offheapEvicts.set(null);
+ }
+ }
+
+ /**
* Initializes off-heap space.
*/
private void initOffHeap() {
+ assert offheapEnabled;
+
// Register big data usage.
long max = cctx.config().getOffHeapMaxMemory();
@@ -137,43 +196,69 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
int parts = cctx.config().getAffinity().partitions();
- GridOffHeapEvictListener lsnr = !swapEnabled && !offheapEnabled ? null : new GridOffHeapEvictListener() {
- private volatile boolean firstEvictWarn;
+ GridOffHeapEvictListener lsnr;
- @Override public void onEvict(int part, int hash, byte[] kb, byte[] vb) {
- try {
- if (!firstEvictWarn)
- warnFirstEvict();
+ if (swapEnabled) {
+ offheapToSwapEvicts = true;
- writeToSwap(part, cctx.toCacheKeyObject(kb), vb);
+ lsnr = new GridOffHeapEvictListener() {
+ @Override public void onEvict(int part, int hash, byte[] kb, byte[] vb) {
+ assert offheapToSwapEvicts;
- if (cctx.config().isStatisticsEnabled())
- cctx.cache().metrics0().onOffHeapEvict();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to unmarshal off-heap entry [part=" + part + ", hash=" + hash + ']', e);
- }
- }
+ onOffheapEvict();
- private void warnFirstEvict() {
- synchronized (this) {
- if (firstEvictWarn)
- return;
+ Collection<IgniteBiTuple<byte[], byte[]>> evicts = offheapEvicts.get();
- firstEvictWarn = true;
+ if (evicts == null)
+ offheapEvicts.set(evicts = new ArrayList<>());
+
+ evicts.add(new IgniteBiTuple<>(kb, vb));
}
- U.warn(log, "Off-heap evictions started. You may wish to increase 'offHeapMaxMemory' in " +
- "cache configuration [cache=" + cctx.name() +
- ", offHeapMaxMemory=" + cctx.config().getOffHeapMaxMemory() + ']',
- "Off-heap evictions started: " + cctx.name());
- }
- };
+ @Override public boolean removeEvicted() {
+ return false;
+ }
+ };
+ }
+ else {
+ lsnr = new GridOffHeapEvictListener() {
+ @Override public void onEvict(int part, int hash, byte[] kb, byte[] vb) {
+ onOffheapEvict();
+ }
+
+ @Override public boolean removeEvicted() {
+ return true;
+ }
+ };
+ }
offheap.create(spaceName, parts, init, max, lsnr);
}
/**
+ * Warns on first evict from off-heap.
+ */
+ private void onOffheapEvict() {
+ if (cctx.config().isStatisticsEnabled())
+ cctx.cache().metrics0().onOffHeapEvict();
+
+ if (firstEvictWarn)
+ return;
+
+ synchronized (this) {
+ if (firstEvictWarn)
+ return;
+
+ firstEvictWarn = true;
+ }
+
+ U.warn(log, "Off-heap evictions started. You may wish to increase 'offHeapMaxMemory' in " +
+ "cache configuration [cache=" + cctx.name() +
+ ", offHeapMaxMemory=" + cctx.config().getOffHeapMaxMemory() + ']',
+ "Off-heap evictions started: " + cctx.name());
+ }
+
+ /**
* @return {@code True} if swap store is enabled.
*/
public boolean swapEnabled() {
@@ -440,17 +525,16 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
/**
* @param key Key to check.
+ * @param part Partition.
* @return {@code True} if key is contained.
* @throws IgniteCheckedException If failed.
*/
- public boolean containsKey(KeyCacheObject key) throws IgniteCheckedException {
+ public boolean containsKey(KeyCacheObject key, int part) throws IgniteCheckedException {
if (!offheapEnabled && !swapEnabled)
return false;
checkIteratorQueue();
- int part = cctx.affinity().partition(key);
-
// First check off-heap store.
if (offheapEnabled) {
boolean contains = offheap.contains(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
@@ -480,6 +564,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
/**
* @param key Key to read.
+ * @param keyBytes Key bytes.
* @param part Key partition.
* @param entryLocked {@code True} if cache entry is locked.
* @param readOffheap Read offheap flag.
@@ -966,6 +1051,46 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
}
/**
+ * @param key Key to move from offheap to swap.
+ * @param entry Serialized swap entry.
+ * @param part Partition.
+ * @param ver Expected entry version.
+ * @return {@code True} if removed.
+ * @throws IgniteCheckedException If failed.
+ */
+ boolean offheapSwapEvict(final KeyCacheObject key, byte[] entry, int part, final GridCacheVersion ver)
+ throws IgniteCheckedException {
+ assert offheapEnabled;
+
+ checkIteratorQueue();
+
+ boolean rmv = offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()),
+ new IgniteBiPredicate<Long, Integer>() {
+ @Override public boolean apply(Long ptr, Integer len) {
+ GridCacheVersion ver0 = GridCacheOffheapSwapEntry.version(ptr);
+
+ return ver.equals(ver0);
+ }
+ }
+ );
+
+ if (rmv) {
+ Collection<GridCacheSwapListener> lsnrs = offheapLsnrs.get(part);
+
+ if (lsnrs != null) {
+ GridCacheSwapEntry e = swapEntry(GridCacheSwapEntryImpl.unmarshal(entry));
+
+ for (GridCacheSwapListener lsnr : lsnrs)
+ lsnr.onEntryUnswapped(part, key, e);
+ }
+
+ cctx.swap().writeToSwap(part, key, entry);
+ }
+
+ return rmv;
+ }
+
+ /**
* @return {@code True} if offheap eviction is enabled.
*/
boolean offheapEvictionEnabled() {
@@ -976,16 +1101,15 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
* Enables eviction for offheap entry after {@link #readOffheapPointer} was called.
*
* @param key Key.
+ * @param part Partition.
* @throws IgniteCheckedException If failed.
*/
- void enableOffheapEviction(final KeyCacheObject key) throws IgniteCheckedException {
+ void enableOffheapEviction(final KeyCacheObject key, int part) throws IgniteCheckedException {
if (!offheapEnabled)
return;
checkIteratorQueue();
- int part = cctx.affinity().partition(key);
-
offheap.enableEviction(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 980971c..2d5698a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1029,8 +1029,15 @@ public class GridCacheUtils {
ctx.evicts().unwind();
- if (ctx.isNear())
- ctx.near().dht().context().evicts().unwind();
+ ctx.swap().unwindOffheapEvicts();
+
+ if (ctx.isNear()) {
+ GridCacheContext dhtCtx = ctx.near().dht().context();
+
+ dhtCtx.evicts().unwind();
+
+ dhtCtx.swap().unwindOffheapEvicts();
+ }
ctx.ttl().expire();
}
@@ -1041,14 +1048,8 @@ public class GridCacheUtils {
public static <K, V> void unwindEvicts(GridCacheSharedContext<K, V> ctx) {
assert ctx != null;
- for (GridCacheContext<K, V> cacheCtx : ctx.cacheContexts()) {
- cacheCtx.evicts().unwind();
-
- if (cacheCtx.isNear())
- cacheCtx.near().dht().context().evicts().unwind();
-
- cacheCtx.ttl().expire();
- }
+ for (GridCacheContext<K, V> cacheCtx : ctx.cacheContexts())
+ unwindEvicts(cacheCtx);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index c5f15cd..956f2bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -262,8 +262,11 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
map.put(entry.key(), entry);
- if (!entry.isInternal())
+ if (!entry.isInternal()) {
+ assert !entry.deleted() : entry;
+
mapPubSize.increment();
+ }
}
/**
@@ -271,7 +274,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
*/
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
void onRemoved(GridDhtCacheEntry entry) {
- assert entry.obsolete();
+ assert entry.obsolete() : entry;
// Make sure to remove exactly this entry.
synchronized (entry) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 5e183e9..fcb012f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -546,10 +546,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
* @param updateSeq Update sequence.
* @return Local partition.
*/
- private GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer, boolean create, boolean updateSeq) {
- while (true) {
- boolean belongs = cctx.affinity().localNode(p, topVer);
+ private GridDhtLocalPartition localPartition(int p,
+ AffinityTopologyVersion topVer,
+ boolean create,
+ boolean updateSeq) {
+ boolean belongs = create && cctx.affinity().localNode(p, topVer);
+ while (true) {
GridDhtLocalPartition loc = locParts.get(p);
if (loc != null && loc.state() == EVICTED) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 3f9decf..2048fdf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -234,6 +234,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
/**
* Completeness callback.
+ *
+ * @return {@code True} if future was finished by this call.
*/
private boolean onComplete() {
Throwable err0 = err.get();
@@ -457,6 +459,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
/**
* @param reads Read entries.
* @param writes Write entries.
+ * @throws IgniteCheckedException If failed.
*/
private void prepare(
Iterable<IgniteTxEntry> reads,
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 21aaef2..ab6dc3c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -404,8 +404,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (backup == null) {
readyNearMappingFromBackup(mapping);
+ ClusterTopologyCheckedException cause =
+ new ClusterTopologyCheckedException("Backup node left grid: " + backupId);
+
+ cause.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
+
mini.onDone(new IgniteTxRollbackCheckedException("Failed to commit transaction " +
- "(backup has left grid): " + tx.xidVersion()));
+ "(backup has left grid): " + tx.xidVersion(), cause));
}
else if (backup.isLocal()) {
boolean committed = cctx.tm().txHandler().checkDhtRemoteTxCommitted(tx.xidVersion());
@@ -414,9 +419,15 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (committed)
mini.onDone(tx);
- else
+ else {
+ ClusterTopologyCheckedException cause =
+ new ClusterTopologyCheckedException("Primary node left grid: " + nodeId);
+
+ cause.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
+
mini.onDone(new IgniteTxRollbackCheckedException("Failed to commit transaction " +
- "(transaction has been rolled back on backup node): " + tx.xidVersion()));
+ "(transaction has been rolled back on backup node): " + tx.xidVersion(), cause));
+ }
}
else {
GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest(
@@ -731,8 +742,19 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
readyNearMappingFromBackup(m);
- if (res.checkCommittedError() != null)
- onDone(res.checkCommittedError());
+ Throwable err = res.checkCommittedError();
+
+ if (err != null) {
+ if (err instanceof IgniteCheckedException) {
+ ClusterTopologyCheckedException cause =
+ ((IgniteCheckedException)err).getCause(ClusterTopologyCheckedException.class);
+
+ if (cause != null)
+ cause.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
+ }
+
+ onDone(err);
+ }
else
onDone(tx);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 9efa43a..756672a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -1049,6 +1049,7 @@ public class IgniteTxHandler {
*
* @param nodeId Node id that originated finish request.
* @param req Request.
+ * @param {@code True} if transaction committed on this node.
*/
protected void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed) {
if (req.replyRequired()) {
@@ -1057,9 +1058,13 @@ public class IgniteTxHandler {
if (req.checkCommitted()) {
res.checkCommitted(true);
- if (!committed)
+ if (!committed) {
+ ClusterTopologyCheckedException cause =
+ new ClusterTopologyCheckedException("Primary node left grid.");
+
res.checkCommittedError(new IgniteTxRollbackCheckedException("Failed to commit transaction " +
- "(transaction has been rolled back on backup node): " + req.version()));
+ "(transaction has been rolled back on backup node): " + req.version(), cause));
+ }
}
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
index 024ea7c..492fa07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.util.offheap.GridOffHeapPartitionedMap;
import org.apache.ignite.internal.util.typedef.CX2;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.marshaller.Marshaller;
import org.jetbrains.annotations.Nullable;
@@ -261,13 +262,35 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
* @return {@code true} If succeeded.
* @throws IgniteCheckedException If failed.
*/
- public boolean removex(@Nullable String spaceName, int part, KeyCacheObject key, byte[] keyBytes) throws IgniteCheckedException {
+ public boolean removex(@Nullable String spaceName, int part, KeyCacheObject key, byte[] keyBytes)
+ throws IgniteCheckedException {
GridOffHeapPartitionedMap m = offheap(spaceName);
return m != null && m.removex(part, U.hash(key), keyBytes(key, keyBytes));
}
/**
+ * Removes value from offheap space for the given key.
+ *
+ * @param spaceName Space name.
+ * @param part Partition.
+ * @param key Key.
+ * @param keyBytes Key bytes.
+ * @param p Value predicate (arguments are value address and value length).
+ * @return {@code true} If succeeded.
+ * @throws IgniteCheckedException If failed.
+ */
+ public boolean removex(@Nullable String spaceName,
+ int part,
+ KeyCacheObject key,
+ byte[] keyBytes,
+ IgniteBiPredicate<Long, Integer> p) throws IgniteCheckedException {
+ GridOffHeapPartitionedMap m = offheap(spaceName);
+
+ return m != null && m.removex(part, U.hash(key), keyBytes(key, keyBytes), p);
+ }
+
+ /**
* Gets iterator over contents of the given space.
*
* @param spaceName Space name.
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapEvictListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapEvictListener.java b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapEvictListener.java
index 4597be8..beafea4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapEvictListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapEvictListener.java
@@ -30,4 +30,9 @@ public interface GridOffHeapEvictListener {
* @param valBytes Value bytes.
*/
public void onEvict(int part, int hash, byte[] keyBytes, byte[] valBytes);
+
+ /**
+ * @return {@code True} if entry selected for eviction should be immediately removed.
+ */
+ public boolean removeEvicted();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapMap.java
index 1fcddd7..d14a582 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapMap.java
@@ -20,13 +20,14 @@ package org.apache.ignite.internal.util.offheap;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.typedef.CX2;
import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.Nullable;
/**
* Off-heap map.
*/
-public interface GridOffHeapMap<K> {
+public interface GridOffHeapMap {
/**
* Gets partition this map belongs to.
*
@@ -102,6 +103,16 @@ public interface GridOffHeapMap<K> {
public boolean removex(int hash, byte[] keyBytes);
/**
+ * Removes value from off-heap map without returning it.
+ *
+ * @param hash Hash.
+ * @param keyBytes Key bytes.
+ * @param p Value predicate (arguments are value address and value length).
+ * @return {@code True} if value was removed.
+ */
+ public boolean removex(int hash, byte[] keyBytes, IgniteBiPredicate<Long, Integer> p);
+
+ /**
* Puts key and value bytes into the map potentially replacing
* existing entry.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapMapFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapMapFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapMapFactory.java
index 1a3d219..4dd911f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapMapFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapMapFactory.java
@@ -32,8 +32,8 @@ public class GridOffHeapMapFactory {
* @param initCap Initial capacity.
* @return Off-heap map.
*/
- public static <K> GridOffHeapMap<K> unsafeMap(long initCap) {
- return new GridUnsafeMap<>(128, 0.75f, initCap, 0, (short)0, null);
+ public static GridOffHeapMap unsafeMap(long initCap) {
+ return new GridUnsafeMap(128, 0.75f, initCap, 0, (short)0, null);
}
/**
@@ -43,8 +43,8 @@ public class GridOffHeapMapFactory {
* @param initCap Initial capacity.
* @return Off-heap map.
*/
- public static <K> GridOffHeapMap<K> unsafeMap(int concurrency, long initCap) {
- return new GridUnsafeMap<>(concurrency, 0.75f, initCap, 0, (short)0, null);
+ public static GridOffHeapMap unsafeMap(int concurrency, long initCap) {
+ return new GridUnsafeMap(concurrency, 0.75f, initCap, 0, (short)0, null);
}
/**
@@ -55,8 +55,8 @@ public class GridOffHeapMapFactory {
* @param initCap Initial capacity.
* @return Off-heap map.
*/
- public static <K> GridOffHeapMap<K> unsafeMap(int concurrency, float load, long initCap) {
- return new GridUnsafeMap<>(concurrency, load, initCap, 0, (short)0, null);
+ public static GridOffHeapMap unsafeMap(int concurrency, float load, long initCap) {
+ return new GridUnsafeMap(concurrency, load, initCap, 0, (short)0, null);
}
/**
@@ -68,8 +68,8 @@ public class GridOffHeapMapFactory {
* @param lruStripes Number of LRU stripes.
* @return Off-heap map.
*/
- public static <K> GridOffHeapMap<K> unsafeMap(long initCap, long totalMem, short lruStripes) {
- return new GridUnsafeMap<>(128, 0.75f, initCap, totalMem, lruStripes, null);
+ public static GridOffHeapMap unsafeMap(long initCap, long totalMem, short lruStripes) {
+ return new GridUnsafeMap(128, 0.75f, initCap, totalMem, lruStripes, null);
}
/**
@@ -82,9 +82,9 @@ public class GridOffHeapMapFactory {
* @param lsnr Optional eviction listener which gets notified every time an entry is evicted.
* @return Off-heap map.
*/
- public static <K> GridOffHeapMap<K> unsafeMap(long initCap, long totalMem, short lruStripes,
+ public static GridOffHeapMap unsafeMap(long initCap, long totalMem, short lruStripes,
@Nullable GridOffHeapEvictListener lsnr) {
- return new GridUnsafeMap<>(128, 0.75f, initCap, totalMem, lruStripes, lsnr);
+ return new GridUnsafeMap(128, 0.75f, initCap, totalMem, lruStripes, lsnr);
}
/**
@@ -98,9 +98,9 @@ public class GridOffHeapMapFactory {
* @param lsnr Optional eviction listener which gets notified every time an entry is evicted.
* @return Off-heap map.
*/
- public static <K> GridOffHeapMap<K> unsafeMap(int concurrency, long initCap, long totalMem, short lruStripes,
+ public static GridOffHeapMap unsafeMap(int concurrency, long initCap, long totalMem, short lruStripes,
@Nullable GridOffHeapEvictListener lsnr) {
- return new GridUnsafeMap<>(concurrency, 0.75f, initCap, totalMem, lruStripes, lsnr);
+ return new GridUnsafeMap(concurrency, 0.75f, initCap, totalMem, lruStripes, lsnr);
}
/**
@@ -115,9 +115,9 @@ public class GridOffHeapMapFactory {
* @param lsnr Optional eviction listener which gets notified every time an entry is evicted.
* @return Off-heap map.
*/
- public static <K> GridOffHeapMap<K> unsafeMap(int concurrency, float load, long initCap, long totalMem,
+ public static <K> GridOffHeapMap unsafeMap(int concurrency, float load, long initCap, long totalMem,
short lruStripes, @Nullable GridOffHeapEvictListener lsnr) {
- return new GridUnsafeMap<>(concurrency, load, initCap, totalMem, lruStripes, lsnr);
+ return new GridUnsafeMap(concurrency, load, initCap, totalMem, lruStripes, lsnr);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java
index 3afdfa9..c1e1bfa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java
@@ -21,6 +21,7 @@ import java.util.Set;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.typedef.CX2;
import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.Nullable;
@@ -111,6 +112,17 @@ public interface GridOffHeapPartitionedMap {
public boolean removex(int p, int hash, byte[] keyBytes);
/**
+ * Removes value from off-heap map without returning it.
+ *
+ * @param part Partition.
+ * @param hash Hash.
+ * @param keyBytes Key bytes.
+ * @param p Value predicate (arguments are value address and value length).
+ * @return {@code True} if value was removed.
+ */
+ public boolean removex(int part, int hash, byte[] keyBytes, IgniteBiPredicate<Long, Integer> p);
+
+ /**
* Puts key and value bytes into the map potentially replacing
* existing entry.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafeMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafeMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafeMap.java
index 40fb3e8..359d36c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafeMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafeMap.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.util.typedef.CX2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.Nullable;
import org.jsr166.LongAdder8;
@@ -42,7 +43,7 @@ import static org.apache.ignite.internal.util.offheap.GridOffHeapEvent.REHASH;
/**
* Off-heap map based on {@code Unsafe} implementation.
*/
-public class GridUnsafeMap<K> implements GridOffHeapMap<K> {
+public class GridUnsafeMap implements GridOffHeapMap {
/** Header size. */
private static final int HEADER = 4 /*hash*/ + 4 /*key-size*/ + 4 /*value-size*/ + 8 /*queue-address*/ +
8 /*next-address*/;
@@ -77,7 +78,7 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> {
private final float load;
/** Segments. */
- private final Segment<K>[] segs;
+ private final Segment[] segs;
/** Total memory. */
private final GridUnsafeMemory mem;
@@ -111,6 +112,9 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> {
/** LRU poller. */
private final GridUnsafeLruPoller lruPoller;
+ /** */
+ private final boolean rmvEvicted;
+
/**
* @param concurrency Concurrency.
* @param load Load factor.
@@ -180,6 +184,8 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> {
}
}
};
+
+ rmvEvicted = evictLsnr == null || evictLsnr.removeEvicted();
}
/**
@@ -225,6 +231,8 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> {
segs = new Segment[size];
init(initCap, size);
+
+ rmvEvicted = evictLsnr == null || evictLsnr.removeEvicted();
}
/**
@@ -247,7 +255,7 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> {
for (int i = 0; i < size; i++) {
try {
- segs[i] = new Segment<>(i, cap);
+ segs[i] = new Segment(i, cap);
}
catch (GridOffHeapOutOfMemoryException e) {
destruct();
@@ -327,6 +335,11 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> {
}
/** {@inheritDoc} */
+ @Override public boolean removex(int hash, byte[] keyBytes, IgniteBiPredicate<Long, Integer> p) {
+ return segmentFor(hash).removex(hash, keyBytes, p);
+ }
+
+ /** {@inheritDoc} */
@Override public boolean put(int hash, byte[] keyBytes, byte[] valBytes) {
return segmentFor(hash).put(hash, keyBytes, valBytes);
}
@@ -559,7 +572,7 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> {
/**
* Segment.
*/
- private class Segment<K> {
+ private class Segment {
/** Lock. */
private final ReadWriteLock lock = new ReentrantReadWriteLock();
@@ -1009,41 +1022,51 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> {
}
if (cur != 0) {
- long next = Entry.nextAddress(cur, mem);
+ long qAddr0 = Entry.queueAddress(cur, mem);
- if (prev != 0)
- Entry.nextAddress(prev, next, mem); // Relink.
- else {
- if (next == 0)
- Bin.clear(binAddr, mem);
- else
- Bin.first(binAddr, next, mem);
- }
+ assert qAddr == qAddr0 : "Queue node address mismatch " +
+ "[qAddr=" + qAddr + ", entryQueueAddr=" + qAddr + ']';
if (evictLsnr != null) {
keyBytes = Entry.readKeyBytes(cur, mem);
- // TODO: GG-8123: Inlined as a workaround. Revert when 7u60 is released.
-// valBytes = Entry.readValueBytes(cur, mem);
- {
- int keyLen = Entry.readKeyLength(cur, mem);
- int valLen = Entry.readValueLength(cur, mem);
+ int keyLen = Entry.readKeyLength(cur, mem);
+ int valLen = Entry.readValueLength(cur, mem);
- valBytes = mem.readBytes(cur + HEADER + keyLen, valLen);
- }
+ valBytes = mem.readBytes(cur + HEADER + keyLen, valLen);
}
- long a;
+ if (rmvEvicted) {
+ long a;
- assert qAddr == (a = Entry.queueAddress(cur, mem)) : "Queue node address mismatch " +
- "[qAddr=" + qAddr + ", entryQueueAddr=" + a + ']';
+ assert qAddr == (a = Entry.queueAddress(cur, mem)) : "Queue node address mismatch " +
+ "[qAddr=" + qAddr + ", entryQueueAddr=" + a + ']';
- relSize = Entry.size(cur, mem);
- relAddr = cur;
+ long next = Entry.nextAddress(cur, mem);
- cnt--;
+ if (prev != 0)
+ Entry.nextAddress(prev, next, mem); // Relink.
+ else {
+ if (next == 0)
+ Bin.clear(binAddr, mem);
+ else
+ Bin.first(binAddr, next, mem);
+ }
- totalCnt.decrement();
+ relSize = Entry.size(cur, mem);
+ relAddr = cur;
+
+ cnt--;
+
+ totalCnt.decrement();
+ }
+ else {
+ boolean clear = Entry.clearQueueAddress(cur, qAddr, mem);
+
+ assert clear;
+
+ relSize = Entry.size(cur, mem);
+ }
}
}
}
@@ -1251,7 +1274,7 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> {
*/
@SuppressWarnings("TooBroadScope")
byte[] remove(int hash, byte[] keyBytes) {
- return remove(hash, keyBytes, true);
+ return remove(hash, keyBytes, true, null);
}
/**
@@ -1260,17 +1283,28 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> {
* @return {@code True} if value was removed.
*/
boolean removex(int hash, byte[] keyBytes) {
- return remove(hash, keyBytes, false) == EMPTY_BYTES;
+ return remove(hash, keyBytes, false, null) == EMPTY_BYTES;
+ }
+
+ /**
+ * @param hash Hash.
+ * @param keyBytes Key bytes.
+ * @param p Value predicate.
+ * @return {@code True} if value was removed.
+ */
+ boolean removex(int hash, byte[] keyBytes, IgniteBiPredicate<Long, Integer> p) {
+ return remove(hash, keyBytes, false, p) == EMPTY_BYTES;
}
/**
* @param hash Hash.
* @param keyBytes Key bytes.
* @param retval {@code True} if need removed value.
+ * @param p Value predicate.
* @return Removed value bytes.
*/
@SuppressWarnings("TooBroadScope")
- byte[] remove(int hash, byte[] keyBytes, boolean retval) {
+ byte[] remove(int hash, byte[] keyBytes, boolean retval, @Nullable IgniteBiPredicate<Long, Integer> p) {
int relSize = 0;
long relAddr = 0;
long qAddr = 0;
@@ -1291,6 +1325,19 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> {
// If found match.
if (Entry.keyEquals(cur, keyBytes, mem)) {
+ int keyLen = 0;
+ int valLen = 0;
+
+ if (p != null) {
+ keyLen = Entry.readKeyLength(cur, mem);
+ valLen = Entry.readValueLength(cur, mem);
+
+ long valPtr = cur + HEADER + keyLen;
+
+ if (!p.apply(valPtr, valLen))
+ return null;
+ }
+
if (prev != 0)
Entry.nextAddress(prev, next, mem); // Relink.
else {
@@ -1300,18 +1347,16 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> {
Bin.first(binAddr, next, mem);
}
- // TODO: GG-8123: Inlined as a workaround. Revert when 7u60 is released.
-// valBytes = retval ? Entry.readValueBytes(cur, mem) : EMPTY_BYTES;
- {
- if (retval) {
- int keyLen = Entry.readKeyLength(cur, mem);
- int valLen = Entry.readValueLength(cur, mem);
-
- valBytes = mem.readBytes(cur + HEADER + keyLen, valLen);
+ if (retval) {
+ if (keyLen == 0) {
+ keyLen = Entry.readKeyLength(cur, mem);
+ valLen = Entry.readValueLength(cur, mem);
}
- else
- valBytes = EMPTY_BYTES;
+
+ valBytes = mem.readBytes(cur + HEADER + keyLen, valLen);
}
+ else
+ valBytes = EMPTY_BYTES;
// Prepare release of memory.
qAddr = Entry.queueAddress(cur, mem);
@@ -1382,8 +1427,7 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> {
* @param keyBytes Key bytes.
* @return Value pointer.
*/
- @Nullable
- IgniteBiTuple<Long, Integer> valuePointer(int hash, byte[] keyBytes) {
+ @Nullable IgniteBiTuple<Long, Integer> valuePointer(int hash, byte[] keyBytes) {
long binAddr = readLock(hash);
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java
index 070da51..fb8ac14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.util.offheap.GridOffHeapMap;
import org.apache.ignite.internal.util.offheap.GridOffHeapPartitionedMap;
import org.apache.ignite.internal.util.typedef.CX2;
import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.Nullable;
import org.jsr166.LongAdder8;
@@ -198,6 +199,14 @@ public class GridUnsafePartitionedMap implements GridOffHeapPartitionedMap {
}
/** {@inheritDoc} */
+ @Override public boolean removex(int part,
+ int hash,
+ byte[] keyBytes,
+ IgniteBiPredicate<Long, Integer> p) {
+ return mapFor(part).removex(hash, keyBytes, p);
+ }
+
+ /** {@inheritDoc} */
@Override public boolean put(int p, int hash, byte[] keyBytes, byte[] valBytes) {
return mapFor(p).put(hash, keyBytes, valBytes);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTest.java
index 271d8b1..214beb6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTest.java
@@ -26,6 +26,7 @@ import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.eviction.lru.LruEvictionPolicy;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -53,6 +54,12 @@ public class CacheSwapUnswapGetTest extends GridCommonAbstractTest {
/** */
private static final long DURATION = 30_000;
+ /** */
+ private static final long OFFHEAP_MEM = 1000;
+
+ /** */
+ private static final int MAX_HEAP_SIZE = 100;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -81,7 +88,7 @@ public class CacheSwapUnswapGetTest extends GridCommonAbstractTest {
if (memMode == CacheMemoryMode.ONHEAP_TIERED) {
LruEvictionPolicy plc = new LruEvictionPolicy();
- plc.setMaxSize(100);
+ plc.setMaxSize(MAX_HEAP_SIZE);
ccfg.setEvictionPolicy(plc);
}
@@ -89,7 +96,7 @@ public class CacheSwapUnswapGetTest extends GridCommonAbstractTest {
if (swap) {
ccfg.setSwapEnabled(true);
- ccfg.setOffHeapMaxMemory(1000);
+ ccfg.setOffHeapMaxMemory(OFFHEAP_MEM);
}
else
ccfg.setOffHeapMaxMemory(0);
@@ -133,6 +140,20 @@ public class CacheSwapUnswapGetTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testTxCacheOffheapSwapEvict() throws Exception {
+ swapUnswap(TRANSACTIONAL, CacheMemoryMode.ONHEAP_TIERED, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTxCacheOffheapTieredSwapEvict() throws Exception {
+ swapUnswap(TRANSACTIONAL, CacheMemoryMode.OFFHEAP_TIERED, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testAtomicCacheOffheapEvict() throws Exception {
swapUnswap(ATOMIC, CacheMemoryMode.ONHEAP_TIERED, false);
}
@@ -145,6 +166,20 @@ public class CacheSwapUnswapGetTest extends GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testAtomicCacheOffheapSwapEvict() throws Exception {
+ swapUnswap(ATOMIC, CacheMemoryMode.ONHEAP_TIERED, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAtomicCacheOffheapTieredSwapEvict() throws Exception {
+ swapUnswap(ATOMIC, CacheMemoryMode.OFFHEAP_TIERED, true);
+ }
+
+ /**
* @param atomicityMode Cache atomicity mode.
* @param memMode Cache memory mode.
* @param swap {@code True} if swap enabled.
@@ -220,12 +255,56 @@ public class CacheSwapUnswapGetTest extends GridCommonAbstractTest {
}
});
- Thread.sleep(DURATION);
+ long endTime = System.currentTimeMillis() + DURATION;
+
+ while (System.currentTimeMillis() < endTime) {
+ Thread.sleep(5000);
+
+ log.info("Cache size [heap=" + cache.localSize(CachePeekMode.ONHEAP) +
+ ", offheap=" + cache.localSize(CachePeekMode.OFFHEAP) +
+ ", swap=" + cache.localSize(CachePeekMode.SWAP) +
+ ", total=" + cache.localSize() +
+ ", offheapMem=" + cache.metrics().getOffHeapAllocatedSize() + ']');
+ }
done.set(true);
fut.get();
getFut.get();
+
+ for (Integer key : keys) {
+ String val = cache.get(key);
+
+ assertNotNull(val);
+ }
+
+ int onheapSize = cache.localSize(CachePeekMode.ONHEAP);
+ int offheapSize = cache.localSize(CachePeekMode.OFFHEAP);
+ int swapSize = cache.localSize(CachePeekMode.SWAP);
+ int total = cache.localSize();
+ long offheapMem = cache.metrics().getOffHeapAllocatedSize();
+
+ log.info("Cache size [heap=" + onheapSize +
+ ", offheap=" + offheapSize +
+ ", swap=" + swapSize +
+ ", total=" + total +
+ ", offheapMem=" + offheapMem + ']');
+
+ assertTrue(total > 0);
+
+ assertEquals(onheapSize + offheapSize + swapSize, total);
+
+ if (memMode == CacheMemoryMode.OFFHEAP_TIERED)
+ assertEquals(0, onheapSize);
+ else
+ assertEquals(MAX_HEAP_SIZE, onheapSize);
+
+ if (swap) {
+ assertTrue(swapSize > 0);
+ assertTrue(offheapMem <= OFFHEAP_MEM);
+ }
+ else
+ assertEquals(0, swapSize);
}
finally {
done.set(true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 3e646d3..2a64963 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -2825,7 +2825,12 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
try {
cache.clear();
- assertEquals(vals.get(first), cache.localPeek(first, ONHEAP));
+ GridCacheContext<String, Integer> cctx = context(0);
+
+ GridCacheEntryEx entry = cctx.isNear() ? cctx.near().dht().peekEx(first) :
+ cctx.cache().peekEx(first);
+
+ assertNotNull(entry);
}
finally {
lock.unlock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java
index 31488e0..647746e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java
@@ -21,38 +21,50 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicReference;
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.lang.GridTuple;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.swapspace.file.FileSwapSpaceSpi;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jsr166.ConcurrentHashMap8;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
* Tests that removes are not lost when topology changes.
*/
-public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstractSelfTest {
+public abstract class GridCacheAbstractRemoveFailureTest extends GridCommonAbstractTest {
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
/** */
private static final int GRID_CNT = 3;
/** Keys count. */
- private static final int KEYS_CNT = 10000;
+ private static final int KEYS_CNT = 10_000;
/** Test duration. */
private static final long DUR = 90 * 1000L;
@@ -66,36 +78,21 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstra
/** Start delay. */
private static final T2<Integer, Integer> START_DELAY = new T2<>(2000, 5000);
- /** Node kill lock (used to prevent killing while cache data is compared). */
- private final Lock killLock = new ReentrantLock();
-
/** */
- private CountDownLatch assertLatch;
-
- /** */
- private CountDownLatch updateLatch;
-
- /** Caches comparison request flag. */
- private volatile boolean cmp;
-
- /** */
- private String sizePropVal;
+ private static String sizePropVal;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER).setForceServerMode(true);
if (testClientNode() && getTestGridName(0).equals(gridName))
cfg.setClientMode(true);
- return cfg;
- }
+ cfg.setSwapSpaceSpi(new FileSwapSpaceSpi());
- /** {@inheritDoc} */
- @Override protected int gridCount() {
- return GRID_CNT;
+ return cfg;
}
/** {@inheritDoc} */
@@ -104,6 +101,8 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstra
sizePropVal = System.getProperty(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE);
System.setProperty(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, "100000");
+
+ startGrids(GRID_CNT);
}
/** {@inheritDoc} */
@@ -111,15 +110,7 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstra
super.afterTestsStopped();
System.setProperty(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, sizePropVal != null ? sizePropVal : "");
- }
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- startGrids(gridCount());
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
stopAllGrids();
}
@@ -129,6 +120,28 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstra
}
/**
+ * @return Cache mode.
+ */
+ protected abstract CacheMode cacheMode();
+
+ /**
+ * @return Cache atomicity mode.
+ */
+ protected abstract CacheAtomicityMode atomicityMode();
+
+ /**
+ * @return Near cache configuration.
+ */
+ protected abstract NearCacheConfiguration nearCache();
+
+ /**
+ * @return Atomic cache write order mode.
+ */
+ protected CacheAtomicWriteOrderMode atomicWriteOrderMode() {
+ return null;
+ }
+
+ /**
* @return {@code True} if test updates from client node.
*/
protected boolean testClientNode() {
@@ -139,9 +152,49 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstra
* @throws Exception If failed.
*/
public void testPutAndRemove() throws Exception {
- assertEquals(testClientNode(), (boolean)grid(0).configuration().isClientMode());
+ putAndRemove(DUR, GridTestUtils.TestMemoryMode.HEAP);
+ }
- final IgniteCache<Integer, Integer> sndCache0 = grid(0).cache(null);
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutAndRemoveOffheapEvict() throws Exception {
+ putAndRemove(30_000, GridTestUtils.TestMemoryMode.OFFHEAP_EVICT);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutAndRemoveOffheapEvictSwap() throws Exception {
+ putAndRemove(30_000, GridTestUtils.TestMemoryMode.OFFHEAP_EVICT_SWAP);
+ }
+
+ /**
+ * @param duration Test duration.
+ * @param memMode Memory mode.
+ * @throws Exception If failed.
+ */
+ private void putAndRemove(long duration, GridTestUtils.TestMemoryMode memMode) throws Exception {
+ assertEquals(testClientNode(), (boolean) grid(0).configuration().isClientMode());
+
+ grid(0).destroyCache(null);
+
+ CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+ ccfg.setCacheMode(cacheMode());
+
+ if (cacheMode() == PARTITIONED)
+ ccfg.setBackups(1);
+
+ ccfg.setAtomicityMode(atomicityMode());
+ ccfg.setAtomicWriteOrderMode(atomicWriteOrderMode());
+ ccfg.setNearConfiguration(nearCache());
+
+ GridTestUtils.setMemoryMode(null, ccfg, memMode, 100, 1024);
+
+ final IgniteCache<Integer, Integer> sndCache0 = grid(0).createCache(ccfg);
final AtomicBoolean stop = new AtomicBoolean();
@@ -152,8 +205,12 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstra
// Expected values in cache.
final Map<Integer, GridTuple<Integer>> expVals = new ConcurrentHashMap8<>();
+ final AtomicReference<CyclicBarrier> cmp = new AtomicReference<>();
+
IgniteInternalFuture<?> updateFut = GridTestUtils.runAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
+ Thread.currentThread().setName("update-thread");
+
ThreadLocalRandom rnd = ThreadLocalRandom.current();
while (!stop.get()) {
@@ -190,10 +247,14 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstra
cntr.addAndGet(100);
- if (cmp) {
- assertLatch.countDown();
+ CyclicBarrier barrier = cmp.get();
+
+ if (barrier != null) {
+ log.info("Wait data check.");
- updateLatch.await();
+ barrier.await(60_000, TimeUnit.MILLISECONDS);
+
+ log.info("Finished wait data check.");
}
}
@@ -203,16 +264,21 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstra
IgniteInternalFuture<?> killFut = GridTestUtils.runAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
+ Thread.currentThread().setName("restart-thread");
+
while (!stop.get()) {
U.sleep(random(KILL_DELAY.get1(), KILL_DELAY.get2()));
- killLock.lock();
+ killAndRestart(stop);
- try {
- killAndRestart(stop);
- }
- finally {
- killLock.unlock();
+ CyclicBarrier barrier = cmp.get();
+
+ if (barrier != null) {
+ log.info("Wait data check.");
+
+ barrier.await(60_000, TimeUnit.MILLISECONDS);
+
+ log.info("Finished wait data check.");
}
}
@@ -221,7 +287,7 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstra
});
try {
- long stopTime = DUR + U.currentTimeMillis() ;
+ long stopTime = duration + U.currentTimeMillis() ;
long nextAssert = U.currentTimeMillis() + ASSERT_FREQ;
@@ -241,31 +307,34 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstra
log.info("Operations/second: " + opsPerSecond);
if (U.currentTimeMillis() >= nextAssert) {
- updateLatch = new CountDownLatch(1);
+ CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
+ @Override public void run() {
+ try {
+ cmp.set(null);
- assertLatch = new CountDownLatch(1);
+ log.info("Checking cache content.");
- cmp = true;
+ assertCacheContent(expVals);
- killLock.lock();
+ log.info("Finished check cache content.");
+ }
+ catch (Throwable e) {
+ log.error("Unexpected error: " + e, e);
- try {
- if (!assertLatch.await(60_000, TimeUnit.MILLISECONDS))
- throw new IgniteCheckedException("Failed to suspend thread executing updates.");
+ throw e;
+ }
+ }
+ });
- log.info("Checking cache content.");
+ log.info("Start cache content check.");
- assertCacheContent(expVals);
+ cmp.set(barrier);
- nextAssert = System.currentTimeMillis() + ASSERT_FREQ;
- }
- finally {
- killLock.unlock();
+ barrier.await(60_000, TimeUnit.MILLISECONDS);
- updateLatch.countDown();
+ log.info("Cache content check done.");
- U.sleep(500);
- }
+ nextAssert = System.currentTimeMillis() + ASSERT_FREQ;
}
}
}
@@ -278,18 +347,17 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstra
updateFut.get();
log.info("Test finished. Update errors: " + errCntr.get());
-
}
/**
* @param stop Stop flag.
* @throws Exception If failed.
*/
- void killAndRestart(AtomicBoolean stop) throws Exception {
+ private void killAndRestart(AtomicBoolean stop) throws Exception {
if (stop.get())
return;
- int idx = random(1, gridCount() + 1);
+ int idx = random(1, GRID_CNT + 1);
log.info("Killing node " + idx);
@@ -309,10 +377,9 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstra
/**
* @param expVals Expected values in cache.
- * @throws Exception If failed.
*/
@SuppressWarnings({"TooBroadScope", "ConstantIfStatement"})
- private void assertCacheContent(Map<Integer, GridTuple<Integer>> expVals) throws Exception {
+ private void assertCacheContent(Map<Integer, GridTuple<Integer>> expVals) {
assert !expVals.isEmpty();
Collection<Integer> failedKeys = new HashSet<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMemoryModeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMemoryModeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMemoryModeSelfTest.java
index c5c1c39..2f6ee8f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMemoryModeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMemoryModeSelfTest.java
@@ -232,9 +232,15 @@ public class GridCacheMemoryModeSelfTest extends GridCommonAbstractTest {
//putAll
doTest(cache, offheapSwap, offheapEmpty, swapEmpty, new CIX1<IgniteCache<String, Integer>>() {
@Override public void applyx(IgniteCache<String, Integer> c) throws IgniteCheckedException {
+ putAll(c, 0, all / 2);
+
+ putAll(c, all / 2 + 1, all - 1);
+ }
+
+ private void putAll(IgniteCache<String, Integer> c, int k1, int k2) {
Map<String, Integer> m = new HashMap<>();
- for (int i = 0; i < all; i++)
+ for (int i = k1; i <= k2; i++)
m.put(valueOf(i), i);
c.putAll(m);
@@ -264,6 +270,7 @@ public class GridCacheMemoryModeSelfTest extends GridCommonAbstractTest {
assertEquals(offheapSwap, c.localSize(CachePeekMode.OFFHEAP) + c.localSize(CachePeekMode.SWAP));
info("size: " + c.size());
+ info("heap: " + c.localSize(CachePeekMode.ONHEAP));
info("offheap: " + c.localSize(CachePeekMode.OFFHEAP));
info("swap: " + c.localSize(CachePeekMode.SWAP));