You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/06 14:39:38 UTC

[16/50] [abbrv] ignite git commit: IGNITE-3227 - Added method to get partition size

IGNITE-3227 - Added method to get partition size


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8af30781
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8af30781
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8af30781

Branch: refs/heads/ignite-1.5.31-1
Commit: 8af307819dba9f9fea4946cb09df01c4ef146f8a
Parents: 5b49dad
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Thu Jul 7 15:58:50 2016 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Thu Jul 7 15:58:50 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/ignite/IgniteCache.java     |  25 +
 .../processors/cache/GridCacheAdapter.java      | 215 ++++++++-
 .../processors/cache/GridCacheProxyImpl.java    |  36 ++
 .../processors/cache/IgniteCacheProxy.java      |  40 ++
 .../processors/cache/IgniteInternalCache.java   |  23 +
 .../cache/IgniteCacheAtomicPeekModesTest.java   |   2 +-
 .../cache/IgniteCachePeekModesAbstractTest.java | 463 ++++++++++++++++++-
 .../multijvm/IgniteCacheProcessProxy.java       |  46 +-
 8 files changed, 846 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8af30781/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
index 3af2c44..8cefb4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
@@ -370,6 +370,20 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
     public long sizeLong(CachePeekMode... peekModes) throws CacheException;
 
     /**
+     * Gets the number of all entries cached in a partition as a long value. By default, if {@code peekModes} value
+     * isn't defined, only size of primary copies across all nodes will be returned. This behavior is identical to
+     * calling this method with {@link CachePeekMode#PRIMARY} peek mode.
+     * <p>
+     * NOTE: this operation is distributed and will query all participating nodes for their partition cache sizes.
+     *
+     * @param partition partition.
+     * @param peekModes Optional peek modes. If not provided, then total partition cache size is returned.
+     * @return Partition cache size across all nodes.
+     */
+    @IgniteAsyncSupported
+    public long sizeLong(int partition, CachePeekMode... peekModes) throws CacheException;
+
+    /**
      * Gets the number of all entries cached on this node. By default, if {@code peekModes} value isn't defined,
      * only size of primary copies will be returned. This behavior is identical to calling this method with
      * {@link CachePeekMode#PRIMARY} peek mode.
@@ -390,6 +404,17 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
     public long localSizeLong(CachePeekMode... peekModes);
 
     /**
+     * Gets the number of all entries cached on this node for the partition as a long value. By default, if {@code peekModes} value isn't
+     * defined, only size of primary copies will be returned. This behavior is identical to calling this method with
+     * {@link CachePeekMode#PRIMARY} peek mode.
+     *
+     * @param partition partition.
+     * @param peekModes Optional peek modes. If not provided, then total cache size is returned.
+     * @return Partition cache size on this node.
+     */
+    public long localSizeLong(int partition, CachePeekMode... peekModes);
+
+    /**
      * @param map Map containing keys and entry processors to be applied to values.
      * @param args Additional arguments to pass to the {@link EntryProcessor}.
      * @return The map of {@link EntryProcessorResult}s of the processing per key,

http://git-wip-us.apache.org/repos/asf/ignite/blob/8af30781/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 028f516..55bd81d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -84,6 +84,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityImpl;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -3965,6 +3966,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     }
 
     /** {@inheritDoc} */
+    @Override public long sizeLong(int partition, CachePeekMode[] peekModes) throws IgniteCheckedException {
+        if (isLocal())
+            return localSizeLong(partition, peekModes);
+
+        return sizeLongAsync(partition, peekModes).get();
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Integer> sizeAsync(final CachePeekMode[] peekModes) {
         assert peekModes != null;
 
@@ -4007,6 +4016,36 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<Long> sizeLongAsync(final int part, final CachePeekMode[] peekModes) {
+        assert peekModes != null;
+
+        final PeekModes modes = parsePeekModes(peekModes, true);
+
+        IgniteClusterEx cluster = ctx.grid().cluster();
+        final GridCacheAffinityManager aff = ctx.affinity();
+        final AffinityTopologyVersion topVer = aff.affinityTopologyVersion(); // Single snapshot, reused below.
+
+        ClusterGroup grp = cluster.forDataNodes(name());
+
+        Collection<ClusterNode> nodes = grp.forPredicate(new IgnitePredicate<ClusterNode>() {
+            /** Accepts nodes at or above SINCE_VER that are primary/backup for {@code part} per requested modes. */
+            @Override public boolean apply(ClusterNode clusterNode) {
+                return clusterNode.version().compareTo(PartitionSizeLongTask.SINCE_VER) >= 0 &&
+                    ((modes.primary && aff.primary(clusterNode, part, topVer)) ||
+                        (modes.backup && aff.backup(clusterNode, part, topVer)));
+            }
+        }).nodes();
+
+        if (nodes.isEmpty())
+            return new GridFinishedFuture<>(0L); // No matching node: the size is 0.
+
+        ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
+
+        // Pass the same topVer that was used for node selection so the jobs and the node filter agree on one version.
+        return ctx.kernalContext().task().execute(new PartitionSizeLongTask(ctx.name(), topVer, peekModes, part), null);
+    }
+
+    /** {@inheritDoc} */
     @Override public int localSize(CachePeekMode[] peekModes) throws IgniteCheckedException {
         return (int)localSizeLong(peekModes);
     }
@@ -4060,6 +4099,50 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     }
 
     /** {@inheritDoc} */
+    @Override public long localSizeLong(int part, CachePeekMode[] peekModes) throws IgniteCheckedException {
+        PeekModes modes = parsePeekModes(peekModes, true);
+
+        long size = 0;
+
+        AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
+
+        // Swap and offheap are disabled for near cache, so the underlying DHT cache's swap manager is used instead.
+        GridCacheSwapManager swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap();
+
+        if (ctx.isLocal()){ // LOCAL cache: 'part' is not consulted, whole-cache counts are returned.
+            modes.primary = true; // NOTE(review): primary/backup flags are not read again in this branch.
+            modes.backup = true;
+
+            if (modes.heap)
+                size += size();
+
+            if (modes.swap)
+                size += swapMgr.swapEntriesCount(0);
+
+            if (modes.offheap)
+                size += swapMgr.offheapEntriesCount(0);
+        }
+        else {
+            GridDhtLocalPartition dhtPart = ctx.topology().localPartition(part, topVer, false);
+
+            if (dhtPart != null) { // A null partition contributes nothing, leaving size at 0.
+                if (modes.primary && dhtPart.primary(topVer) || modes.backup && dhtPart.backup(topVer)) { // Role must match.
+                    if (modes.heap)
+                        size += dhtPart.publicSize();
+
+                    if (modes.swap)
+                        size += swapMgr.swapEntriesCount(part);
+
+                    if (modes.offheap)
+                        size += swapMgr.offheapEntriesCount(part);
+                }
+            }
+        }
+
+        return size;
+    }
+
+    /** {@inheritDoc} */
     @Override public int size() {
         return map.publicSize();
     }
@@ -5637,6 +5720,52 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     }
 
     /**
+     * Internal callable for partition size calculation.
+     */
+    @GridInternal
+    private static class PartitionSizeLongJob extends TopologyVersionAwareJob {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** Partition whose local size is computed. */
+        private final int partition;
+
+        /** Peek modes forwarded to {@code localSizeLong}. */
+        private final CachePeekMode[] peekModes;
+
+        /**
+         * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
+         * @param peekModes Cache peek modes.
+         * @param partition Partition.
+         */
+        private PartitionSizeLongJob(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes, int partition) {
+            super(cacheName, topVer);
+
+            this.peekModes = peekModes;
+            this.partition = partition;
+        }
+
+        /** Returns the local partition size as a {@code Long}; {@code 0L} when {@code cache} is {@code null}. */
+        @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) {
+            if (cache == null)
+                return 0L; // Must box to Long: the task's reduce() reads results via <Long>getData().
+
+            try {
+                return cache.localSizeLong(partition, peekModes);
+            }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(PartitionSizeLongJob.class, this);
+        }
+    }
+
+    /**
      * Internal callable for global size calculation.
      */
     @GridInternal
@@ -6610,7 +6739,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
          * @param topVer Affinity topology version.
          * @param peekModes Cache peek modes.
          */
-        public SizeLongTask(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes) {
+        private SizeLongTask(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes) {
             this.cacheName = cacheName;
             this.topVer = topVer;
             this.peekModes = peekModes;
@@ -6655,6 +6784,90 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     }
 
     /**
+     * Partition Size Long task.
+     */
+    private static class PartitionSizeLongTask extends ComputeTaskAdapter<Object, Long> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** Minimal node version accepted by the node filter in {@code sizeLongAsync(int, CachePeekMode[])}. */
+        private static final IgniteProductVersion SINCE_VER = IgniteProductVersion.fromString("1.5.30");
+
+        /** Partition. */
+        private final int partition;
+
+        /** Cache name. */
+        private final String cacheName;
+
+        /** Affinity topology version. */
+        private final AffinityTopologyVersion topVer;
+
+        /** Peek modes. */
+        private final CachePeekMode[] peekModes;
+
+        /**
+         * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
+         * @param peekModes Cache peek modes.
+         * @param partition Partition.
+         */
+        private PartitionSizeLongTask(
+            String cacheName,
+            AffinityTopologyVersion topVer,
+            CachePeekMode[] peekModes,
+            int partition
+        ) {
+            this.cacheName = cacheName;
+            this.topVer = topVer;
+            this.peekModes = peekModes;
+            this.partition = partition;
+        }
+
+        /** Creates one {@code PartitionSizeLongJob} per node of {@code subgrid}; {@code arg} is not used. */
+        @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(
+            List<ClusterNode> subgrid,
+            @Nullable Object arg
+        ) throws IgniteException {
+            Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
+
+            for (ClusterNode node : subgrid)
+                jobs.put(new PartitionSizeLongJob(cacheName, topVer, peekModes, partition), node);
+
+            return jobs;
+        }
+
+        /** Waits for all jobs; any job exception other than {@code ClusterTopologyException} fails the task here. */
+        @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+            IgniteException e = res.getException();
+
+            if (e != null) {
+                if (e instanceof ClusterTopologyException)
+                    return ComputeJobResultPolicy.WAIT; // Keep collecting; reduce() rethrows any job exception it finds.
+
+                throw new IgniteException("Remote job threw exception.", e);
+            }
+
+            return ComputeJobResultPolicy.WAIT;
+        }
+
+        /** Sums the {@code Long} results of all jobs; rethrows the first job exception encountered. */
+        @Nullable @Override public Long reduce(List<ComputeJobResult> results) throws IgniteException {
+            long size = 0;
+
+            for (ComputeJobResult res : results) {
+                if (res != null) {
+                    if (res.getException() == null)
+                        size += res.<Long>getData();
+                    else
+                        throw res.getException();
+                }
+            }
+
+            return size;
+        }
+    }
+
+    /**
      * Clear task.
      */
     private static class ClearTask<K> extends ComputeTaskAdapter<Object, Object> {

http://git-wip-us.apache.org/repos/asf/ignite/blob/8af30781/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index 99dd608..b46c4dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -1483,6 +1483,18 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
     }
 
     /** {@inheritDoc} */
+    @Override public long sizeLong(int partition, CachePeekMode[] peekModes) throws IgniteCheckedException {
+        CacheOperationContext prev = gate.enter(opCtx);
+
+        try {
+            return delegate.sizeLong(partition, peekModes);
+        }
+        finally {
+            gate.leave(prev);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Integer> sizeAsync(CachePeekMode[] peekModes) {
         CacheOperationContext prev = gate.enter(opCtx);
 
@@ -1507,6 +1519,18 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<Long> sizeLongAsync(int partition, CachePeekMode[] peekModes) {
+        CacheOperationContext prev = gate.enter(opCtx);
+
+        try {
+            return delegate.sizeLongAsync(partition, peekModes);
+        }
+        finally {
+            gate.leave(prev);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public int localSize(CachePeekMode[] peekModes) throws IgniteCheckedException {
         CacheOperationContext prev = gate.enter(opCtx);
 
@@ -1531,6 +1555,18 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
     }
 
     /** {@inheritDoc} */
+    @Override public long localSizeLong(int partition, CachePeekMode[] peekModes) throws IgniteCheckedException {
+        CacheOperationContext prev = gate.enter(opCtx);
+
+        try {
+            return delegate.localSizeLong(partition, peekModes);
+        }
+        finally {
+            gate.leave(prev);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public int nearSize() {
         CacheOperationContext prev = gate.enter(opCtx);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8af30781/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 12ec8b8..92e59db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -840,6 +840,29 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
 
 
     /** {@inheritDoc} */
+    @Override public long sizeLong(int part, CachePeekMode... peekModes) throws CacheException {
+        GridCacheGateway<K, V> gate = this.gate;
+
+        CacheOperationContext prev = onEnter(gate, opCtx);
+
+        try {
+            if (isAsync()) {
+                setFuture(delegate.sizeLongAsync(part, peekModes));
+
+                return 0; // Placeholder only: in async mode the size is delivered through the future set above.
+            }
+            else
+                return delegate.sizeLong(part, peekModes);
+        }
+        catch (IgniteCheckedException e) {
+            throw cacheException(e); // Checked failure is rethrown as the exception built by cacheException(e).
+        }
+        finally {
+            onLeave(gate, prev);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public int localSize(CachePeekMode... peekModes) {
         GridCacheGateway<K, V> gate = this.gate;
 
@@ -874,6 +897,23 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
     }
 
     /** {@inheritDoc} */
+    @Override public long localSizeLong(int part, CachePeekMode... peekModes) {
+        GridCacheGateway<K, V> gate = this.gate;
+
+        CacheOperationContext prev = onEnter(gate, opCtx);
+
+        try {
+            return delegate.localSizeLong(part, peekModes);
+        }
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
+        }
+        finally {
+            onLeave(gate, prev);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public V get(K key) {
         try {
             GridCacheGateway<K, V> gate = this.gate;

http://git-wip-us.apache.org/repos/asf/ignite/blob/8af30781/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
index d155b0e..4dc9a23f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
@@ -1453,6 +1453,14 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
     public long localSizeLong(CachePeekMode[] peekModes) throws IgniteCheckedException;
 
     /**
+     * @param partition partition.
+     * @param peekModes Peek modes.
+     * @return Local cache size as a long value.
+     * @throws IgniteCheckedException If failed.
+     */
+    public long localSizeLong(int partition, CachePeekMode[] peekModes) throws IgniteCheckedException;
+
+    /**
      * @param peekModes Peek modes.
      * @return Global cache size.
      * @throws IgniteCheckedException If failed.
@@ -1467,6 +1475,14 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
     public long sizeLong(CachePeekMode[] peekModes) throws IgniteCheckedException;
 
     /**
+     * @param partition partition
+     * @param peekModes Peek modes.
+     * @return Global cache size as a long value.
+     * @throws IgniteCheckedException If failed.
+     */
+    public long sizeLong(int partition, CachePeekMode[] peekModes) throws IgniteCheckedException;
+
+    /**
      * @param peekModes Peek modes.
      * @return Future.
      */
@@ -1479,6 +1495,13 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
     public IgniteInternalFuture<Long> sizeLongAsync(CachePeekMode[] peekModes);
 
     /**
+     * @param partition partition.
+     * @param peekModes Peek modes.
+     * @return Future.
+     */
+    public IgniteInternalFuture<Long> sizeLongAsync(int partition, CachePeekMode[] peekModes);
+
+    /**
      * Gets size of near cache key set. This method will return count of all entries in near
      * cache and has O(1) complexity on base cache projection.
      * <p>

http://git-wip-us.apache.org/repos/asf/ignite/blob/8af30781/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPeekModesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPeekModesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPeekModesTest.java
index 4270bab..8b7859a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPeekModesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPeekModesTest.java
@@ -48,4 +48,4 @@ public class IgniteCacheAtomicPeekModesTest extends IgniteCachePeekModesAbstract
     @Override protected CacheAtomicWriteOrderMode atomicWriteOrderMode() {
         return PRIMARY;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8af30781/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java
index c27cccb..5dc059b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import javax.cache.Cache;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.affinity.Affinity;
@@ -38,6 +39,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.spi.IgniteSpiCloseableIterator;
 import org.apache.ignite.spi.swapspace.SwapSpaceSpi;
@@ -501,6 +503,144 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testLocalPartitionSize() throws Exception {
+        if (cacheMode() != LOCAL)
+            return;
+
+        awaitPartitionMapExchange();
+        checkEmpty();
+        int part = 0;
+        IgniteCache<Integer, String> cache0 = jcache(0);
+
+        IgniteCache<Integer, String> cacheAsync0 = cache0.withAsync();
+
+        for (int i = 0; i < HEAP_ENTRIES; i++) {
+            cache0.put(i, String.valueOf(i));
+
+            final long size = i + 1;
+
+            assertEquals(size, cache0.localSize());
+            assertEquals(size, cache0.localSizeLong(part, PRIMARY));
+            assertEquals(size, cache0.localSizeLong(part, BACKUP));
+            assertEquals(size, cache0.localSizeLong(part, NEAR));
+            assertEquals(size, cache0.localSizeLong(part, ALL));
+
+            assertEquals(size, cache0.size());
+            assertEquals(size, cache0.sizeLong(part, PRIMARY));
+            assertEquals(size, cache0.sizeLong(part, BACKUP));
+            assertEquals(size, cache0.sizeLong(part, NEAR));
+            assertEquals(size, cache0.sizeLong(part, ALL));
+
+            cacheAsync0.size();
+
+            assertEquals(size, (long) cacheAsync0.<Integer>future().get());
+
+            cacheAsync0.sizeLong(part, PRIMARY);
+
+            assertEquals(size, cacheAsync0.future().get());
+        }
+
+        for (int i = 0; i < HEAP_ENTRIES; i++) {
+            cache0.remove(i, String.valueOf(i));
+
+            final int size = HEAP_ENTRIES - i - 1;
+
+            assertEquals(size, cache0.localSize());
+            assertEquals(size, cache0.localSizeLong(part, PRIMARY));
+            assertEquals(size, cache0.localSizeLong(part, BACKUP));
+            assertEquals(size, cache0.localSizeLong(part, NEAR));
+            assertEquals(size, cache0.localSizeLong(part, ALL));
+
+            assertEquals(size, cache0.size());
+            assertEquals(size, cache0.sizeLong(part, PRIMARY));
+            assertEquals(size, cache0.sizeLong(part, BACKUP));
+            assertEquals(size, cache0.sizeLong(part, NEAR));
+            assertEquals(size, cache0.sizeLong(part, ALL));
+
+            cacheAsync0.size();
+
+            assertEquals(size, (long) cacheAsync0.<Integer>future().get());
+        }
+    }
+
+    /**
+     * @throws InterruptedException If failed.
+     */
+    public void testLocalPartitionSizeFlags() throws InterruptedException {
+        if (cacheMode() != LOCAL)
+            return;
+
+        awaitPartitionMapExchange();
+        checkEmpty();
+        int part = 0;
+        IgniteCache<Integer, String> cache0 = jcache(0);
+
+        Set<Integer> keys = new HashSet<>();
+
+        for (int i = 0; i < 200; i++) {
+            cache0.put(i, "test_val");
+
+            keys.add(i);
+        }
+
+        try {
+            int totalKeys = 200;
+
+            T2<Integer, Integer> swapKeys = swapKeysCount(0);
+
+            T2<Integer, Integer> offheapKeys = offheapKeysCount(0);
+
+            int totalSwap = swapKeys.get1() + swapKeys.get2();
+            int totalOffheap = offheapKeys.get1() + offheapKeys.get2();
+
+            log.info("Keys [total=" + totalKeys + ", offheap=" + offheapKeys + ", swap=" + swapKeys + ']');
+
+            assertTrue(totalSwap + totalOffheap < totalKeys);
+
+            assertEquals(totalKeys, cache0.localSize());
+            assertEquals(totalKeys, cache0.localSizeLong(part, ALL));
+
+            assertEquals(totalOffheap, cache0.localSizeLong(part, OFFHEAP));
+            assertEquals(totalSwap, cache0.localSizeLong(part, SWAP));
+            assertEquals(totalKeys - (totalSwap + totalOffheap), cache0.localSizeLong(part, ONHEAP));
+
+            assertEquals(totalOffheap, cache0.sizeLong(part, OFFHEAP));
+            assertEquals(totalSwap, cache0.sizeLong(part, SWAP));
+            assertEquals(totalKeys - (totalSwap + totalOffheap), cache0.sizeLong(part, ONHEAP));
+
+            assertEquals(totalOffheap, cache0.localSizeLong(part, OFFHEAP, PRIMARY));
+            assertEquals(totalSwap, cache0.localSizeLong(part, SWAP, PRIMARY));
+            assertEquals(totalKeys - (totalSwap + totalOffheap), cache0.localSizeLong(part, ONHEAP, PRIMARY));
+
+            assertEquals(totalOffheap, cache0.localSizeLong(part, OFFHEAP, BACKUP));
+            assertEquals(totalSwap, cache0.localSizeLong(part, SWAP, BACKUP));
+            assertEquals(totalKeys - (totalSwap + totalOffheap), cache0.localSizeLong(part, ONHEAP, BACKUP));
+        }
+        finally {
+            cache0.removeAll(keys);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonLocalPartitionSize() throws Exception {
+        if (cacheMode() == LOCAL)
+            return;
+
+        awaitPartitionMapExchange(true, true);
+
+        checkEmpty();
+
+        for (int i = 0; i < gridCount(); i++) {
+            checkPartitionSizeAffinityFilter(i);
+            checkPartitionSizeStorageFilter(i);
+        }
+    }
+
+    /**
      * @param nodeIdx Node index.
      * @throws Exception If failed.
      */
@@ -627,6 +767,164 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
     }
 
     /**
+     * @param nodeIdx Node index.
+     * @throws Exception If failed.
+     */
+    private void checkPartitionSizeAffinityFilter(int nodeIdx) throws Exception {
+        IgniteCache<Integer, String> cache0 = jcache(nodeIdx);
+
+        final int PUT_KEYS = 10;
+
+        // The node index doubles as the number of the partition under test.
+        int part = nodeIdx;
+
+        List<Integer> keys = null;
+
+        try {
+            if (cacheMode() == REPLICATED) {
+                keys = backupKeys(cache0, PUT_KEYS, 0);
+
+                for (Integer key : keys)
+                    cache0.put(key, String.valueOf(key));
+
+                int partSize = 0;
+
+                for (Integer key : keys) {
+                    if (ignite(nodeIdx).affinity(null).partition(key) == part)
+                        partSize++;
+                }
+
+                assertEquals(PUT_KEYS, cache0.localSize(BACKUP));
+                assertEquals(PUT_KEYS, cache0.localSize(ALL));
+                assertEquals(partSize, cache0.localSizeLong(part, BACKUP));
+                assertEquals(partSize, cache0.localSizeLong(part, ALL));
+                assertEquals(0, cache0.localSizeLong(part, PRIMARY));
+                assertEquals(0, cache0.localSizeLong(part, NEAR));
+
+                // Cluster-wide, each key is expected once as primary and (gridCount() - 1) times as backup.
+                for (int i = 0; i < gridCount(); i++) {
+                    IgniteCache<Integer, String> cache = jcache(i);
+                    assertEquals(0, cache.size(NEAR));
+                    assertEquals(partSize, cache.sizeLong(part, PRIMARY));
+                    assertEquals(partSize * (gridCount() - 1), cache.sizeLong(part, BACKUP));
+                    assertEquals(partSize * gridCount(), cache.sizeLong(part, PRIMARY, BACKUP));
+                    assertEquals(partSize * gridCount(), cache.sizeLong(part, ALL)); // Primary + backups.
+                }
+            }
+            else {
+                keys = nearKeys(cache0, PUT_KEYS, 0);
+
+                for (Integer key : keys)
+                    cache0.put(key, String.valueOf(key));
+
+                int partSize = 0;
+
+                for (Integer key : keys) {
+                    if (ignite(nodeIdx).affinity(null).partition(key) == part)
+                        partSize++;
+                }
+
+                if (hasNearCache()) {
+                    assertEquals(0, cache0.localSize());
+                    assertEquals(0, cache0.localSizeLong(part, ALL));
+                    assertEquals(0, cache0.localSizeLong(part, NEAR));
+
+                    for (int i = 0; i < gridCount(); i++) {
+                        IgniteCache<Integer, String> cache = jcache(i);
+
+                        assertEquals(0, cache.sizeLong(part, NEAR));
+                        assertEquals(partSize, cache.sizeLong(part, BACKUP));
+                        assertEquals(partSize * 2, cache.sizeLong(part, PRIMARY, BACKUP));
+                        assertEquals(partSize * 2, cache.sizeLong(part, ALL)); // Primary + backups + near.
+                    }
+                }
+                else {
+                    assertEquals(0, cache0.localSize());
+                    // NOTE(review): check of cache0.localSizeLong(part, ALL) is disabled - confirm expected value.
+                    assertEquals(0, cache0.localSizeLong(part, NEAR));
+
+                    for (int i = 0; i < gridCount(); i++) {
+                        IgniteCache<Integer, String> cache = jcache(i);
+
+                        assertEquals(0, cache.size(NEAR));
+                        assertEquals(partSize, cache.sizeLong(part, BACKUP));
+                        assertEquals(partSize * 2, cache.sizeLong(part, PRIMARY, BACKUP));
+                        assertEquals(partSize * 2, cache.sizeLong(part, ALL)); // Primary + backups.
+                    }
+                }
+
+                assertEquals(0, cache0.localSize(BACKUP));
+                assertEquals(0, cache0.localSize(PRIMARY));
+            }
+
+            checkPrimarySize(PUT_KEYS);
+
+            Affinity<Integer> aff = ignite(0).affinity(null);
+
+            // Every other node must report the keys of this partition that it owns as primary or backup.
+            for (int i = 0; i < gridCount(); i++) {
+                if (i == nodeIdx)
+                    continue;
+
+                ClusterNode node = ignite(i).cluster().localNode();
+
+                int primary = 0;
+                int backups = 0;
+
+                for (Integer key : keys) {
+                    if (aff.isPrimary(node, key) && aff.partition(key) == part)
+                        primary++;
+                    else if (aff.isBackup(node, key) && aff.partition(key) == part)
+                        backups++;
+                }
+
+                IgniteCache<Integer, String> cache = jcache(i);
+
+                assertEquals(primary, cache.localSizeLong(part, PRIMARY));
+                assertEquals(backups, cache.localSizeLong(part, BACKUP));
+                assertEquals(primary + backups, cache.localSizeLong(part, PRIMARY, BACKUP));
+                assertEquals(primary + backups, cache.localSizeLong(part, BACKUP, PRIMARY));
+                assertEquals(primary + backups, cache.localSizeLong(part, ALL));
+            }
+
+            // Remove one key and re-check the sizes, this time on the node under test.
+            cache0.remove(keys.get(0));
+
+            keys.remove(0);
+
+            checkPrimarySize(PUT_KEYS - 1);
+
+            int primary = 0;
+            int backups = 0;
+
+            ClusterNode node = ignite(nodeIdx).cluster().localNode();
+
+            for (Integer key : keys) {
+                if (aff.isPrimary(node, key) && aff.partition(key) == part)
+                    primary++;
+                else if (aff.isBackup(node, key) && aff.partition(key) == part)
+                    backups++;
+            }
+
+            if (cacheMode() == REPLICATED) {
+                assertEquals(primary + backups, cache0.localSizeLong(part, ALL));
+                assertEquals(primary, cache0.localSizeLong(part, PRIMARY));
+                assertEquals(backups, cache0.localSizeLong(part, BACKUP));
+            }
+            else {
+                // Same expectation with and without a near cache.
+                assertEquals(0, cache0.localSizeLong(part, ALL));
+            }
+        }
+        finally {
+            if (keys != null)
+                cache0.removeAll(new HashSet<>(keys));
+        }
+
+        checkEmpty();
+    }
+
+    /**
      * Checks size is zero.
      */
     private void checkEmpty() {
@@ -695,6 +993,31 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
 
     /**
      * @param nodeIdx Node index.
+     * @param part Cache partition.
+     * @return Tuple with number of primary and backup keys (one or both will be zero).
+     */
+    private T2<Integer, Integer> swapKeysCount(int nodeIdx, int part) throws IgniteCheckedException {
+        GridCacheContext cctx = ((IgniteEx)ignite(nodeIdx)).context().cache().internalCache().context();
+
+        // Swap and offheap are disabled for near cache, so a near cache context is redirected
+        // to the swap manager of its DHT cache.
+        GridCacheSwapManager swap = cctx.isNear() ? cctx.near().dht().context().swap() : cctx.swap();
+
+        // Count the swap entries of the partition first...
+        int entries = (int)swap.swapEntriesCount(part);
+
+        GridCacheAffinityManager aff = cctx.affinity();
+        AffinityTopologyVersion ver = aff.affinityTopologyVersion();
+
+        // ...then attribute the whole count to the role (primary or backup) the local node has for it.
+        boolean primary = aff.primary(cctx.localNode(), part, ver);
+        boolean backup = !primary && aff.backup(cctx.localNode(), part, ver);
+
+        return new T2<>(primary ? entries : 0, backup ? entries : 0);
+    }
+
+    /**
+     * @param nodeIdx Node index.
      * @return Tuple with primary and backup keys.
      */
     private T2<List<Integer>, List<Integer>> offheapKeys(int nodeIdx) {
@@ -742,6 +1065,31 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
 
     /**
      * @param nodeIdx Node index.
+     * @param part Cache partition.
+     * @return Tuple with number of primary and backup keys (one or both will be zero).
+     */
+    private T2<Integer, Integer> offheapKeysCount(int nodeIdx, int part) throws IgniteCheckedException {
+        GridCacheContext cctx = ((IgniteEx)ignite(nodeIdx)).context().cache().internalCache().context();
+
+        // Swap and offheap are disabled for near cache, so a near cache context is redirected
+        // to the swap manager of its DHT cache.
+        GridCacheSwapManager swap = cctx.isNear() ? cctx.near().dht().context().swap() : cctx.swap();
+
+        // Count the offheap entries of the partition first...
+        int entries = (int)swap.offheapEntriesCount(part);
+
+        GridCacheAffinityManager aff = cctx.affinity();
+        AffinityTopologyVersion ver = aff.affinityTopologyVersion();
+
+        // ...then attribute the whole count to the role (primary or backup) the local node has for it.
+        boolean primary = aff.primary(cctx.localNode(), part, ver);
+        boolean backup = !primary && aff.backup(cctx.localNode(), part, ver);
+
+        return new T2<>(primary ? entries : 0, backup ? entries : 0);
+    }
+
+    /**
+     * @param nodeIdx Node index.
      * @throws Exception If failed.
      */
     private void checkSizeStorageFilter(int nodeIdx) throws Exception {
@@ -862,6 +1210,119 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
     }
 
     /**
+     * @param nodeIdx Node index.
+     * @throws Exception If failed.
+     */
+    private void checkPartitionSizeStorageFilter(int nodeIdx) throws Exception {
+        IgniteCache<Integer, String> cache0 = jcache(nodeIdx);
+
+        final int KEYS = 100;
+
+        int part = nodeIdx;
+
+        List<Integer> primaryKeys = primaryKeys(cache0, KEYS, 10_000);
+        List<Integer> backupKeys = backupKeys(cache0, KEYS, 10_000);
+
+        try {
+            final String val = "test_value";
+
+            for (int i = 0; i < KEYS; i++) {
+                cache0.put(primaryKeys.get(i), val);
+                cache0.put(backupKeys.get(i), val);
+            }
+
+            int totalKeys = 2 * KEYS;
+
+            T2<Integer, Integer> swapKeys = swapKeysCount(nodeIdx, part);
+            T2<Integer, Integer> offheapKeys = offheapKeysCount(nodeIdx, part);
+
+            int totalSwap = swapKeys.get1() + swapKeys.get2();
+            int totalOffheap = offheapKeys.get1() + offheapKeys.get2();
+
+            log.info("Local keys [total=" + totalKeys + ", offheap=" + offheapKeys + ", swap=" + swapKeys + ']');
+
+            assertTrue(totalSwap + totalOffheap < totalKeys);
+
+            // Local sizes on the node under test.
+            assertEquals(primaryKeys.size(), cache0.localSize());
+            assertEquals(totalKeys, cache0.localSize(ALL));
+            assertEquals(totalOffheap, cache0.localSizeLong(part, PRIMARY, BACKUP, NEAR, OFFHEAP));
+            assertEquals(totalSwap, cache0.localSizeLong(part, PRIMARY, BACKUP, NEAR, SWAP));
+            assertEquals((long)swapKeys.get1(), cache0.localSizeLong(part, SWAP, PRIMARY));
+            assertEquals((long)swapKeys.get2(), cache0.localSizeLong(part, SWAP, BACKUP));
+
+            assertEquals((long)offheapKeys.get1(), cache0.localSizeLong(part, OFFHEAP, PRIMARY));
+            assertEquals((long)offheapKeys.get2(), cache0.localSizeLong(part, OFFHEAP, BACKUP));
+
+            assertEquals(swapKeys.get1() + offheapKeys.get1(), cache0.localSizeLong(part, SWAP, OFFHEAP, PRIMARY));
+            assertEquals(swapKeys.get2() + offheapKeys.get2(), cache0.localSizeLong(part, SWAP, OFFHEAP, BACKUP));
+
+            assertEquals(totalSwap + totalOffheap, cache0.localSizeLong(part, PRIMARY, BACKUP, NEAR, SWAP, OFFHEAP));
+
+            // Expected cluster-wide counts are the sums of the per-node counts for the partition.
+            int globalPartSwapPrimary = 0;
+            int globalPartSwapBackup = 0;
+            int globalPartOffheapPrimary = 0;
+            int globalPartOffheapBackup = 0;
+
+            for (int i = 0; i < gridCount(); i++) {
+                T2<Integer, Integer> swap = swapKeysCount(i, part);
+
+                globalPartSwapPrimary += swap.get1();
+                globalPartSwapBackup += swap.get2();
+
+                T2<Integer, Integer> offheap = offheapKeysCount(i, part);
+
+                globalPartOffheapPrimary += offheap.get1();
+                globalPartOffheapBackup += offheap.get2();
+            }
+
+            int backups;
+
+            if (cacheMode() == LOCAL)
+                backups = 0;
+            else if (cacheMode() == PARTITIONED)
+                backups = 1;
+            else // REPLICATED.
+                backups = gridCount() - 1;
+
+            int globalTotal = totalKeys + totalKeys * backups;
+            int globalPartTotalSwap = globalPartSwapPrimary + globalPartSwapBackup;
+            int globalPartTotalOffheap = globalPartOffheapPrimary + globalPartOffheapBackup;
+
+            log.info("Global keys [total=" + globalTotal + ", offheap=" + globalPartTotalOffheap +
+                ", swap=" + globalPartTotalSwap + ']');
+
+            for (int i = 0; i < gridCount(); i++) {
+                IgniteCache<Integer, String> cache = jcache(i);
+
+                assertEquals(totalKeys, cache.size(PRIMARY));
+                assertEquals(globalTotal, cache.size(ALL));
+                assertEquals(globalTotal, cache.size(PRIMARY, BACKUP, NEAR, ONHEAP, OFFHEAP, SWAP));
+                assertEquals(globalTotal, cache.size(ONHEAP, OFFHEAP, SWAP, PRIMARY, BACKUP));
+
+                assertEquals(globalPartTotalSwap, cache.sizeLong(part, PRIMARY, BACKUP, NEAR, SWAP));
+                assertEquals(globalPartSwapPrimary, cache.sizeLong(part, SWAP, PRIMARY));
+                assertEquals(globalPartSwapBackup, cache.sizeLong(part, SWAP, BACKUP));
+
+                assertEquals(globalPartTotalOffheap, cache.sizeLong(part, PRIMARY, BACKUP, NEAR, OFFHEAP));
+                assertEquals(globalPartOffheapPrimary, cache.sizeLong(part, OFFHEAP, PRIMARY));
+                assertEquals(globalPartOffheapBackup, cache.sizeLong(part, OFFHEAP, BACKUP));
+
+                assertEquals(globalPartTotalSwap + globalPartTotalOffheap, cache.sizeLong(part, PRIMARY, BACKUP, NEAR, SWAP, OFFHEAP));
+                assertEquals(globalPartSwapPrimary + globalPartOffheapPrimary, cache.sizeLong(part, SWAP, OFFHEAP, PRIMARY));
+                assertEquals(globalPartSwapBackup + globalPartOffheapBackup, cache.sizeLong(part, SWAP, OFFHEAP, BACKUP));
+            }
+        }
+        finally {
+            cache0.removeAll(new HashSet<>(primaryKeys));
+            cache0.removeAll(new HashSet<>(backupKeys));
+        }
+
+        checkEmpty();
+    }
+
+    /**
      * @param exp Expected size.
      */
     private void checkPrimarySize(int exp) {
@@ -1167,4 +1628,4 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
 
         assertTrue("Expected entries not found: " + allExp, allExp.isEmpty());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8af30781/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
index f2f69dd..36a56f5 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
@@ -207,6 +207,11 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
     }
 
     /** {@inheritDoc} */
+    @Override public long sizeLong(int partition, CachePeekMode... peekModes) throws CacheException {
+        return compute.call(new PartitionSizeLongTask(cacheName, isAsync, peekModes, partition, /*loc*/false));
+    }
+
+    /** {@inheritDoc} */
     @Override public int localSize(CachePeekMode... peekModes) {
         return compute.call(new SizeTask(cacheName, isAsync, peekModes, true));
     }
@@ -217,6 +222,11 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
     }
 
     /** {@inheritDoc} */
+    @Override public long localSizeLong(int partition, CachePeekMode... peekModes) {
+        return compute.call(new PartitionSizeLongTask(cacheName, isAsync, peekModes, partition, /*loc*/true));
+    }
+
+    /** {@inheritDoc} */
     @Override  public <T> Map<K, EntryProcessorResult<T>> invokeAll(
         Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
         Object... args)
@@ -703,6 +713,40 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
     /**
      *
      */
+    private static class PartitionSizeLongTask extends CacheTaskAdapter<Void, Void, Long> {
+        /** Partition whose size is requested. */
+        private final int partition;
+
+        /** Peek modes. */
+        private final CachePeekMode[] peekModes;
+
+        /** Whether to call {@code localSizeLong} ({@code true}) or {@code sizeLong} ({@code false}). */
+        private final boolean loc;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async.
+         * @param peekModes Peek modes.
+         * @param partition Partition.
+         * @param loc {@code True} to call {@code localSizeLong}, {@code false} to call {@code sizeLong}.
+         */
+        public PartitionSizeLongTask(String cacheName, boolean async, CachePeekMode[] peekModes, int partition,
+            boolean loc) {
+            super(cacheName, async, null);
+            this.loc = loc;
+            this.peekModes = peekModes;
+            this.partition = partition;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Long call() throws Exception {
+            return loc ? cache().localSizeLong(partition, peekModes) : cache().sizeLong(partition, peekModes);
+        }
+    }
+
+    /**
+     *
+     */
     private static class GetTask<K, V> extends CacheTaskAdapter<K, V, V> {
         /** Key. */
         private final K key;
@@ -1499,4 +1543,4 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
             return async ? cache.withAsync() : cache;
         }
     }
-}
\ No newline at end of file
+}