You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/06 14:39:38 UTC
[16/50] [abbrv] ignite git commit: IGNITE-3227 - Added method to get
partition size
IGNITE-3227 - Added method to get partition size
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8af30781
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8af30781
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8af30781
Branch: refs/heads/ignite-1.5.31-1
Commit: 8af307819dba9f9fea4946cb09df01c4ef146f8a
Parents: 5b49dad
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Thu Jul 7 15:58:50 2016 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Thu Jul 7 15:58:50 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/ignite/IgniteCache.java | 25 +
.../processors/cache/GridCacheAdapter.java | 215 ++++++++-
.../processors/cache/GridCacheProxyImpl.java | 36 ++
.../processors/cache/IgniteCacheProxy.java | 40 ++
.../processors/cache/IgniteInternalCache.java | 23 +
.../cache/IgniteCacheAtomicPeekModesTest.java | 2 +-
.../cache/IgniteCachePeekModesAbstractTest.java | 463 ++++++++++++++++++-
.../multijvm/IgniteCacheProcessProxy.java | 46 +-
8 files changed, 846 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8af30781/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
index 3af2c44..8cefb4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
@@ -370,6 +370,20 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
public long sizeLong(CachePeekMode... peekModes) throws CacheException;
/**
+ * Gets the number of all entries cached in a partition as a long value. By default, if {@code peekModes} value
+ * isn't defined, only size of primary copies across all nodes will be returned. This behavior is identical to
+ * calling this method with {@link CachePeekMode#PRIMARY} peek mode.
+ * <p>
+ * NOTE: this operation is distributed and will query all participating nodes for their partition cache sizes.
+ *
+ * @param partition partition.
+ * @param peekModes Optional peek modes. If not provided, then total partition cache size is returned.
+ * @return Partion cache size across all nodes.
+ */
+ @IgniteAsyncSupported
+ public long sizeLong(int partition, CachePeekMode... peekModes) throws CacheException;
+
+ /**
* Gets the number of all entries cached on this node. By default, if {@code peekModes} value isn't defined,
* only size of primary copies will be returned. This behavior is identical to calling this method with
* {@link CachePeekMode#PRIMARY} peek mode.
@@ -390,6 +404,17 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
public long localSizeLong(CachePeekMode... peekModes);
/**
+ * Gets the number of all entries cached on this node for the partition as a long value. By default, if {@code peekModes} value isn't
+ * defined, only size of primary copies will be returned. This behavior is identical to calling this method with
+ * {@link CachePeekMode#PRIMARY} peek mode.
+ *
+ * @param partition partition.
+ * @param peekModes Optional peek modes. If not provided, then total cache size is returned.
+ * @return Cache size on this node.
+ */
+ public long localSizeLong(int partition, CachePeekMode... peekModes);
+
+ /**
* @param map Map containing keys and entry processors to be applied to values.
* @param args Additional arguments to pass to the {@link EntryProcessor}.
* @return The map of {@link EntryProcessorResult}s of the processing per key,
http://git-wip-us.apache.org/repos/asf/ignite/blob/8af30781/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 028f516..55bd81d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -84,6 +84,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityImpl;
import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -3965,6 +3966,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
+ @Override public long sizeLong(int partition, CachePeekMode[] peekModes) throws IgniteCheckedException {
+ if (isLocal())
+ return localSizeLong(partition, peekModes);
+
+ return sizeLongAsync(partition, peekModes).get();
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteInternalFuture<Integer> sizeAsync(final CachePeekMode[] peekModes) {
assert peekModes != null;
@@ -4007,6 +4016,36 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
+ @Override public IgniteInternalFuture<Long> sizeLongAsync(final int part, final CachePeekMode[] peekModes) {
+ assert peekModes != null;
+
+ final PeekModes modes = parsePeekModes(peekModes, true);
+
+ IgniteClusterEx cluster = ctx.grid().cluster();
+ final GridCacheAffinityManager aff = ctx.affinity();
+ final AffinityTopologyVersion topVer = aff.affinityTopologyVersion();
+
+ ClusterGroup grp = cluster.forDataNodes(name());
+
+ Collection<ClusterNode> nodes = grp.forPredicate(new IgnitePredicate<ClusterNode>() {
+ /** {@inheritDoc} */
+ @Override public boolean apply(ClusterNode clusterNode) {
+ return clusterNode.version().compareTo(PartitionSizeLongTask.SINCE_VER) >= 0 &&
+ ((modes.primary && aff.primary(clusterNode, part, topVer)) ||
+ (modes.backup && aff.backup(clusterNode, part, topVer)));
+ }
+ }).nodes();
+
+ if (nodes.isEmpty())
+ return new GridFinishedFuture<>(0L);
+
+ ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
+
+ return ctx.kernalContext().task().execute(
+ new PartitionSizeLongTask(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes, part), null);
+ }
+
+ /** {@inheritDoc} */
@Override public int localSize(CachePeekMode[] peekModes) throws IgniteCheckedException {
return (int)localSizeLong(peekModes);
}
@@ -4060,6 +4099,50 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
+ @Override public long localSizeLong(int part, CachePeekMode[] peekModes) throws IgniteCheckedException {
+ PeekModes modes = parsePeekModes(peekModes, true);
+
+ long size = 0;
+
+ AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
+
+ // Swap and offheap are disabled for near cache.
+ GridCacheSwapManager swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap();
+
+ if (ctx.isLocal()){
+ modes.primary = true;
+ modes.backup = true;
+
+ if (modes.heap)
+ size += size();
+
+ if (modes.swap)
+ size += swapMgr.swapEntriesCount(0);
+
+ if (modes.offheap)
+ size += swapMgr.offheapEntriesCount(0);
+ }
+ else {
+ GridDhtLocalPartition dhtPart = ctx.topology().localPartition(part, topVer, false);
+
+ if (dhtPart != null) {
+ if (modes.primary && dhtPart.primary(topVer) || modes.backup && dhtPart.backup(topVer)) {
+ if (modes.heap)
+ size += dhtPart.publicSize();
+
+ if (modes.swap)
+ size += swapMgr.swapEntriesCount(part);
+
+ if (modes.offheap)
+ size += swapMgr.offheapEntriesCount(part);
+ }
+ }
+ }
+
+ return size;
+ }
+
+ /** {@inheritDoc} */
@Override public int size() {
return map.publicSize();
}
@@ -5637,6 +5720,52 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/**
+ * Internal callable for partition size calculation.
+ */
+ @GridInternal
+ private static class PartitionSizeLongJob extends TopologyVersionAwareJob {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Partition. */
+ private final int partition;
+
+ /** Peek modes. */
+ private final CachePeekMode[] peekModes;
+
+ /**
+ * @param cacheName Cache name.
+ * @param topVer Affinity topology version.
+ * @param peekModes Cache peek modes.
+ * @param partition partition.
+ */
+ private PartitionSizeLongJob(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes, int partition) {
+ super(cacheName, topVer);
+
+ this.peekModes = peekModes;
+ this.partition = partition;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) {
+ if (cache == null)
+ return 0;
+
+ try {
+ return cache.localSizeLong(partition, peekModes);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ public String toString() {
+ return S.toString(PartitionSizeLongJob.class, this);
+ }
+ }
+
+ /**
* Internal callable for global size calculation.
*/
@GridInternal
@@ -6610,7 +6739,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param topVer Affinity topology version.
* @param peekModes Cache peek modes.
*/
- public SizeLongTask(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes) {
+ private SizeLongTask(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes) {
this.cacheName = cacheName;
this.topVer = topVer;
this.peekModes = peekModes;
@@ -6655,6 +6784,90 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/**
+ * Partition Size Long task.
+ */
+ private static class PartitionSizeLongTask extends ComputeTaskAdapter<Object, Long> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private static final IgniteProductVersion SINCE_VER = IgniteProductVersion.fromString("1.5.30");
+
+ /** Partition */
+ private final int partition;
+
+ /** Cache name. */
+ private final String cacheName;
+
+ /** Affinity topology version. */
+ private final AffinityTopologyVersion topVer;
+
+ /** Peek modes. */
+ private final CachePeekMode[] peekModes;
+
+ /**
+ * @param cacheName Cache name.
+ * @param topVer Affinity topology version.
+ * @param peekModes Cache peek modes.
+ * @param partition partition.
+ */
+ private PartitionSizeLongTask(
+ String cacheName,
+ AffinityTopologyVersion topVer,
+ CachePeekMode[] peekModes,
+ int partition
+ ) {
+ this.cacheName = cacheName;
+ this.topVer = topVer;
+ this.peekModes = peekModes;
+ this.partition = partition;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(
+ List<ClusterNode> subgrid,
+ @Nullable Object arg
+ ) throws IgniteException {
+ Map<ComputeJob, ClusterNode> jobs = new HashMap();
+
+ for (ClusterNode node : subgrid)
+ jobs.put(new PartitionSizeLongJob(cacheName, topVer, peekModes, partition), node);
+
+ return jobs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+ IgniteException e = res.getException();
+
+ if (e != null) {
+ if (e instanceof ClusterTopologyException)
+ return ComputeJobResultPolicy.WAIT;
+
+ throw new IgniteException("Remote job threw exception.", e);
+ }
+
+ return ComputeJobResultPolicy.WAIT;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Long reduce(List<ComputeJobResult> results) throws IgniteException {
+ long size = 0;
+
+ for (ComputeJobResult res : results) {
+ if (res != null) {
+ if (res.getException() == null)
+ size += res.<Long>getData();
+ else
+ throw res.getException();
+ }
+ }
+
+ return size;
+ }
+ }
+
+ /**
* Clear task.
*/
private static class ClearTask<K> extends ComputeTaskAdapter<Object, Object> {
http://git-wip-us.apache.org/repos/asf/ignite/blob/8af30781/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index 99dd608..b46c4dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -1483,6 +1483,18 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
}
/** {@inheritDoc} */
+ @Override public long sizeLong(int partition, CachePeekMode[] peekModes) throws IgniteCheckedException {
+ CacheOperationContext prev = gate.enter(opCtx);
+
+ try {
+ return delegate.sizeLong(partition, peekModes);
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteInternalFuture<Integer> sizeAsync(CachePeekMode[] peekModes) {
CacheOperationContext prev = gate.enter(opCtx);
@@ -1507,6 +1519,18 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
}
/** {@inheritDoc} */
+ @Override public IgniteInternalFuture<Long> sizeLongAsync(int partition, CachePeekMode[] peekModes) {
+ CacheOperationContext prev = gate.enter(opCtx);
+
+ try {
+ return delegate.sizeLongAsync(partition, peekModes);
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public int localSize(CachePeekMode[] peekModes) throws IgniteCheckedException {
CacheOperationContext prev = gate.enter(opCtx);
@@ -1531,6 +1555,18 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
}
/** {@inheritDoc} */
+ @Override public long localSizeLong(int partition, CachePeekMode[] peekModes) throws IgniteCheckedException {
+ CacheOperationContext prev = gate.enter(opCtx);
+
+ try {
+ return delegate.localSizeLong(partition, peekModes);
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public int nearSize() {
CacheOperationContext prev = gate.enter(opCtx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/8af30781/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 12ec8b8..92e59db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -840,6 +840,29 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
/** {@inheritDoc} */
+ @Override public long sizeLong(int part, CachePeekMode... peekModes) throws CacheException {
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
+
+ try {
+ if (isAsync()) {
+ setFuture(delegate.sizeLongAsync(part, peekModes));
+
+ return 0;
+ }
+ else
+ return delegate.sizeLong(part, peekModes);
+ }
+ catch (IgniteCheckedException e) {
+ throw cacheException(e);
+ }
+ finally {
+ onLeave(gate, prev);
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public int localSize(CachePeekMode... peekModes) {
GridCacheGateway<K, V> gate = this.gate;
@@ -874,6 +897,23 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
/** {@inheritDoc} */
+ @Override public long localSizeLong(int part, CachePeekMode... peekModes) {
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
+
+ try {
+ return delegate.localSizeLong(part, peekModes);
+ }
+ catch (IgniteCheckedException e) {
+ throw cacheException(e);
+ }
+ finally {
+ onLeave(gate, prev);
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public V get(K key) {
try {
GridCacheGateway<K, V> gate = this.gate;
http://git-wip-us.apache.org/repos/asf/ignite/blob/8af30781/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
index d155b0e..4dc9a23f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
@@ -1453,6 +1453,14 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
public long localSizeLong(CachePeekMode[] peekModes) throws IgniteCheckedException;
/**
+ * @param partition partition.
+ * @param peekModes Peek modes.
+ * @return Local cache size as a long value.
+ * @throws IgniteCheckedException If failed.
+ */
+ public long localSizeLong(int partition, CachePeekMode[] peekModes) throws IgniteCheckedException;
+
+ /**
* @param peekModes Peek modes.
* @return Global cache size.
* @throws IgniteCheckedException If failed.
@@ -1467,6 +1475,14 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
public long sizeLong(CachePeekMode[] peekModes) throws IgniteCheckedException;
/**
+ * @param partition partition
+ * @param peekModes Peek modes.
+ * @return Global cache size as a long value.
+ * @throws IgniteCheckedException If failed.
+ */
+ public long sizeLong(int partition, CachePeekMode[] peekModes) throws IgniteCheckedException;
+
+ /**
* @param peekModes Peek modes.
* @return Future.
*/
@@ -1479,6 +1495,13 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
public IgniteInternalFuture<Long> sizeLongAsync(CachePeekMode[] peekModes);
/**
+ * @param partition partiton
+ * @param peekModes Peek modes.
+ * @return Future.
+ */
+ public IgniteInternalFuture<Long> sizeLongAsync(int partition, CachePeekMode[] peekModes);
+
+ /**
* Gets size of near cache key set. This method will return count of all entries in near
* cache and has O(1) complexity on base cache projection.
* <p>
http://git-wip-us.apache.org/repos/asf/ignite/blob/8af30781/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPeekModesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPeekModesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPeekModesTest.java
index 4270bab..8b7859a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPeekModesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPeekModesTest.java
@@ -48,4 +48,4 @@ public class IgniteCacheAtomicPeekModesTest extends IgniteCachePeekModesAbstract
@Override protected CacheAtomicWriteOrderMode atomicWriteOrderMode() {
return PRIMARY;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8af30781/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java
index c27cccb..5dc059b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java
@@ -28,6 +28,7 @@ import java.util.Set;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheMemoryMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.affinity.Affinity;
@@ -38,6 +39,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.spi.IgniteSpiCloseableIterator;
import org.apache.ignite.spi.swapspace.SwapSpaceSpi;
@@ -501,6 +503,144 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
}
/**
+ * @throws Exception If failed.
+ */
+ public void testLocalPartitionSize() throws Exception {
+ if (cacheMode() != LOCAL)
+ return;
+
+ awaitPartitionMapExchange();
+ checkEmpty();
+ int part = 0;
+ IgniteCache<Integer, String> cache0 = jcache(0);
+
+ IgniteCache<Integer, String> cacheAsync0 = cache0.withAsync();
+
+ for (int i = 0; i < HEAP_ENTRIES; i++) {
+ cache0.put(i, String.valueOf(i));
+
+ final long size = i + 1;
+
+ assertEquals(size, cache0.localSize());
+ assertEquals(size, cache0.localSizeLong(part, PRIMARY));
+ assertEquals(size, cache0.localSizeLong(part, BACKUP));
+ assertEquals(size, cache0.localSizeLong(part, NEAR));
+ assertEquals(size, cache0.localSizeLong(part, ALL));
+
+ assertEquals(size, cache0.size());
+ assertEquals(size, cache0.sizeLong(part, PRIMARY));
+ assertEquals(size, cache0.sizeLong(part, BACKUP));
+ assertEquals(size, cache0.sizeLong(part, NEAR));
+ assertEquals(size, cache0.sizeLong(part, ALL));
+
+ cacheAsync0.size();
+
+ assertEquals(size, (long) cacheAsync0.<Integer>future().get());
+
+ cacheAsync0.sizeLong(part, PRIMARY);
+
+ assertEquals(size, cacheAsync0.future().get());
+ }
+
+ for (int i = 0; i < HEAP_ENTRIES; i++) {
+ cache0.remove(i, String.valueOf(i));
+
+ final int size = HEAP_ENTRIES - i - 1;
+
+ assertEquals(size, cache0.localSize());
+ assertEquals(size, cache0.localSizeLong(part, PRIMARY));
+ assertEquals(size, cache0.localSizeLong(part, BACKUP));
+ assertEquals(size, cache0.localSizeLong(part, NEAR));
+ assertEquals(size, cache0.localSizeLong(part, ALL));
+
+ assertEquals(size, cache0.size());
+ assertEquals(size, cache0.sizeLong(part, PRIMARY));
+ assertEquals(size, cache0.sizeLong(part, BACKUP));
+ assertEquals(size, cache0.sizeLong(part, NEAR));
+ assertEquals(size, cache0.sizeLong(part, ALL));
+
+ cacheAsync0.size();
+
+ assertEquals(size, (long) cacheAsync0.<Integer>future().get());
+ }
+ }
+
+ /**
+ * @throws InterruptedException If failed.
+ */
+ public void testLocalPartitionSizeFlags() throws InterruptedException {
+ if (cacheMode() != LOCAL)
+ return;
+
+ awaitPartitionMapExchange();
+ checkEmpty();
+ int part = 0;
+ IgniteCache<Integer, String> cache0 = jcache(0);
+
+ Set<Integer> keys = new HashSet<>();
+
+ for (int i = 0; i < 200; i++) {
+ cache0.put(i, "test_val");
+
+ keys.add(i);
+ }
+
+ try {
+ int totalKeys = 200;
+
+ T2<Integer, Integer> swapKeys = swapKeysCount(0);
+
+ T2<Integer, Integer> offheapKeys = offheapKeysCount(0);
+
+ int totalSwap = swapKeys.get1() + swapKeys.get2();
+ int totalOffheap = offheapKeys.get1() + offheapKeys.get2();
+
+ log.info("Keys [total=" + totalKeys + ", offheap=" + offheapKeys + ", swap=" + swapKeys + ']');
+
+ assertTrue(totalSwap + totalOffheap < totalKeys);
+
+ assertEquals(totalKeys, cache0.localSize());
+ assertEquals(totalKeys, cache0.localSizeLong(part, ALL));
+
+ assertEquals(totalOffheap, cache0.localSizeLong(part, OFFHEAP));
+ assertEquals(totalSwap, cache0.localSizeLong(part, SWAP));
+ assertEquals(totalKeys - (totalSwap + totalOffheap), cache0.localSizeLong(part, ONHEAP));
+
+ assertEquals(totalOffheap, cache0.sizeLong(part, OFFHEAP));
+ assertEquals(totalSwap, cache0.sizeLong(part, SWAP));
+ assertEquals(totalKeys - (totalSwap + totalOffheap), cache0.sizeLong(part, ONHEAP));
+
+ assertEquals(totalOffheap, cache0.localSizeLong(part, OFFHEAP, PRIMARY));
+ assertEquals(totalSwap, cache0.localSizeLong(part, SWAP, PRIMARY));
+ assertEquals(totalKeys - (totalSwap + totalOffheap), cache0.localSizeLong(part, ONHEAP, PRIMARY));
+
+ assertEquals(totalOffheap, cache0.localSizeLong(part, OFFHEAP, BACKUP));
+ assertEquals(totalSwap, cache0.localSizeLong(part, SWAP, BACKUP));
+ assertEquals(totalKeys - (totalSwap + totalOffheap), cache0.localSizeLong(part, ONHEAP, BACKUP));
+ }
+ finally {
+ cache0.removeAll(keys);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNonLocalPartitionSize() throws Exception {
+ if (cacheMode() == LOCAL)
+ return;
+
+ awaitPartitionMapExchange(true, true);
+
+ checkEmpty();
+
+ for (int i = 0; i < gridCount(); i++) {
+ checkPartitionSizeAffinityFilter(i);
+ checkPartitionSizeStorageFilter(i);
+ }
+ }
+
+ /**
* @param nodeIdx Node index.
* @throws Exception If failed.
*/
@@ -627,6 +767,164 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
}
/**
+ * @param nodeIdx Node index.
+ * @throws Exception If failed.
+ */
+ private void checkPartitionSizeAffinityFilter(int nodeIdx) throws Exception {
+ IgniteCache<Integer, String> cache0 = jcache(nodeIdx);
+
+ final int PUT_KEYS = 10;
+
+ int part = nodeIdx;
+
+ List<Integer> keys = null;
+
+ try {
+ if (cacheMode() == REPLICATED) {
+ keys = backupKeys(cache0, 10, 0);
+
+ for (Integer key : keys)
+ cache0.put(key, String.valueOf(key));
+
+ int partSize = 0;
+
+ for (Integer key : keys){
+ int keyPart = ignite(nodeIdx).affinity(null).partition(key);
+ if (keyPart == part)
+ partSize++;
+ }
+
+ assertEquals(PUT_KEYS, cache0.localSize(BACKUP));
+ assertEquals(PUT_KEYS, cache0.localSize(ALL));
+ assertEquals(partSize, cache0.localSizeLong(part, BACKUP));
+ assertEquals(partSize, cache0.localSizeLong(part, ALL));
+ assertEquals(0, cache0.localSizeLong(part, PRIMARY));
+ assertEquals(0, cache0.localSizeLong(part, NEAR));
+
+ for (int i = 0; i < gridCount(); i++) {
+ IgniteCache<Integer, String> cache = jcache(i);
+ assertEquals(0, cache.size(NEAR));
+ assertEquals(partSize, cache.sizeLong(part, PRIMARY));
+ assertEquals(partSize * (gridCount() - 1), cache.sizeLong(part, BACKUP));
+ assertEquals(partSize * gridCount(), cache.sizeLong(part, PRIMARY, BACKUP));
+ assertEquals(partSize * gridCount(), cache.sizeLong(part, ALL)); // Primary + backups.
+ }
+ }
+ else {
+ keys = nearKeys(cache0, PUT_KEYS, 0);
+
+ for (Integer key : keys)
+ cache0.put(key, String.valueOf(key));
+
+ int partSize = 0;
+
+ for (Integer key :keys){
+ int keyPart = ignite(nodeIdx).affinity(null).partition(key);
+ if(keyPart == part)
+ partSize++;
+ }
+
+ if (hasNearCache()) {
+ assertEquals(0, cache0.localSize());
+ assertEquals(0, cache0.localSizeLong(part, ALL));
+ assertEquals(0, cache0.localSizeLong(part, NEAR));
+
+ for (int i = 0; i < gridCount(); i++) {
+ IgniteCache<Integer, String> cache = jcache(i);
+
+ assertEquals(0, cache.sizeLong(part, NEAR));
+ assertEquals(partSize, cache.sizeLong(part, BACKUP));
+ assertEquals(partSize * 2, cache.sizeLong(part, PRIMARY, BACKUP));
+ assertEquals(partSize * 2, cache.sizeLong(part, ALL)); // Primary + backups + near.
+ }
+ }
+ else {
+ assertEquals(0, cache0.localSize());
+ //assertEquals(partitionSize, cache0.localSizeLong(partition, ALL));
+ assertEquals(0, cache0.localSizeLong(part, NEAR));
+
+ for (int i = 0; i < gridCount(); i++) {
+ IgniteCache<Integer, String> cache = jcache(i);
+
+ assertEquals(0, cache.size(NEAR));
+ assertEquals(partSize, cache.sizeLong(part, BACKUP));
+ assertEquals(partSize * 2, cache.sizeLong(part, PRIMARY, BACKUP));
+ assertEquals(partSize * 2, cache.sizeLong(part, ALL)); // Primary + backups.
+ }
+ }
+
+ assertEquals(0, cache0.localSize(BACKUP));
+ assertEquals(0, cache0.localSize(PRIMARY));
+ }
+
+ checkPrimarySize(PUT_KEYS);
+
+ Affinity<Integer> aff = ignite(0).affinity(null);
+
+ for (int i = 0; i < gridCount(); i++) {
+ if (i == nodeIdx)
+ continue;
+
+ ClusterNode node = ignite(i).cluster().localNode();
+
+ int primary = 0;
+ int backups = 0;
+
+ for (Integer key : keys) {
+ if (aff.isPrimary(node, key) && aff.partition(key) == part)
+ primary++;
+ else if (aff.isBackup(node, key) && aff.partition(key) == part)
+ backups++;
+ }
+
+ IgniteCache<Integer, String> cache = jcache(i);
+
+ assertEquals(primary, cache.localSizeLong(part, PRIMARY));
+ assertEquals(backups, cache.localSizeLong(part, BACKUP));
+ assertEquals(primary + backups, cache.localSizeLong(part, PRIMARY, BACKUP));
+ assertEquals(primary + backups, cache.localSizeLong(part, BACKUP, PRIMARY));
+ assertEquals(primary + backups, cache.localSizeLong(part, ALL));
+ }
+
+ cache0.remove(keys.get(0));
+
+ keys.remove(0);
+
+ checkPrimarySize(PUT_KEYS - 1);
+
+ int primary = 0;
+ int backups = 0;
+
+ ClusterNode node = ignite(nodeIdx).cluster().localNode();
+
+ for (Integer key : keys) {
+ if (aff.isPrimary(node, key) && aff.partition(key) == part)
+ primary++;
+ else if (aff.isBackup(node, key) && aff.partition(key) == part)
+ backups++;
+ }
+
+ if (cacheMode() == REPLICATED) {
+ assertEquals(primary+backups, cache0.localSizeLong(part, ALL));
+ assertEquals(primary, cache0.localSizeLong(part, PRIMARY));
+ assertEquals(backups, cache0.localSizeLong(part, BACKUP));
+ }
+ else {
+ if (hasNearCache())
+ assertEquals(0, cache0.localSizeLong(part, ALL));
+ else
+ assertEquals(0, cache0.localSizeLong(part, ALL));
+ }
+ }
+ finally {
+ if (keys != null)
+ cache0.removeAll(new HashSet<>(keys));
+ }
+
+ checkEmpty();
+ }
+
+ /**
* Checks size is zero.
*/
private void checkEmpty() {
@@ -695,6 +993,31 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
/**
* @param nodeIdx Node index.
+ * @param part Cache partition
+ * @return Tuple with number of primary and backup keys (one or both will be zero).
+ */
+ private T2<Integer, Integer> swapKeysCount(int nodeIdx, int part) throws IgniteCheckedException {
+ GridCacheContext ctx = ((IgniteEx)ignite(nodeIdx)).context().cache().internalCache().context();
+ // Swap and offheap are disabled for near cache.
+ GridCacheSwapManager swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap();
+ //First count entries...
+ int cnt = (int)swapMgr.swapEntriesCount(part);
+
+ GridCacheAffinityManager affinity = ctx.affinity();
+ AffinityTopologyVersion topVer = affinity.affinityTopologyVersion();
+
+ //And then find out whether they are primary or backup ones.
+ int primaryCnt = 0;
+ int backupCnt = 0;
+ if (affinity.primary(ctx.localNode(), part, topVer))
+ primaryCnt = cnt;
+ else if (affinity.backup(ctx.localNode(), part, topVer))
+ backupCnt = cnt;
+ return new T2<>(primaryCnt, backupCnt);
+ }
+
+ /**
+ * @param nodeIdx Node index.
* @return Tuple with primary and backup keys.
*/
private T2<List<Integer>, List<Integer>> offheapKeys(int nodeIdx) {
@@ -742,6 +1065,31 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
/**
* @param nodeIdx Node index.
+ * @param part Cache partition.
+ * @return Tuple with number of primary and backup keys (one or both will be zero).
+ */
+ private T2<Integer, Integer> offheapKeysCount(int nodeIdx, int part) throws IgniteCheckedException {
+ GridCacheContext ctx = ((IgniteEx)ignite(nodeIdx)).context().cache().internalCache().context();
+ // Swap and offheap are disabled for near cache.
+ GridCacheSwapManager swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap();
+ //First count entries...
+ int cnt = (int)swapMgr.offheapEntriesCount(part);
+
+ GridCacheAffinityManager affinity = ctx.affinity();
+ AffinityTopologyVersion topVer = affinity.affinityTopologyVersion();
+
+ //And then find out whether they are primary or backup ones.
+ int primaryCnt = 0;
+ int backupCnt = 0;
+ if (affinity.primary(ctx.localNode(), part, topVer))
+ primaryCnt = cnt;
+ else if (affinity.backup(ctx.localNode(), part, topVer))
+ backupCnt = cnt;
+ return new T2<>(primaryCnt, backupCnt);
+ }
+
+ /**
+ * @param nodeIdx Node index.
* @throws Exception If failed.
*/
private void checkSizeStorageFilter(int nodeIdx) throws Exception {
@@ -862,6 +1210,119 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
}
/**
+ * @param nodeIdx Node index.
+ * @throws Exception If failed.
+ */
+ private void checkPartitionSizeStorageFilter(int nodeIdx) throws Exception {
+ IgniteCache<Integer, String> cache0 = jcache(nodeIdx);
+
+ int part = nodeIdx;
+
+ List<Integer> primaryKeys = primaryKeys(cache0, 100, 10_000);
+ List<Integer> backupKeys = backupKeys(cache0, 100, 10_000);
+
+ try {
+ final String val = "test_value";
+
+ for (int i = 0; i < 100; i++) {
+ cache0.put(primaryKeys.get(i), val);
+ cache0.put(backupKeys.get(i), val);
+ }
+
+
+ int totalKeys = 200;
+
+ T2<Integer, Integer> swapKeys = swapKeysCount(nodeIdx, part);
+
+ T2<Integer, Integer> offheapKeys = offheapKeysCount(nodeIdx, part);
+
+ int totalSwap = swapKeys.get1() + swapKeys.get2();
+ int totalOffheap = offheapKeys.get1() + offheapKeys.get2();
+
+ log.info("Local keys [total=" + totalKeys + ", offheap=" + offheapKeys + ", swap=" + swapKeys + ']');
+
+ assertTrue(totalSwap + totalOffheap < totalKeys);
+
+ assertEquals(primaryKeys.size(), cache0.localSize());
+ assertEquals(totalKeys, cache0.localSize(ALL));
+ assertEquals(totalOffheap, cache0.localSizeLong(part, PRIMARY, BACKUP, NEAR, OFFHEAP));
+ assertEquals(totalSwap, cache0.localSizeLong(part, PRIMARY, BACKUP, NEAR, SWAP));
+ assertEquals((long)swapKeys.get1(), cache0.localSizeLong(part, SWAP, PRIMARY));
+ assertEquals((long)swapKeys.get2(), cache0.localSizeLong(part, SWAP, BACKUP));
+
+ assertEquals((long)offheapKeys.get1(), cache0.localSizeLong(part, OFFHEAP, PRIMARY));
+ assertEquals((long)offheapKeys.get2(), cache0.localSizeLong(part, OFFHEAP, BACKUP));
+
+ assertEquals(swapKeys.get1() + offheapKeys.get1(), cache0.localSizeLong(part, SWAP, OFFHEAP, PRIMARY));
+ assertEquals(swapKeys.get2() + offheapKeys.get2(), cache0.localSizeLong(part, SWAP, OFFHEAP, BACKUP));
+
+ assertEquals(totalSwap + totalOffheap, cache0.localSizeLong(part, PRIMARY, BACKUP, NEAR, SWAP, OFFHEAP));
+
+ int globalParitionSwapPrimary = 0;
+ int globalPartSwapBackup = 0;
+
+ int globalPartOffheapPrimary = 0;
+ int globalPartOffheapBackup = 0;
+
+ for (int i = 0; i < gridCount(); i++) {
+ T2<Integer, Integer> swap = swapKeysCount(i, part);
+
+ globalParitionSwapPrimary += swap.get1();
+ globalPartSwapBackup += swap.get2();
+
+ T2<Integer, Integer> offheap = offheapKeysCount(i, part);
+
+ globalPartOffheapPrimary += offheap.get1();
+ globalPartOffheapBackup += offheap.get2();
+ }
+
+ int backups;
+
+ if (cacheMode() == LOCAL)
+ backups = 0;
+ else if (cacheMode() == PARTITIONED)
+ backups = 1;
+ else // REPLICATED.
+ backups = gridCount() - 1;
+
+ int globalTotal = totalKeys + totalKeys * backups;
+ int globalPartTotalSwap = globalParitionSwapPrimary + globalPartSwapBackup;
+ int globalPartTotalOffheap = globalPartOffheapPrimary + globalPartOffheapBackup;
+
+ log.info("Global keys [total=" + globalTotal +
+ ", offheap=" + globalPartTotalOffheap +
+ ", swap=" + globalPartTotalSwap + ']');
+
+ for (int i = 0; i < gridCount(); i++) {
+ IgniteCache<Integer, String> cache = jcache(i);
+
+ assertEquals(totalKeys, cache.size(PRIMARY));
+ assertEquals(globalTotal, cache.size(ALL));
+ assertEquals(globalTotal, cache.size(PRIMARY, BACKUP, NEAR, ONHEAP, OFFHEAP, SWAP));
+ assertEquals(globalTotal, cache.size(ONHEAP, OFFHEAP, SWAP, PRIMARY, BACKUP));
+
+ assertEquals(globalPartTotalSwap, cache.sizeLong(part, PRIMARY, BACKUP, NEAR, SWAP));
+ assertEquals(globalParitionSwapPrimary, cache.sizeLong(part, SWAP, PRIMARY));
+ assertEquals(globalPartSwapBackup, cache.sizeLong(part, SWAP, BACKUP));
+
+ assertEquals(globalPartTotalOffheap, cache.sizeLong(part, PRIMARY, BACKUP, NEAR, OFFHEAP));
+ assertEquals(globalPartOffheapPrimary, cache.sizeLong(part, OFFHEAP, PRIMARY));
+ assertEquals(globalPartOffheapBackup, cache.sizeLong(part, OFFHEAP, BACKUP));
+
+ assertEquals(globalPartTotalSwap + globalPartTotalOffheap, cache.sizeLong(part, PRIMARY, BACKUP, NEAR, SWAP, OFFHEAP));
+ assertEquals(globalParitionSwapPrimary + globalPartOffheapPrimary, cache.sizeLong(part, SWAP, OFFHEAP, PRIMARY));
+ assertEquals(globalPartSwapBackup + globalPartOffheapBackup, cache.sizeLong(part, SWAP, OFFHEAP, BACKUP));
+ }
+ }
+ finally {
+ cache0.removeAll(new HashSet<>(primaryKeys));
+ cache0.removeAll(new HashSet<>(backupKeys));
+ }
+
+ checkEmpty();
+ }
+
+ /**
* @param exp Expected size.
*/
private void checkPrimarySize(int exp) {
@@ -1167,4 +1628,4 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
assertTrue("Expected entries not found: " + allExp, allExp.isEmpty());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8af30781/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
index f2f69dd..36a56f5 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
@@ -207,6 +207,11 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
}
/** {@inheritDoc} */
+ @Override public long sizeLong(int partition, CachePeekMode... peekModes) throws CacheException {
+ return compute.call(new PartitionSizeLongTask(cacheName, isAsync, peekModes, partition, false));
+ }
+
+ /** {@inheritDoc} */
@Override public int localSize(CachePeekMode... peekModes) {
return compute.call(new SizeTask(cacheName, isAsync, peekModes, true));
}
@@ -217,6 +222,11 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
}
/** {@inheritDoc} */
+ @Override public long localSizeLong(int partition, CachePeekMode... peekModes) {
+ return compute.call(new PartitionSizeLongTask(cacheName, isAsync, peekModes, partition, true));
+ }
+
+ /** {@inheritDoc} */
@Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(
Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
Object... args)
@@ -703,6 +713,40 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
/**
*
*/
+ private static class PartitionSizeLongTask extends CacheTaskAdapter<Void, Void, Long> {
+
+ /** Partition. */
+ int partition;
+
+ /** Peek modes. */
+ private final CachePeekMode[] peekModes;
+
+ /** Local. */
+ private final boolean loc;
+
+ /**
+ * @param cacheName Cache name.
+ * @param async Async.
+ * @param peekModes Peek modes.
+ * @param partition partition.
+ * @param loc Local.
+ */
+ public PartitionSizeLongTask(String cacheName, boolean async, CachePeekMode[] peekModes, int partition, boolean loc) {
+ super(cacheName, async, null);
+ this.loc = loc;
+ this.peekModes = peekModes;
+ this.partition = partition;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Long call() throws Exception {
+ return loc ? cache().localSizeLong(partition, peekModes) : cache().sizeLong(partition, peekModes);
+ }
+ }
+
+ /**
+ *
+ */
private static class GetTask<K, V> extends CacheTaskAdapter<K, V, V> {
/** Key. */
private final K key;
@@ -1499,4 +1543,4 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
return async ? cache.withAsync() : cache;
}
}
-}
\ No newline at end of file
+}