You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/13 19:10:08 UTC
[02/50] [abbrv] incubator-ignite git commit: #ignite-286: Make cache
full api test work in OFFHEAP_TIERED mode.
#ignite-286: Make cache full api test work in OFFHEAP_TIERED mode.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/745cf7f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/745cf7f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/745cf7f9
Branch: refs/heads/ignite-630
Commit: 745cf7f9bd4ca8e649fa77fdfe4e9e3468ecaaa0
Parents: e8a38e0
Author: ivasilinets <iv...@gridgain.com>
Authored: Fri May 8 12:15:29 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Fri May 8 12:15:29 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 26 ++-
.../cache/GridCacheEvictionManager.java | 2 +-
.../processors/cache/GridCacheProxyImpl.java | 24 --
.../processors/cache/GridCacheSwapManager.java | 215 +++++++++++++-----
.../processors/cache/IgniteInternalCache.java | 27 ---
.../colocated/GridDhtColocatedLockFuture.java | 2 +
.../distributed/near/GridNearCacheAdapter.java | 10 -
.../processors/cache/local/GridLocalCache.java | 8 +-
.../local/atomic/GridLocalAtomicCache.java | 27 ++-
.../cache/query/GridCacheQueryManager.java | 21 +-
.../transactions/IgniteTxLocalAdapter.java | 12 +-
.../offheap/GridOffHeapProcessor.java | 17 ++
.../util/offheap/GridOffHeapPartitionedMap.java | 9 +
.../unsafe/GridUnsafePartitionedMap.java | 155 ++++++-------
.../cache/GridCacheAbstractFullApiSelfTest.java | 227 +++++++++++--------
.../cache/GridCacheAbstractSelfTest.java | 4 +-
.../cache/OffHeapTieredTransactionSelfTest.java | 127 +++++++++++
...icOffHeapTieredMultiNodeFullApiSelfTest.java | 43 ++++
...ionedNearDisabledOffHeapFullApiSelfTest.java | 8 +-
...DisabledOffHeapMultiNodeFullApiSelfTest.java | 8 +-
...abledOffHeapTieredAtomicFullApiSelfTest.java | 56 +++++
...earDisabledOffHeapTieredFullApiSelfTest.java | 33 +++
...edOffHeapTieredMultiNodeFullApiSelfTest.java | 33 +++
...CacheAtomicOffHeapTieredFullApiSelfTest.java | 32 +++
...icOffHeapTieredMultiNodeFullApiSelfTest.java | 33 +++
...yWriteOrderOffHeapTieredFullApiSelfTest.java | 33 +++
...erOffHeapTieredMultiNodeFullApiSelfTest.java | 33 +++
...achePartitionedMultiNodeFullApiSelfTest.java | 15 +-
...dCachePartitionedOffHeapFullApiSelfTest.java | 8 +-
...titionedOffHeapMultiNodeFullApiSelfTest.java | 8 +-
...PartitionedOffHeapTieredFullApiSelfTest.java | 32 +++
...edOffHeapTieredMultiNodeFullApiSelfTest.java | 72 ++++++
...idCacheReplicatedOffHeapFullApiSelfTest.java | 8 +-
...plicatedOffHeapMultiNodeFullApiSelfTest.java | 8 +-
...eReplicatedOffHeapTieredFullApiSelfTest.java | 33 +++
...edOffHeapTieredMultiNodeFullApiSelfTest.java | 33 +++
...LocalAtomicOffHeapTieredFullApiSelfTest.java | 32 +++
.../GridCacheLocalOffHeapFullApiSelfTest.java | 6 +-
...dCacheLocalOffHeapTieredFullApiSelfTest.java | 32 +++
.../ignite/testsuites/IgniteBasicTestSuite.java | 1 +
.../IgniteCacheFullApiSelfTestSuite.java | 18 ++
...eQueryMultiThreadedOffHeapTiredSelfTest.java | 37 +++
.../IgniteCacheQueryMultiThreadedSelfTest.java | 29 ++-
.../IgniteCacheQuerySelfTestSuite.java | 1 +
44 files changed, 1231 insertions(+), 367 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/745cf7f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index afddc79..3826bfa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -2692,6 +2692,22 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void removeAll() throws IgniteCheckedException {
+ assert ctx.isLocal();
+
+ for (Iterator<KeyCacheObject> it = ctx.swap().offHeapKeyIterator(true, true, AffinityTopologyVersion.NONE);
+ it.hasNext(); )
+ remove((K)it.next());
+
+ for (Iterator<KeyCacheObject> it = ctx.swap().swapKeyIterator(true, true, AffinityTopologyVersion.NONE);
+ it.hasNext(); )
+ remove((K)it.next());
+
+ removeAll(keySet());
+ }
+
+ /** {@inheritDoc} */
@Override public void removeAll(final Collection<? extends K> keys) throws IgniteCheckedException {
boolean statsEnabled = ctx.config().isStatisticsEnabled();
@@ -3782,16 +3798,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Override public Iterator<Map.Entry<K, V>> swapIterator() throws IgniteCheckedException {
- return ctx.swap().lazySwapIterator();
- }
-
- /** {@inheritDoc} */
- @Override public Iterator<Map.Entry<K, V>> offHeapIterator() throws IgniteCheckedException {
- return ctx.swap().lazyOffHeapIterator();
- }
-
- /** {@inheritDoc} */
@Override public long offHeapEntriesCount() {
return ctx.swap().offHeapEntriesCount();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/745cf7f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
index 9135c16..d565af1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
@@ -751,7 +751,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
U.error(log, "Failed to evict entry from cache: " + e, ex);
}
- if (memoryMode == OFFHEAP_TIERED) {
+ if (!cctx.isNear() && memoryMode == OFFHEAP_TIERED) {
try {
evict0(cctx.cache(), e, cctx.versions().next(), null, false);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/745cf7f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index 5487944..55d2f84 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -1390,30 +1390,6 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
}
/** {@inheritDoc} */
- @Override public Iterator<Map.Entry<K, V>> swapIterator() throws IgniteCheckedException {
- CacheOperationContext prev = gate.enter(opCtx);
-
- try {
- return delegate.swapIterator();
- }
- finally {
- gate.leave(prev);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Iterator<Map.Entry<K, V>> offHeapIterator() throws IgniteCheckedException {
- CacheOperationContext prev = gate.enter(opCtx);
-
- try {
- return delegate.offHeapIterator();
- }
- finally {
- gate.leave(prev);
- }
- }
-
- /** {@inheritDoc} */
@Override public long offHeapEntriesCount() {
CacheOperationContext prev = gate.enter(opCtx);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/745cf7f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index 6444e37..eb82218 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -1211,10 +1211,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
checkIteratorQueue();
if (offHeapEnabled() && !swapEnabled())
- return rawOffHeapIterator();
+ return rawOffHeapIterator(true, true);
if (swapEnabled() && !offHeapEnabled())
- return rawSwapIterator();
+ return rawSwapIterator(true, true);
// Both, swap and off-heap are enabled.
return new GridCloseableIteratorAdapter<Map.Entry<byte[], byte[]>>() {
@@ -1227,7 +1227,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
private Map.Entry<byte[], byte[]> cur;
{
- it = rawOffHeapIterator();
+ it = rawOffHeapIterator(true, true);
advance();
}
@@ -1241,7 +1241,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
if (offheapFlag) {
offheapFlag = false;
- it = rawSwapIterator();
+ it = rawSwapIterator(true, true);
if (!it.hasNext()) {
it.close();
@@ -1313,7 +1313,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), topVer) :
cctx.affinity().backupPartitions(cctx.localNodeId(), topVer);
- return new PartitionsKeyIterator(parts) {
+ return new PartitionsAbstractIterator<KeyCacheObject>(parts) {
@Override protected Iterator<KeyCacheObject> partitionIterator(int part)
throws IgniteCheckedException
{
@@ -1338,7 +1338,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), topVer) :
cctx.affinity().backupPartitions(cctx.localNodeId(), topVer);
- return new PartitionsKeyIterator(parts) {
+ return new PartitionsAbstractIterator<KeyCacheObject>(parts) {
@Override protected Iterator<KeyCacheObject> partitionIterator(int part)
throws IgniteCheckedException
{
@@ -1554,37 +1554,91 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
/**
* @param c Key/value closure.
+ * @param primary Include primaries.
+ * @param backup Include backups.
* @return Off-heap iterator.
*/
- public <T> GridCloseableIterator<T> rawOffHeapIterator(CX2<T2<Long, Integer>, T2<Long, Integer>, T> c) {
+ public <T> GridCloseableIterator<T> rawOffHeapIterator(final CX2<T2<Long, Integer>, T2<Long, Integer>, T> c,
+ boolean primary,
+ boolean backup)
+ {
assert c != null;
- if (!offheapEnabled)
+ if (!offheapEnabled || (!primary && !backup))
return new GridEmptyCloseableIterator<>();
checkIteratorQueue();
- return offheap.iterator(spaceName, c);
+ if (primary && backup)
+ return offheap.iterator(spaceName, c);
+
+ AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion();
+
+ Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) :
+ cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
+
+ return new CloseablePartitionsIterator<T, T>(parts) {
+ @Override protected GridCloseableIterator<T> partitionIterator(int part)
+ throws IgniteCheckedException
+ {
+ return offheap.iterator(spaceName, c, part);
+ }
+ };
}
/**
+ * @param primary Include primaries.
+ * @param backup Include backups.
* @return Raw off-heap iterator.
*/
- public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawOffHeapIterator() {
- if (!offheapEnabled)
+ public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawOffHeapIterator(final boolean primary,
+ final boolean backup)
+ {
+ if (!offheapEnabled || (!primary && !backup))
return new GridEmptyCloseableIterator<>();
- return new GridCloseableIteratorAdapter<Map.Entry<byte[], byte[]>>() {
- private GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> it = offheap.iterator(spaceName);
+ if (primary && backup)
+ return new GridCloseableIteratorAdapter<Map.Entry<byte[], byte[]>>() {
+ private GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> it = offheap.iterator(spaceName);
+
+ private Map.Entry<byte[], byte[]> cur;
+
+ @Override protected Map.Entry<byte[], byte[]> onNext() {
+ return cur = it.next();
+ }
+
+ @Override protected boolean onHasNext() {
+ return it.hasNext();
+ }
+
+ @Override protected void onRemove() throws IgniteCheckedException {
+ KeyCacheObject key = cctx.toCacheKeyObject(cur.getKey());
+
+ int part = cctx.affinity().partition(key);
+
+ offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+ }
+
+ @Override protected void onClose() throws IgniteCheckedException {
+ it.close();
+ }
+ };
+ AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion();
+
+ Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) :
+ cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
+
+ return new CloseablePartitionsIterator<Map.Entry<byte[], byte[]>, IgniteBiTuple<byte[], byte[]>>(parts) {
private Map.Entry<byte[], byte[]> cur;
@Override protected Map.Entry<byte[], byte[]> onNext() {
- return cur = it.next();
+ return cur = super.onNext();
}
- @Override protected boolean onHasNext() {
- return it.hasNext();
+ @Override protected GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> partitionIterator(int part)
+ throws IgniteCheckedException {
+ return offheap.iterator(spaceName, part);
}
@Override protected void onRemove() throws IgniteCheckedException {
@@ -1594,10 +1648,6 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
}
-
- @Override protected void onClose() throws IgniteCheckedException {
- it.close();
- }
};
}
@@ -1621,15 +1671,33 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
/**
* @return Raw off-heap iterator.
+ * @param primary Include primaries.
+ * @param backup Include backups.
* @throws IgniteCheckedException If failed.
*/
- public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawSwapIterator() throws IgniteCheckedException {
- if (!swapEnabled)
+ public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawSwapIterator(boolean primary, boolean backup)
+ throws IgniteCheckedException
+ {
+ if (!swapEnabled || (!primary && !backup))
return new GridEmptyCloseableIterator<>();
checkIteratorQueue();
- return swapMgr.rawIterator(spaceName);
+ if (primary && backup)
+ return swapMgr.rawIterator(spaceName);
+
+ AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion();
+
+ Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) :
+ cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
+
+ return new CloseablePartitionsIterator<Map.Entry<byte[], byte[]>, Map.Entry<byte[], byte[]>>(parts) {
+ @Override protected GridCloseableIterator<Map.Entry<byte[], byte[]>> partitionIterator(int part)
+ throws IgniteCheckedException
+ {
+ return swapMgr.rawIterator(spaceName, part);
+ }
+ };
}
/**
@@ -1654,7 +1722,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
cctx.affinity().backupPartitions(cctx.localNodeId(), topVer);
return new PartitionsIterator<K, V>(parts) {
- @Override protected GridCloseableIterator<? extends Map.Entry<byte[], byte[]>> partitionIterator(int part)
+ @Override protected GridCloseableIterator<? extends Map.Entry<byte[], byte[]>> nextPartition(int part)
throws IgniteCheckedException
{
return swapMgr.rawIterator(spaceName, part);
@@ -1669,7 +1737,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
* @return Offheap entries iterator.
* @throws IgniteCheckedException If failed.
*/
- public <K, V> Iterator<Cache.Entry<K, V>> offheapIterator(boolean primary, boolean backup, AffinityTopologyVersion topVer)
+ public <K, V> Iterator<Cache.Entry<K, V>> offheapIterator(boolean primary,
+ boolean backup,
+ AffinityTopologyVersion topVer)
throws IgniteCheckedException
{
assert primary || backup;
@@ -1684,7 +1754,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
cctx.affinity().backupPartitions(cctx.localNodeId(), topVer);
return new PartitionsIterator<K, V>(parts) {
- @Override protected GridCloseableIterator<? extends Map.Entry<byte[], byte[]>> partitionIterator(int part) {
+ @Override protected GridCloseableIterator<? extends Map.Entry<byte[], byte[]>> nextPartition(int part) {
return offheap.iterator(spaceName, part);
}
};
@@ -1884,20 +1954,46 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
/**
*
*/
- private abstract class PartitionsIterator<K, V> implements Iterator<Cache.Entry<K, V>> {
+ private abstract class PartitionsIterator<K, V> extends PartitionsAbstractIterator<Cache.Entry<K, V>> {
+ /**
+ * @param parts Partitions
+ */
+ public PartitionsIterator(Collection<Integer> parts) {
+ super(parts);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Iterator<Cache.Entry<K, V>> partitionIterator(int part)
+ throws IgniteCheckedException {
+ return cacheEntryIterator(GridCacheSwapManager.this.<K, V>lazyIterator(nextPartition(part)));
+ }
+
+ /**
+ * @param part Partition.
+ * @return Iterator for given partition.
+ * @throws IgniteCheckedException If failed.
+ */
+ abstract protected GridCloseableIterator<? extends Map.Entry<byte[], byte[]>> nextPartition(int part)
+ throws IgniteCheckedException;
+ }
+
+ /**
+ *
+ */
+ private abstract class PartitionsAbstractIterator<T> implements Iterator<T> {
/** */
private Iterator<Integer> partIt;
/** */
- private Iterator<Cache.Entry<K, V>> curIt;
+ private Iterator<T> curIt;
/** */
- private Cache.Entry<K, V> next;
+ private T next;
/**
* @param parts Partitions
*/
- public PartitionsIterator(Collection<Integer> parts) {
+ public PartitionsAbstractIterator(Collection<Integer> parts) {
this.partIt = parts.iterator();
advance();
@@ -1909,11 +2005,11 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
}
/** {@inheritDoc} */
- @Override public Cache.Entry<K, V> next() {
+ @Override public T next() {
if (next == null)
throw new NoSuchElementException();
- Cache.Entry<K, V> e = next;
+ T e = next;
advance();
@@ -1937,8 +2033,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
int part = partIt.next();
try {
- curIt = cacheEntryIterator(
- GridCacheSwapManager.this.<K, V>lazyIterator(partitionIterator(part)));
+ curIt = partitionIterator(part);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -1964,58 +2059,70 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
* @return Iterator for given partition.
* @throws IgniteCheckedException If failed.
*/
- abstract protected GridCloseableIterator<? extends Map.Entry<byte[], byte[]>> partitionIterator(int part)
+ abstract protected Iterator<T> partitionIterator(int part)
throws IgniteCheckedException;
}
/**
*
*/
- private abstract class PartitionsKeyIterator implements Iterator<KeyCacheObject> {
+ private abstract class CloseablePartitionsIterator<T, T1 extends T> extends GridCloseableIteratorAdapter<T> {
/** */
private Iterator<Integer> partIt;
/** */
- private Iterator<KeyCacheObject> curIt;
+ protected GridCloseableIterator<T1> curIt;
/** */
- private KeyCacheObject next;
+ protected T next;
/**
* @param parts Partitions
*/
- public PartitionsKeyIterator(Collection<Integer> parts) {
+ public CloseablePartitionsIterator(Collection<Integer> parts) {
this.partIt = parts.iterator();
- advance();
+ try {
+ advance();
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
}
/** {@inheritDoc} */
- @Override public boolean hasNext() {
+ @Override protected boolean onHasNext() {
return next != null;
}
/** {@inheritDoc} */
- @Override public KeyCacheObject next() {
- if (next == null)
- throw new NoSuchElementException();
+ @Override protected T onNext() {
+ try {
+ if (next == null)
+ throw new NoSuchElementException();
- KeyCacheObject e = next;
+ T e = next;
- advance();
+ advance();
- return e;
+ return e;
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
}
/** {@inheritDoc} */
- @Override public void remove() {
- throw new UnsupportedOperationException();
+ @Override protected void onClose() throws IgniteCheckedException {
+ if (curIt != null)
+ curIt.close();
}
/**
* Switches to next element.
+ * @throws IgniteCheckedException If failed.
*/
- private void advance() {
+ private void advance() throws IgniteCheckedException {
next = null;
do {
@@ -2038,8 +2145,11 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
break;
}
- else
+ else {
+ curIt.close();
+
curIt = null;
+ }
}
}
while (partIt.hasNext());
@@ -2050,7 +2160,6 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
* @return Iterator for given partition.
* @throws IgniteCheckedException If failed.
*/
- abstract protected Iterator<KeyCacheObject> partitionIterator(int part)
- throws IgniteCheckedException;
+ abstract protected GridCloseableIterator<T1> partitionIterator(int part) throws IgniteCheckedException;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/745cf7f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
index fe371ce..5184115 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
@@ -1451,33 +1451,6 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
public long swapKeys() throws IgniteCheckedException;
/**
- * Gets iterator over keys and values belonging to this cache swap space on local node. This
- * iterator is thread-safe, which means that cache (and therefore its swap space)
- * may be modified concurrently with iteration over swap.
- * <p>
- * Returned iterator supports {@code remove} operation which delegates to
- * <code>removex(Object, org.apache.ignite.lang.IgnitePredicate[])</code> method.
- *
- * @return Iterator over keys.
- * @throws IgniteCheckedException If failed.
- */
- public Iterator<Map.Entry<K, V>> swapIterator() throws IgniteCheckedException;
-
- /**
- * Gets iterator over keys and values belonging to this cache off-heap memory on local node. This
- * iterator is thread-safe, which means that cache (and therefore its off-heap memory)
- * may be modified concurrently with iteration over off-heap. To achieve better performance
- * the keys and values deserialized on demand, whenever accessed.
- * <p>
- * Returned iterator supports {@code remove} operation which delegates to
- * <code>removex(Object, org.apache.ignite.lang.IgnitePredicate[])}</code> method.
- *
- * @return Iterator over keys.
- * @throws IgniteCheckedException If failed.
- */
- public Iterator<Map.Entry<K, V>> offHeapIterator() throws IgniteCheckedException;
-
- /**
* Forces this cache node to re-balance its partitions. This method is usually used when
* {@link CacheConfiguration#getRebalanceDelay()} configuration parameter has non-zero value.
* When many nodes are started or stopped almost concurrently, it is more efficient to delay
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/745cf7f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 5b74b31..f10baa3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -786,6 +786,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
catch (GridCacheEntryRemovedException ignored) {
if (log.isDebugEnabled())
log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
+
+ entry = null;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/745cf7f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 29c1d45..145e980 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -436,16 +436,6 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
}
/** {@inheritDoc} */
- @Override public Iterator<Map.Entry<K, V>> swapIterator() throws IgniteCheckedException {
- return dht().swapIterator();
- }
-
- /** {@inheritDoc} */
- @Override public Iterator<Map.Entry<K, V>> offHeapIterator() throws IgniteCheckedException {
- return dht().offHeapIterator();
- }
-
- /** {@inheritDoc} */
@Override public long offHeapEntriesCount() {
return dht().offHeapEntriesCount();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/745cf7f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
index fae2372..6120e25 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
@@ -146,6 +146,8 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
try {
entry = entryExx(key);
+ entry.unswap(false);
+
if (!ctx.isAll(entry, filter)) {
fut.onFailed();
@@ -200,12 +202,6 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public void removeAll() throws IgniteCheckedException {
- removeAll(keySet());
- }
-
- /** {@inheritDoc} */
@Override public IgniteInternalFuture<?> removeAllAsync() {
return ctx.closures().callLocalSafe(new Callable<Void>() {
@Override public Void call() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/745cf7f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 936ed9d..819b0f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -387,12 +387,6 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public void removeAll() throws IgniteCheckedException {
- removeAll(keySet());
- }
-
- /** {@inheritDoc} */
@Override public IgniteInternalFuture<?> removeAllAsync() {
return ctx.closures().callLocalSafe(new Callable<Void>() {
@Override public Void call() throws Exception {
@@ -1374,16 +1368,24 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
private List<GridCacheEntryEx> lockEntries(Collection<? extends K> keys) {
List<GridCacheEntryEx> locked = new ArrayList<>(keys.size());
+ boolean nullKeys = false;
+
while (true) {
for (K key : keys) {
- if (key == null)
- throw new NullPointerException("Null key.");
+ if (key == null) {
+ nullKeys = true;
+
+ break;
+ }
GridCacheEntryEx entry = entryEx(ctx.toCacheKeyObject(key));
locked.add(entry);
}
+ if (nullKeys)
+ break;
+
for (int i = 0; i < locked.size(); i++) {
GridCacheEntryEx entry = locked.get(i);
@@ -1405,6 +1407,15 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
if (!locked.isEmpty())
return locked;
}
+
+ assert nullKeys;
+
+ AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
+
+ for (GridCacheEntryEx entry : locked)
+ ctx.evicts().touch(entry, topVer);
+
+ throw new NullPointerException("Null key.");
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/745cf7f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index d3ebe60..16a8028 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -766,13 +766,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
final ExpiryPolicy plc = cctx.expiry();
+ final boolean backups = qry.includeBackups() || cctx.isReplicated();
+
final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt = new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
private IgniteBiTuple<K, V> next;
private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc);
- private Iterator<K> iter = qry.includeBackups() || cctx.isReplicated() ?
- prj.keySet().iterator() : prj.primaryKeySet().iterator();
+ private Iterator<K> iter = backups ? prj.keySet().iterator() : prj.primaryKeySet().iterator();
{
advance();
@@ -868,10 +869,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
iters.add(heapIt);
if (cctx.isOffHeapEnabled())
- iters.add(offheapIterator(qry));
+ iters.add(offheapIterator(qry, backups));
if (cctx.swap().swapEnabled())
- iters.add(swapIterator(qry));
+ iters.add(swapIterator(qry, backups));
it = new CompoundIterator<>(iters);
}
@@ -905,32 +906,34 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
/**
* @param qry Query.
+ * @param backups Include backups.
* @return Swap iterator.
* @throws IgniteCheckedException If failed.
*/
- private GridIterator<IgniteBiTuple<K, V>> swapIterator(GridCacheQueryAdapter<?> qry)
+ private GridIterator<IgniteBiTuple<K, V>> swapIterator(GridCacheQueryAdapter<?> qry, boolean backups)
throws IgniteCheckedException {
IgniteBiPredicate<K, V> filter = qry.scanFilter();
- Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawSwapIterator();
+ Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawSwapIterator(true, backups);
return scanIterator(it, filter, qry.keepPortable());
}
/**
* @param qry Query.
+ * @param backups Include backups.
* @return Offheap iterator.
*/
- private GridIterator<IgniteBiTuple<K, V>> offheapIterator(GridCacheQueryAdapter<?> qry) {
+ private GridIterator<IgniteBiTuple<K, V>> offheapIterator(GridCacheQueryAdapter<?> qry, boolean backups) {
IgniteBiPredicate<K, V> filter = qry.scanFilter();
if (cctx.offheapTiered() && filter != null) {
OffheapIteratorClosure c = new OffheapIteratorClosure(filter, qry.keepPortable());
- return cctx.swap().rawOffHeapIterator(c);
+ return cctx.swap().rawOffHeapIterator(c, true, backups);
}
else {
- Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawOffHeapIterator();
+ Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawOffHeapIterator(true, backups);
return scanIterator(it, filter, qry.keepPortable());
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/745cf7f9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index fc3efba..3c855ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -2040,7 +2040,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
for (Object key : keys) {
if (key == null) {
- setRollbackOnly();
+ rollback();
throw new NullPointerException("Null key.");
}
@@ -2191,7 +2191,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
drVer,
skipStore);
- if (!implicit() && readCommitted())
+ if (!implicit() && readCommitted() && !cacheCtx.offheapTiered())
cacheCtx.evicts().touch(entry, topologyVersion());
if (groupLock() && !lockOnly)
@@ -2934,19 +2934,17 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
assert loadFut.isDone();
return nonInterruptable(commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
- @Override
- public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut)
+ @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut)
throws IgniteCheckedException {
txFut.get();
- return (GridCacheReturn)implicitRes;
+ return implicitRes;
}
}));
}
else
return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Set<KeyCacheObject>>, GridCacheReturn>() {
- @Override
- public GridCacheReturn applyx(IgniteInternalFuture<Set<KeyCacheObject>> f)
+ @Override public GridCacheReturn applyx(IgniteInternalFuture<Set<KeyCacheObject>> f)
throws IgniteCheckedException {
f.get();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/745cf7f9/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
index ebedadb..a99c4c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
@@ -293,6 +293,23 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
}
/**
+ * Gets iterator over contents of the given space.
+ *
+ * @param spaceName Space name.
+ * @param c Key/value closure.
+ * @param part Partition.
+ * @return Iterator.
+ */
+ public <T> GridCloseableIterator<T> iterator(@Nullable String spaceName,
+ CX2<T2<Long, Integer>, T2<Long, Integer>, T> c, int part) {
+ assert c != null;
+
+ GridOffHeapPartitionedMap m = offheap(spaceName);
+
+ return m == null ? new GridEmptyCloseableIterator<T>() : m.iterator(c, part);
+ }
+
+ /**
* Gets number of elements in the given space.
*
* @param spaceName Space name. Optional.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/745cf7f9/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java
index 49850ab..a945262 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java
@@ -201,6 +201,15 @@ public interface GridOffHeapPartitionedMap {
public <T> GridCloseableIterator<T> iterator(CX2<T2<Long, Integer>, T2<Long, Integer>, T> c);
/**
+ * Gets iterator over the partition.
+ *
+ * @param c Key/value closure.
+ * @param part Partition.
+ * @return Iterator over the partition.
+ */
+ public <T> GridCloseableIterator<T> iterator(CX2<T2<Long, Integer>, T2<Long, Integer>, T> c, int part);
+
+ /**
* Gets iterator over certain partition.
*
* @param p Partition.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/745cf7f9/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java
index ba67b30..4ffc33f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java
@@ -277,21 +277,8 @@ public class GridUnsafePartitionedMap implements GridOffHeapPartitionedMap {
/** {@inheritDoc} */
@Override public GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> iterator() {
- return new GridCloseableIteratorAdapter<IgniteBiTuple<byte[], byte[]>>() {
- private int p;
-
- private GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> curIt;
-
- {
- try {
- advance();
- }
- catch (IgniteCheckedException e) {
- e.printStackTrace(); // Should never happen.
- }
- }
-
- private void advance() throws IgniteCheckedException {
+ return new PartitionedMapCloseableIterator<IgniteBiTuple<byte[], byte[]>>() {
+ protected void advance() throws IgniteCheckedException {
curIt = null;
while (p < parts) {
@@ -305,34 +292,6 @@ public class GridUnsafePartitionedMap implements GridOffHeapPartitionedMap {
curIt = null;
}
-
- @Override protected IgniteBiTuple<byte[], byte[]> onNext() throws IgniteCheckedException {
- if (curIt == null)
- throw new NoSuchElementException();
-
- IgniteBiTuple<byte[], byte[]> t = curIt.next();
-
- if (!curIt.hasNext()) {
- curIt.close();
-
- advance();
- }
-
- return t;
- }
-
- @Override protected boolean onHasNext() {
- return curIt != null;
- }
-
- @Override protected void onRemove() {
- throw new UnsupportedOperationException();
- }
-
- @Override protected void onClose() throws IgniteCheckedException {
- if (curIt != null)
- curIt.close();
- }
};
}
@@ -340,21 +299,8 @@ public class GridUnsafePartitionedMap implements GridOffHeapPartitionedMap {
@Override public <T> GridCloseableIterator<T> iterator(final CX2<T2<Long, Integer>, T2<Long, Integer>, T> c) {
assert c != null;
- return new GridCloseableIteratorAdapter<T>() {
- private int p;
-
- private GridCloseableIterator<T> curIt;
-
- {
- try {
- advance();
- }
- catch (IgniteCheckedException e) {
- e.printStackTrace(); // Should never happen.
- }
- }
-
- private void advance() throws IgniteCheckedException {
+ return new PartitionedMapCloseableIterator<T>() {
+ protected void advance() throws IgniteCheckedException {
curIt = null;
while (p < parts) {
@@ -368,38 +314,16 @@ public class GridUnsafePartitionedMap implements GridOffHeapPartitionedMap {
curIt = null;
}
-
- @Override protected T onNext() throws IgniteCheckedException {
- if (curIt == null)
- throw new NoSuchElementException();
-
- T t = curIt.next();
-
- if (!curIt.hasNext()) {
- curIt.close();
-
- advance();
- }
-
- return t;
- }
-
- @Override protected boolean onHasNext() {
- return curIt != null;
- }
-
- @Override protected void onRemove() {
- throw new UnsupportedOperationException();
- }
-
- @Override protected void onClose() throws IgniteCheckedException {
- if (curIt != null)
- curIt.close();
- }
};
}
/** {@inheritDoc} */
+ @Override public <T> GridCloseableIterator<T> iterator(final CX2<T2<Long, Integer>, T2<Long, Integer>, T> c,
+ int part) {
+ return mapFor(part).iterator(c);
+ }
+
+ /** {@inheritDoc} */
@Override public GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> iterator(int p) {
return mapFor(p).iterator();
}
@@ -430,4 +354,63 @@ public class GridUnsafePartitionedMap implements GridOffHeapPartitionedMap {
public long lruSize() {
return lru.size();
}
+
+ /**
+ * Partitioned closable iterator.
+ */
+ private abstract class PartitionedMapCloseableIterator<T> extends GridCloseableIteratorAdapter<T> {
+ /** Current partition. */
+ protected int p;
+
+ /** Current iterator. */
+ protected GridCloseableIterator<T> curIt;
+
+ {
+ try {
+ advance();
+ }
+ catch (IgniteCheckedException e) {
+ e.printStackTrace(); // Should never happen.
+ }
+ }
+
+ /**
+ * Switch to next partition.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ abstract void advance() throws IgniteCheckedException;
+
+ /** {@inheritDoc} */
+ @Override protected T onNext() throws IgniteCheckedException {
+ if (curIt == null)
+ throw new NoSuchElementException();
+
+ T t = curIt.next();
+
+ if (!curIt.hasNext()) {
+ curIt.close();
+
+ advance();
+ }
+
+ return t;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean onHasNext() {
+ return curIt != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void onRemove() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void onClose() throws IgniteCheckedException {
+ if (curIt != null)
+ curIt.close();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/745cf7f9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 4dc371c..70d8f9c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -50,6 +50,7 @@ import static java.util.concurrent.TimeUnit.*;
import static org.apache.ignite.cache.CacheAtomicityMode.*;
import static org.apache.ignite.cache.CacheMemoryMode.*;
import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CachePeekMode.*;
import static org.apache.ignite.events.EventType.*;
import static org.apache.ignite.testframework.GridTestUtils.*;
import static org.apache.ignite.transactions.TransactionConcurrency.*;
@@ -121,15 +122,15 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
/**
* @return {@code True} if values should be stored off-heap.
*/
- protected boolean offHeapValues() {
- return false;
+ protected CacheMemoryMode memoryMode() {
+ return ONHEAP_TIERED;
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- if (offHeapValues())
+ if (memoryMode() == OFFHEAP_TIERED || memoryMode() == OFFHEAP_VALUES)
cfg.setSwapSpaceSpi(new GridTestSwapSpaceSpi());
return cfg;
@@ -139,8 +140,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
@Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
CacheConfiguration ccfg = super.cacheConfiguration(gridName);
- if (offHeapValues()) {
- ccfg.setMemoryMode(CacheMemoryMode.OFFHEAP_VALUES);
+ if (memoryMode() == OFFHEAP_TIERED || memoryMode() == OFFHEAP_VALUES) {
+ ccfg.setMemoryMode(memoryMode());
ccfg.setOffHeapMaxMemory(0);
}
@@ -272,7 +273,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
// Will actually delete entry from map.
CU.invalidate(jcache(i), "key0");
- assertNull("Failed check for grid: " + i, jcache(i).localPeek("key0", CachePeekMode.ONHEAP));
+ assertNull("Failed check for grid: " + i, jcache(i).localPeek("key0", ONHEAP));
Collection<String> keysCol = mapped.get(grid(i).localNode());
@@ -288,20 +289,20 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
if (ctx.affinity().localNode(key, new AffinityTopologyVersion(ctx.discovery().topologyVersion())))
sum++;
- assertEquals("Incorrect key size on cache #" + i, sum, jcache(i).localSize(CachePeekMode.ALL));
+ assertEquals("Incorrect key size on cache #" + i, sum, jcache(i).localSize(ALL));
}
for (int i = 0; i < gridCount(); i++) {
Collection<String> keysCol = mapped.get(grid(i).localNode());
assertEquals("Failed check for grid: " + i, !F.isEmpty(keysCol) ? keysCol.size() : 0,
- jcache(i).localSize(CachePeekMode.PRIMARY));
+ jcache(i).localSize(PRIMARY));
}
int globalPrimarySize = map.size();
for (int i = 0; i < gridCount(); i++)
- assertEquals(globalPrimarySize, jcache(i).size(CachePeekMode.PRIMARY));
+ assertEquals(globalPrimarySize, jcache(i).size(PRIMARY));
int times = 1;
@@ -313,7 +314,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
int globalSize = globalPrimarySize * times;
for (int i = 0; i < gridCount(); i++)
- assertEquals(globalSize, jcache(i).size(CachePeekMode.ALL));
+ assertEquals(globalSize, jcache(i).size(ALL));
}
/**
@@ -735,7 +736,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertNull(cache.get("key3"));
for (int i = 0; i < gridCount(); i++)
- assertNull("Failed for cache: " + i, jcache(i).localPeek("key3", CachePeekMode.ONHEAP));
+ assertNull("Failed for cache: " + i, jcache(i).localPeek("key3", ONHEAP));
cache.remove("key1");
cache.put("key2", 1);
@@ -750,7 +751,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertNull(cache.get("key3"));
for (int i = 0; i < gridCount(); i++)
- assertNull(jcache(i).localPeek("key3", CachePeekMode.ONHEAP));
+ assertNull(jcache(i).localPeek("key3", ONHEAP));
}
/**
@@ -789,7 +790,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertNull(cache.get("key3"));
for (int i = 0; i < gridCount(); i++)
- assertNull("Failed for cache: " + i, jcache(i).localPeek("key3", CachePeekMode.ONHEAP));
+ assertNull("Failed for cache: " + i, jcache(i).localPeek("key3", ONHEAP));
cache.remove("key1");
cache.put("key2", 1);
@@ -804,7 +805,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertNull(cache.get("key3"));
for (int i = 0; i < gridCount(); i++)
- assertNull(jcache(i).localPeek("key3", CachePeekMode.ONHEAP));
+ assertNull(jcache(i).localPeek("key3", ONHEAP));
}
/**
@@ -874,9 +875,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
Map<String, EntryProcessorResult<String>> res = cache.invokeAll(F.asSet("key1", "key2", "key3"), RMV_PROCESSOR);
for (int i = 0; i < gridCount(); i++) {
- assertNull(jcache(i).localPeek("key1", CachePeekMode.ONHEAP));
- assertNull(jcache(i).localPeek("key2", CachePeekMode.ONHEAP));
- assertNull(jcache(i).localPeek("key3", CachePeekMode.ONHEAP));
+ assertNull(jcache(i).localPeek("key1", ONHEAP));
+ assertNull(jcache(i).localPeek("key2", ONHEAP));
+ assertNull(jcache(i).localPeek("key3", ONHEAP));
}
assertEquals("null", res.get("key1").get());
@@ -1232,7 +1233,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertNull(cache.get("key3"));
for (int i = 0; i < gridCount(); i++)
- assertNull(jcache(i).localPeek("key3", CachePeekMode.ONHEAP));
+ assertNull(jcache(i).localPeek("key3", ONHEAP));
}
/**
@@ -1271,7 +1272,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertNull(cache.get("k1"));
for (int i = 0; i < gridCount(); i++)
- assertNull(jcache(i).localPeek("k1", CachePeekMode.ONHEAP));
+ assertNull(jcache(i).localPeek("k1", ONHEAP));
final EntryProcessor<String, Integer, Integer> errProcessor = new EntryProcessor<String, Integer, Integer>() {
@Override public Integer process(MutableEntry<String, Integer> e, Object... args) {
@@ -1692,7 +1693,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
for (int i = 0; i < gridCount(); i++) {
info("Peek on node [i=" + i + ", id=" + grid(i).localNode().id() + ", val=" +
- grid(i).cache(null).localPeek("key", CachePeekMode.ONHEAP) + ']');
+ grid(i).cache(null).localPeek("key", ONHEAP) + ']');
}
assertEquals((Integer)1, cache.getAndPutIfAbsent("key", 2));
@@ -2001,10 +2002,10 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
for (int i = 0; i < gridCount(); i++) {
info("Peek key on grid [i=" + i + ", nodeId=" + grid(i).localNode().id() +
- ", peekVal=" + grid(i).cache(null).localPeek("key", CachePeekMode.ONHEAP) + ']');
+ ", peekVal=" + grid(i).cache(null).localPeek("key", ONHEAP) + ']');
info("Peek key2 on grid [i=" + i + ", nodeId=" + grid(i).localNode().id() +
- ", peekVal=" + grid(i).cache(null).localPeek("key2", CachePeekMode.ONHEAP) + ']');
+ ", peekVal=" + grid(i).cache(null).localPeek("key2", ONHEAP) + ']');
}
assertEquals((Integer)6, cache.get("key2"));
@@ -2233,7 +2234,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
* @throws Exception If failed.
*/
public void testDeletedEntriesFlag() throws Exception {
- if (cacheMode() != LOCAL && cacheMode() != REPLICATED) {
+ if (cacheMode() != LOCAL && cacheMode() != REPLICATED && memoryMode() != OFFHEAP_TIERED) {
int cnt = 3;
IgniteCache<String, Integer> cache = jcache();
@@ -2288,9 +2289,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
String key = String.valueOf(i);
if (grid(0).affinity(null).mapKeyToPrimaryAndBackups(key).contains(grid(g).localNode()))
- assertEquals((Integer)i, jcache(g).localPeek(key, CachePeekMode.ONHEAP));
+ assertEquals((Integer)i, peek(jcache(g), key));
else
- assertNull(jcache(g).localPeek(key, CachePeekMode.ONHEAP));
+ assertNull(peek(jcache(g), key));
}
}
}
@@ -2475,6 +2476,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
}
}, NullPointerException.class, null);
+ assertEquals(0, grid(0).cache(null).localSize());
+
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
cache.removeAll(null);
@@ -2569,7 +2572,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
Set<String> keys = new HashSet<>(primaryKeysForCache(cache, 2));
for (String key : keys)
- assertNull(cache.localPeek(key, CachePeekMode.ONHEAP));
+ assertNull(cache.localPeek(key, ONHEAP));
Map<String, Integer> vals = new HashMap<>();
@@ -2584,17 +2587,17 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
}
for (String key : keys)
- assertEquals(vals.get(key), cache.localPeek(key, CachePeekMode.ONHEAP));
+ assertEquals(vals.get(key), peek(cache, key));
cache.clear();
for (String key : keys)
- assertNull(cache.localPeek(key, CachePeekMode.ONHEAP));
+ assertNull(peek(cache, key));
loadAll(cache, keys, true);
for (String key : keys)
- assertEquals(vals.get(key), cache.localPeek(key, CachePeekMode.ONHEAP));
+ assertEquals(vals.get(key), peek(cache, key));
}
/**
@@ -2703,7 +2706,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
try {
cache.clear();
- assertEquals(vals.get(first), peek(cache, first));
+ assertEquals(vals.get(first), cache.localPeek(first, ONHEAP));
}
finally {
lock.unlock();
@@ -2734,14 +2737,14 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
cache.localEvict(Sets.union(ImmutableSet.of("key1", "key2"), keys));
- assert cache.localSize(CachePeekMode.ONHEAP) == 0;
+ assert cache.localSize(ONHEAP) == 0;
cache.clear();
cache.localPromote(ImmutableSet.of("key2", "key1"));
- assert cache.localPeek("key1", CachePeekMode.ONHEAP) == null;
- assert cache.localPeek("key2", CachePeekMode.ONHEAP) == null;
+ assert cache.localPeek("key1", ONHEAP) == null;
+ assert cache.localPeek("key2", ONHEAP) == null;
}
/**
@@ -2906,13 +2909,13 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
Ignite ignite = primaryIgnite("key");
IgniteCache<String, Integer> cache = ignite.cache(null);
- assert cache.localPeek("key", CachePeekMode.ONHEAP) == null;
+ assert peek(cache, "key") == null;
cache.put("key", 1);
cache.replace("key", 2);
- assert cache.localPeek("key", CachePeekMode.ONHEAP) == 2;
+ assertEquals(2, peek(cache, "key").intValue());
}
/**
@@ -2944,7 +2947,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
cache.remove("key");
assertNull(cache.get("key")); // localPeek ignores transactions.
- assertNotNull(cache.localPeek("key")); // localPeek ignores transactions.
+ assertNotNull(peek(cache, "key")); // localPeek ignores transactions.
tx.commit();
}
@@ -2960,7 +2963,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
cache.put("key", 1);
cache.remove("key");
- assertNull(cache.localPeek("key", CachePeekMode.ONHEAP));
+ assertNull(peek(cache, "key"));
}
/**
@@ -2986,11 +2989,11 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
// Expired entry should not be swapped.
cache.localEvict(Collections.singleton(key));
- assertNull(cache.localPeek(key, CachePeekMode.ONHEAP));
+ assertNull(peek(cache, "key"));
cache.localPromote(Collections.singleton(key));
- assertNull(cache.localPeek(key, CachePeekMode.ONHEAP));
+ assertNull(cache.localPeek(key, ONHEAP));
assertTrue(cache.localSize() == 0);
@@ -3021,7 +3024,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
c.put(key, 1);
- assertEquals(Integer.valueOf(1), c.localPeek(key, CachePeekMode.ONHEAP));
+ assertEquals(Integer.valueOf(1), peek(c, key));
int ttl = 500;
@@ -3031,7 +3034,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
Thread.sleep(ttl + 100);
- assert c.localPeek(key, CachePeekMode.ONHEAP) == null;
+ assert peek(c, key) == null;
assert c.localSize() == 0 : "Cache is not empty.";
}
@@ -3058,7 +3061,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
Thread.sleep(ttl + 100);
- assertNull(c.localPeek(key, CachePeekMode.ONHEAP));
+ assertNull(c.localPeek(key, ONHEAP));
assert c.localSize() == 0;
}
@@ -3092,6 +3095,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
* @throws Exception If failed.
*/
private void checkTtl(boolean inTx, boolean oldEntry) throws Exception {
+ if (memoryMode() == OFFHEAP_TIERED)
+ return;
+
int ttl = 1000;
final ExpiryPolicy expiry = new TouchedExpiryPolicy(new Duration(MILLISECONDS, ttl));
@@ -3361,15 +3367,15 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
cache.put(key2, 2);
cache.put(key3, 3);
- assert cache.localPeek(key1, CachePeekMode.ONHEAP) == 1;
- assert cache.localPeek(key2, CachePeekMode.ONHEAP) == 2;
- assert cache.localPeek(key3, CachePeekMode.ONHEAP) == 3;
+ assert peek(cache, key1) == 1;
+ assert peek(cache, key2) == 2;
+ assert peek(cache, key3) == 3;
cache.localEvict(F.asList(key1, key2));
- assert cache.localPeek(key1, CachePeekMode.ONHEAP) == null;
- assert cache.localPeek(key2, CachePeekMode.ONHEAP) == null;
- assert cache.localPeek(key3, CachePeekMode.ONHEAP) == 3;
+ assert cache.localPeek(key1, ONHEAP) == null;
+ assert cache.localPeek(key2, ONHEAP) == null;
+ assert peek(cache, key3) == 3;
loadAll(cache, ImmutableSet.of(key1, key2), true);
@@ -3391,7 +3397,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
* @throws Exception If failed.
*/
public void testUnswap() throws Exception {
- GridCacheAdapter<String, Integer> cache = ((IgniteKernal)grid(0)).internalCache();
+ IgniteCache<String, Integer> cache = grid(0).cache(null);
List<String> keys = primaryKeysForCache(jcache(), 3);
@@ -3408,17 +3414,11 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
Collection<String> locKeys = new HashSet<>();
- if (cache.context().affinityNode()) {
- locKeys.addAll(cache.primaryKeySet());
+ if (grid(0).context().cache().cache(null).context().affinityNode()) {
+ Iterable<Cache.Entry<String, Integer>> entries = cache.localEntries(PRIMARY, BACKUP);
- info("Local keys (primary): " + locKeys);
-
- locKeys.addAll(cache.keySet(new CacheEntryPredicateAdapter() {
- @Override public boolean apply(GridCacheEntryEx e) {
- return grid(0).affinity(null).isBackup(grid(0).localNode(),
- e.key().value(e.context().cacheObjectContext(), false));
- }
- }));
+ for (Cache.Entry<String, Integer> entry : entries)
+ locKeys.add(entry.getKey());
info("Local keys (primary + backup): " + locKeys);
}
@@ -3444,57 +3444,67 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
}, EVT_CACHE_OBJECT_SWAPPED, EVT_CACHE_OBJECT_UNSWAPPED);
}
- cache.evictAll(Collections.singleton(k2));
- cache.evictAll(Collections.singleton(k3));
+ cache.localEvict(F.asList(k2, k3));
- assertNotNull(cache.localPeek(k1, new CachePeekMode[] {CachePeekMode.ONHEAP, CachePeekMode.OFFHEAP}, null));
- assertNull(cache.localPeek(k2, new CachePeekMode[] {CachePeekMode.ONHEAP, CachePeekMode.OFFHEAP}, null));
- assertNull(cache.localPeek(k3, new CachePeekMode[] {CachePeekMode.ONHEAP, CachePeekMode.OFFHEAP}, null));
+ if (memoryMode() == OFFHEAP_TIERED) {
+ assertNotNull(cache.localPeek(k1, ONHEAP, OFFHEAP));
+ assertNotNull(cache.localPeek(k2, ONHEAP, OFFHEAP));
+ assertNotNull(cache.localPeek(k3, ONHEAP, OFFHEAP));
+ }
+ else {
+ assertNotNull(cache.localPeek(k1, ONHEAP, OFFHEAP));
+ assertNull(cache.localPeek(k2, ONHEAP, OFFHEAP));
+ assertNull(cache.localPeek(k3, ONHEAP, OFFHEAP));
+ }
int cnt = 0;
if (locKeys.contains(k2)) {
- assertNull(cache.localPeek(k2, ONHEAP_PEEK_MODES, null));
+ assertNull(cache.localPeek(k2, ONHEAP_PEEK_MODES));
- cache.promoteAll(Collections.singleton(k2));
+ cache.localPromote(Collections.singleton(k2));
- assertEquals((Integer) 2, cache.localPeek(k2, ONHEAP_PEEK_MODES, null));
+ assertEquals((Integer) 2, cache.localPeek(k2, ONHEAP_PEEK_MODES));
cnt++;
}
else {
- cache.promoteAll(Collections.singleton(k2));
+ cache.localPromote(Collections.singleton(k2));
- assertNull(cache.localPeek(k2, ONHEAP_PEEK_MODES, null));
+ assertNull(cache.localPeek(k2, ONHEAP_PEEK_MODES));
}
if (locKeys.contains(k3)) {
- assertNull(cache.localPeek(k3, ONHEAP_PEEK_MODES, null));
+ assertNull(cache.localPeek(k3, ONHEAP_PEEK_MODES));
- cache.promoteAll(Collections.singleton(k3));
+ cache.localPromote(Collections.singleton(k3));
- assertEquals((Integer)3, cache.localPeek(k3, ONHEAP_PEEK_MODES, null));
+ assertEquals((Integer)3, cache.localPeek(k3, ONHEAP_PEEK_MODES));
cnt++;
}
else {
- cache.promoteAll(Collections.singleton(k3));
+ cache.localPromote(Collections.singleton(k3));
- assertNull(cache.localPeek(k3, ONHEAP_PEEK_MODES, null));
+ assertNull(cache.localPeek(k3, ONHEAP_PEEK_MODES));
}
- assertEquals(cnt, swapEvts.get());
- assertEquals(cnt, unswapEvts.get());
+ if (memoryMode() != OFFHEAP_TIERED) {
+ assertEquals(cnt, swapEvts.get());
+ assertEquals(cnt, unswapEvts.get());
+ }
- cache.evictAll(Collections.singleton(k1));
+ cache.localEvict(Collections.singleton(k1));
assertEquals((Integer)1, cache.get(k1));
if (locKeys.contains(k1))
cnt++;
- assertEquals(cnt, swapEvts.get());
- assertEquals(cnt, unswapEvts.get());
+ if (memoryMode() != OFFHEAP_TIERED) {
+ assertEquals(cnt, swapEvts.get());
+ assertEquals(cnt, unswapEvts.get());
+ }
cache.clear();
@@ -3506,14 +3516,21 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
swapEvts.set(0);
unswapEvts.set(0);
- cache.evictAll(Collections.singleton(k2));
- cache.evictAll(Collections.singleton(k3));
+ cache.localEvict(Collections.singleton(k2));
+ cache.localEvict(Collections.singleton(k3));
- assertNotNull(cache.localPeek(k1, new CachePeekMode[] {CachePeekMode.ONHEAP, CachePeekMode.OFFHEAP}, null));
- assertNull(cache.localPeek(k2, new CachePeekMode[] {CachePeekMode.ONHEAP, CachePeekMode.OFFHEAP}, null));
- assertNull(cache.localPeek(k3, new CachePeekMode[] {CachePeekMode.ONHEAP, CachePeekMode.OFFHEAP}, null));
+ if (memoryMode() == OFFHEAP_TIERED) {
+ assertNotNull(cache.localPeek(k1, ONHEAP, OFFHEAP));
+ assertNotNull(cache.localPeek(k2, ONHEAP, OFFHEAP));
+ assertNotNull(cache.localPeek(k3, ONHEAP, OFFHEAP));
+ }
+ else {
+ assertNotNull(cache.localPeek(k1, ONHEAP, OFFHEAP));
+ assertNull(cache.localPeek(k2, ONHEAP, OFFHEAP));
+ assertNull(cache.localPeek(k3, ONHEAP, OFFHEAP));
+ }
- cache.promoteAll(F.asList(k2, k3));
+ cache.localPromote(F.asSet(k2, k3));
cnt = 0;
@@ -3523,8 +3540,10 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
if (locKeys.contains(k3))
cnt++;
- assertEquals(cnt, swapEvts.get());
- assertEquals(cnt, unswapEvts.get());
+ if (memoryMode() != OFFHEAP_TIERED) {
+ assertEquals(cnt, swapEvts.get());
+ assertEquals(cnt, unswapEvts.get());
+ }
}
/**
@@ -3557,7 +3576,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
Thread.sleep(ttl + 100);
// Peek will actually remove entry from cache.
- assertNull(cache.localPeek(key, CachePeekMode.ONHEAP));
+ assertNull(cache.localPeek(key, ONHEAP));
assert cache.localSize() == 0;
@@ -3654,6 +3673,13 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertTrue(cache.remove("key" + i));
}
});
+
+ CU.inTx(ignite(0), jcache(), concurrency, isolation, new CIX1<IgniteCache<String, Integer>>() {
+ @Override public void applyx(IgniteCache<String, Integer> cache) {
+ for (int i = 0; i < cnt; i++)
+ assertNull(cache.get("key" + i));
+ }
+ });
}
}
@@ -3745,6 +3771,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
* @throws Exception If failed.
*/
protected void checkSize(Collection<String> keys) throws Exception {
+ if (memoryMode() == OFFHEAP_TIERED)
+ return;
+
if (nearEnabled())
assertEquals(keys.size(), jcache().localSize(CachePeekMode.ALL));
else {
@@ -3768,7 +3797,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
}
}
- assertEquals("Incorrect size on cache #" + i, size, jcache(i).localSize(CachePeekMode.ALL));
+ assertEquals("Incorrect size on cache #" + i, size, jcache(i).localSize(ALL));
}
}
}
@@ -3779,8 +3808,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
*/
protected void checkKeySize(Collection<String> keys) throws Exception {
if (nearEnabled())
- assertEquals("Invalid key size: " + jcache().localSize(CachePeekMode.ALL),
- keys.size(), jcache().localSize(CachePeekMode.ALL));
+ assertEquals("Invalid key size: " + jcache().localSize(ALL),
+ keys.size(), jcache().localSize(ALL));
else {
for (int i = 0; i < gridCount(); i++) {
GridCacheContext<String, Integer> ctx = context(i);
@@ -3791,7 +3820,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx()))
size++;
- assertEquals("Incorrect key size on cache #" + i, size, jcache(i).localSize(CachePeekMode.ALL));
+ assertEquals("Incorrect key size on cache #" + i, size, jcache(i).localSize(ALL));
}
}
}
@@ -4390,7 +4419,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertFalse(cacheSkipStore.iterator().hasNext());
assertTrue(map.size() == 0);
- assertTrue(cache.size(CachePeekMode.ALL) == 0);
+ assertTrue(cache.size(ALL) == 0);
// putAll/removeAll from multiple nodes.
@@ -4479,8 +4508,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertTrue(map.containsKey(rmvKey));
- assertTrue(cache.size(CachePeekMode.ALL) == 0);
- assertTrue(cacheSkipStore.size(CachePeekMode.ALL) == 0);
+ assertTrue(cache.size(ALL) == 0);
+ assertTrue(cacheSkipStore.size(ALL) == 0);
cache.remove(rmvKey);
@@ -4723,8 +4752,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
for (int i = 0; i < keys.size(); i++)
putToStore(keys.get(i), i);
- assertTrue(cacheSkipStore.size(CachePeekMode.ALL) == 0);
- assertTrue(cache.size(CachePeekMode.ALL) == 0);
+ assertTrue(cacheSkipStore.size(ALL) == 0);
+ assertTrue(cache.size(ALL) == 0);
assertTrue(map.size() != 0);
try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
@@ -4813,8 +4842,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
*/
private void checkEmpty(IgniteCache<String, Integer> cache, IgniteCache<String, Integer> cacheSkipStore)
throws Exception {
- assertTrue(cache.size(CachePeekMode.ALL) == 0);
- assertTrue(cacheSkipStore.size(CachePeekMode.ALL) == 0);
+ assertTrue(cache.size(ALL) == 0);
+ assertTrue(cacheSkipStore.size(ALL) == 0);
assertTrue(map.size() == 0);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/745cf7f9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
index 342eb5a..efd0185 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
@@ -393,8 +393,8 @@ public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
@Nullable protected <K, V> V peek(IgniteCache<K, V> cache, K key) throws Exception {
- return offheapTiered(cache) ? cache.localPeek(key, CachePeekMode.SWAP) : cache.localPeek(key,
- CachePeekMode.ONHEAP);
+ return offheapTiered(cache) ? cache.localPeek(key, CachePeekMode.SWAP, CachePeekMode.OFFHEAP) :
+ cache.localPeek(key, CachePeekMode.ONHEAP);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/745cf7f9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/OffHeapTieredTransactionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/OffHeapTieredTransactionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/OffHeapTieredTransactionSelfTest.java
new file mode 100644
index 0000000..671d6c4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/OffHeapTieredTransactionSelfTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.transactions.*;
+
+import java.util.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMemoryMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+
+/**
+ * Off-heap tiered test.
+ */
+public class OffHeapTieredTransactionSelfTest extends GridCommonAbstractTest {
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ CacheConfiguration ccfg = defaultCacheConfiguration();
+
+ ccfg.setMemoryMode(OFFHEAP_TIERED);
+ ccfg.setAtomicityMode(TRANSACTIONAL);
+ ccfg.setOffHeapMaxMemory(0);
+ ccfg.setSwapEnabled(true);
+ ccfg.setCacheMode(REPLICATED);
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(ipFinder);
+
+ cfg.setDiscoverySpi(disco);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ cfg.getTransactionConfiguration().setTxSerializableEnabled(true);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 30_000;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ startGrids(2);
+
+ awaitPartitionMapExchange();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception In case of error.
+ */
+ public void testPutAll() throws Exception {
+ IgniteCache<String, Integer> cache = grid(0).cache(null);
+
+ final int KEYS = 5;
+
+ Map<String, Integer> data = new LinkedHashMap<>();
+
+ for (int i = 0; i < KEYS; i++)
+ data.put("key_" + i, i);
+
+ checkPutAll(cache, data, OPTIMISTIC, READ_COMMITTED);
+
+ checkPutAll(cache, data, OPTIMISTIC, REPEATABLE_READ);
+
+ checkPutAll(cache, data, OPTIMISTIC, SERIALIZABLE);
+
+ checkPutAll(cache, data, PESSIMISTIC, READ_COMMITTED);
+
+ checkPutAll(cache, data, PESSIMISTIC, REPEATABLE_READ);
+
+ checkPutAll(cache, data, PESSIMISTIC, SERIALIZABLE);
+ }
+
+ /**
+ * @throws Exception In case of error.
+ */
+ private void checkPutAll(IgniteCache<String, Integer> cache, Map<String, Integer> data,
+ TransactionConcurrency txConcurrency, TransactionIsolation txIsolation) throws Exception {
+ IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
+
+ try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
+ cache.putAll(data);
+
+ tx.commit();
+ }
+
+ for (Map.Entry<String, Integer> entry : data.entrySet())
+ assertEquals(entry.getValue(), cache.get(entry.getKey()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/745cf7f9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedNearDisabledAtomicOffHeapTieredMultiNodeFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedNearDisabledAtomicOffHeapTieredMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedNearDisabledAtomicOffHeapTieredMultiNodeFullApiSelfTest.java
new file mode 100644
index 0000000..686cc31
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedNearDisabledAtomicOffHeapTieredMultiNodeFullApiSelfTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.cache.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+
+/**
+ * Tests colocated cache with off-heap tiered mode.
+ */
+public class GridCachePartitionedNearDisabledAtomicOffHeapTieredMultiNodeFullApiSelfTest extends
+ GridCachePartitionedNearDisabledOffHeapTieredMultiNodeFullApiSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return ATOMIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean txEnabled() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean lockingEnabled() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/745cf7f9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedNearDisabledOffHeapFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedNearDisabledOffHeapFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedNearDisabledOffHeapFullApiSelfTest.java
index c3a69e2..2dd07ff 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedNearDisabledOffHeapFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedNearDisabledOffHeapFullApiSelfTest.java
@@ -17,13 +17,17 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
+import org.apache.ignite.cache.*;
+
+import static org.apache.ignite.cache.CacheMemoryMode.*;
+
/**
* Tests colocated cache with values being stored off-heap.
*/
public class GridCachePartitionedNearDisabledOffHeapFullApiSelfTest extends
GridCachePartitionedNearDisabledFullApiSelfTest {
/** {@inheritDoc} */
- @Override protected boolean offHeapValues() {
- return true;
+ @Override protected CacheMemoryMode memoryMode() {
+ return OFFHEAP_VALUES;
}
}