You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2016/06/14 02:54:06 UTC

[1/4] ignite git commit: Add public API check for partitions count.

Repository: ignite
Updated Branches:
  refs/heads/master 8e4fc4c32 -> 442bd6190


Add public API check for partitions count.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/80d19f04
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/80d19f04
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/80d19f04

Branch: refs/heads/master
Commit: 80d19f04a793fcf2009985b449ec3f058649ab4e
Parents: 4f8ba17
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Mon Jun 13 18:17:14 2016 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Jun 13 18:17:14 2016 -0700

----------------------------------------------------------------------
 .../ignite/cache/affinity/fair/FairAffinityFunction.java      | 2 ++
 .../cache/affinity/rendezvous/RendezvousAffinityFunction.java | 2 ++
 .../org/apache/ignite/configuration/CacheConfiguration.java   | 3 +++
 .../ignite/internal/processors/cache/GridCacheProcessor.java  | 4 ++++
 .../cache/distributed/dht/preloader/GridDhtPartitionMap2.java | 7 +++++--
 5 files changed, 16 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/80d19f04/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java
index b6b14ec..105efab 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java
@@ -198,6 +198,8 @@ public class FairAffinityFunction implements AffinityFunction {
      * @param parts Total number of partitions.
      */
     public void setPartitions(int parts) {
+        A.ensure(parts <= CacheConfiguration.MAX_PARTITIONS_COUNT, "parts <= " + CacheConfiguration.MAX_PARTITIONS_COUNT);
+
         this.parts = parts;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/80d19f04/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
index 990eba1..aa8680c 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
@@ -222,6 +222,8 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
      * @param parts Total number of partitions.
      */
     public void setPartitions(int parts) {
+        A.ensure(parts <= CacheConfiguration.MAX_PARTITIONS_COUNT, "parts <= " + CacheConfiguration.MAX_PARTITIONS_COUNT);
+
         this.parts = parts;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/80d19f04/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 5a234de..3408834 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -100,6 +100,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Maximum number of partitions. */
+    public static final int MAX_PARTITIONS_COUNT = 0x4000;
+
     /** Default size of rebalance thread pool. */
     @Deprecated
     public static final int DFLT_REBALANCE_THREAD_POOL_SIZE = 2;

http://git-wip-us.apache.org/repos/asf/ignite/blob/80d19f04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 6625093..37c3cf1 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -379,6 +379,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             U.warn(log, "AffinityFunction configuration parameter will be ignored for local cache [cacheName=" +
                 U.maskName(cc.getName()) + ']');
 
+        if (cc.getAffinity().partitions() > CacheConfiguration.MAX_PARTITIONS_COUNT)
+            throw new IgniteCheckedException("Cannot have more than " + CacheConfiguration.MAX_PARTITIONS_COUNT +
+                " partitions [cacheName=" + cc.getName() + ", partitions=" + cc.getAffinity().partitions() + ']');
+
         if (cc.getRebalanceMode() != CacheRebalanceMode.NONE)
             assertParameter(cc.getRebalanceBatchSize() > 0, "rebalanceBatchSize > 0");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/80d19f04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
index 54dfb68..15b5a2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
@@ -25,6 +25,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -196,7 +198,8 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E
         assert updateSeq >= old : "Invalid update sequence [cur=" + old + ", new=" + updateSeq + ']';
 
         this.updateSeq = updateSeq;
-        this.top = topVer;
+
+        top = topVer;
 
         return old;
     }
@@ -231,7 +234,7 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E
             int ordinal = entry.getValue().ordinal();
 
             assert ordinal == (ordinal & 0x3);
-            assert entry.getKey() == (entry.getKey() & 0x3FFF);
+            assert entry.getKey() < CacheConfiguration.MAX_PARTITIONS_COUNT : entry.getKey();
 
             int coded = (ordinal << 14) | entry.getKey();
 


[4/4] ignite git commit: Merge branch 'gridgain-7.6.1' of https://github.com/gridgain/apache-ignite

Posted by ak...@apache.org.
Merge branch 'gridgain-7.6.1' of https://github.com/gridgain/apache-ignite


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/442bd619
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/442bd619
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/442bd619

Branch: refs/heads/master
Commit: 442bd6190d38c9d3ab5c476775611fc78f5b2f43
Parents: 8e4fc4c fad73bf
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Tue Jun 14 09:45:24 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Tue Jun 14 09:45:24 2016 +0700

----------------------------------------------------------------------
 .../GridClientConnectionManagerAdapter.java     |  25 ++-
 .../connection/GridClientNioTcpConnection.java  |   3 +
 .../GridClientOptimizedMarshaller.java          |   4 +-
 .../GridClientZipOptimizedMarshaller.java       | 167 +++++++++++++++++++
 .../impl/GridTcpRouterNioListenerAdapter.java   |  11 +-
 .../message/GridClientHandshakeRequest.java     |   4 +-
 .../protocols/tcp/GridTcpRestNioListener.java   |  19 ++-
 .../rest/protocols/tcp/GridTcpRestProtocol.java |  12 +-
 .../ignite/internal/util/nio/GridNioServer.java |  10 +-
 .../ignite/internal/visor/cache/VisorCache.java |  56 +------
 .../visor/cache/VisorCachePartition.java        |  89 ++++++++++
 .../visor/cache/VisorCachePartitions.java       |  88 ++++++++++
 .../visor/cache/VisorCachePartitionsTask.java   | 152 +++++++++++++++++
 .../internal/visor/cache/VisorCacheV3.java      |  68 +-------
 14 files changed, 569 insertions(+), 139 deletions(-)
----------------------------------------------------------------------



[2/4] ignite git commit: Implemented GridClientOptimizedMarshaller that zip content.

Posted by ak...@apache.org.
Implemented GridClientOptimizedMarshaller that zip content.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/96c599cf
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/96c599cf
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/96c599cf

Branch: refs/heads/master
Commit: 96c599cf177321f4dd2a402f696fe4f0cafd9224
Parents: 80d19f0
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Tue Jun 14 09:34:34 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Tue Jun 14 09:34:34 2016 +0700

----------------------------------------------------------------------
 .../GridClientConnectionManagerAdapter.java     |  25 ++-
 .../connection/GridClientNioTcpConnection.java  |   3 +
 .../GridClientOptimizedMarshaller.java          |   4 +-
 .../GridClientZipOptimizedMarshaller.java       | 167 +++++++++++++++++++
 .../impl/GridTcpRouterNioListenerAdapter.java   |  11 +-
 .../message/GridClientHandshakeRequest.java     |   4 +-
 .../protocols/tcp/GridTcpRestNioListener.java   |  19 ++-
 .../rest/protocols/tcp/GridTcpRestProtocol.java |  12 +-
 .../ignite/internal/util/nio/GridNioServer.java |  10 +-
 9 files changed, 230 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/96c599cf/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
index 1bea3cc..6ea7c22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
@@ -47,6 +47,8 @@ import org.apache.ignite.internal.client.GridClientProtocol;
 import org.apache.ignite.internal.client.GridServerUnreachableException;
 import org.apache.ignite.internal.client.impl.GridClientFutureAdapter;
 import org.apache.ignite.internal.client.impl.GridClientThreadFactory;
+import org.apache.ignite.internal.client.marshaller.GridClientMarshaller;
+import org.apache.ignite.internal.client.marshaller.optimized.GridClientZipOptimizedMarshaller;
 import org.apache.ignite.internal.client.util.GridClientStripedLock;
 import org.apache.ignite.internal.client.util.GridClientUtils;
 import org.apache.ignite.internal.processors.rest.client.message.GridClientHandshakeResponse;
@@ -460,9 +462,26 @@ public abstract class GridClientConnectionManagerAdapter implements GridClientCo
             GridClientConnection conn;
 
             if (cfg.getProtocol() == GridClientProtocol.TCP) {
-                conn = new GridClientNioTcpConnection(srv, clientId, addr, sslCtx, pingExecutor,
-                    cfg.getConnectTimeout(), cfg.getPingInterval(), cfg.getPingTimeout(),
-                    cfg.isTcpNoDelay(), cfg.getMarshaller(), marshId, top, cred, keepBinariesThreadLocal());
+                GridClientMarshaller marsh = cfg.getMarshaller();
+
+                try {
+                    conn = new GridClientNioTcpConnection(srv, clientId, addr, sslCtx, pingExecutor,
+                        cfg.getConnectTimeout(), cfg.getPingInterval(), cfg.getPingTimeout(),
+                        cfg.isTcpNoDelay(), marsh, marshId, top, cred, keepBinariesThreadLocal());
+                }
+                catch (GridClientException e) {
+                    if (marsh instanceof GridClientZipOptimizedMarshaller) {
+                        log.warning("Failed to connect with GridClientZipOptimizedMarshaller," +
+                            " trying to fallback to default marshaller: " + e);
+
+                        conn = new GridClientNioTcpConnection(srv, clientId, addr, sslCtx, pingExecutor,
+                            cfg.getConnectTimeout(), cfg.getPingInterval(), cfg.getPingTimeout(),
+                            cfg.isTcpNoDelay(), ((GridClientZipOptimizedMarshaller)marsh).defaultMarshaller(), marshId,
+                            top, cred, keepBinariesThreadLocal());
+                    }
+                    else
+                        throw e;
+                }
             }
             else
                 throw new GridServerUnreachableException("Failed to create client (protocol is not supported): " +

http://git-wip-us.apache.org/repos/asf/ignite/blob/96c599cf/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
index cfcb07f..8937504 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.client.impl.GridClientNodeMetricsAdapter;
 import org.apache.ignite.internal.client.marshaller.GridClientMarshaller;
 import org.apache.ignite.internal.client.marshaller.jdk.GridClientJdkMarshaller;
 import org.apache.ignite.internal.client.marshaller.optimized.GridClientOptimizedMarshaller;
+import org.apache.ignite.internal.client.marshaller.optimized.GridClientZipOptimizedMarshaller;
 import org.apache.ignite.internal.processors.rest.client.message.GridClientAuthenticationRequest;
 import org.apache.ignite.internal.processors.rest.client.message.GridClientCacheRequest;
 import org.apache.ignite.internal.processors.rest.client.message.GridClientHandshakeRequest;
@@ -243,6 +244,8 @@ public class GridClientNioTcpConnection extends GridClientConnection {
             if (marshId != null)
                 req.marshallerId(marshId);
             // marsh != null.
+            else if (marsh instanceof GridClientZipOptimizedMarshaller)
+                req.marshallerId(GridClientZipOptimizedMarshaller.ID);
             else if (marsh instanceof GridClientOptimizedMarshaller)
                 req.marshallerId(GridClientOptimizedMarshaller.ID);
             else if (marsh instanceof GridClientJdkMarshaller)

http://git-wip-us.apache.org/repos/asf/ignite/blob/96c599cf/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java
index 4bc1dac..a112736 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java
@@ -38,7 +38,7 @@ public class GridClientOptimizedMarshaller implements GridClientMarshaller {
     public static final byte ID = 1;
 
     /** Optimized marshaller. */
-    private final OptimizedMarshaller opMarsh;
+    protected final OptimizedMarshaller opMarsh;
 
     /**
      * Default constructor.
@@ -136,4 +136,4 @@ public class GridClientOptimizedMarshaller implements GridClientMarshaller {
             throw new UnsupportedOperationException();
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/96c599cf/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientZipOptimizedMarshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientZipOptimizedMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientZipOptimizedMarshaller.java
new file mode 100644
index 0000000..d9ce60e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientZipOptimizedMarshaller.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.marshaller.optimized;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+import java.util.zip.ZipOutputStream;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.client.marshaller.GridClientMarshaller;
+import org.apache.ignite.internal.processors.rest.client.message.GridClientMessage;
+import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
+import org.apache.ignite.plugin.PluginProvider;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Adapts {@link OptimizedMarshaller} to {@link GridClientMarshaller}, zipping marshalled messages over 512 bytes.
+ */
+public class GridClientZipOptimizedMarshaller extends GridClientOptimizedMarshaller {
+    /** ID. */
+    public static final byte ID = 3;
+
+    /** Default buffer size. */
+    private static final int DFLT_BUFFER_SIZE = 4096;
+
+    /** Default client marshaller to fallback. */
+    private final GridClientMarshaller dfltMarsh;
+
+    /**
+     * Constructor.
+     *
+     * @param dfltMarsh Marshaller to fallback to.
+     * @param plugins Plugins.
+     */
+    public GridClientZipOptimizedMarshaller(GridClientMarshaller dfltMarsh, @Nullable List<PluginProvider> plugins) {
+        super(plugins);
+
+        assert dfltMarsh!= null;
+
+        this.dfltMarsh = dfltMarsh;
+    }
+
+    /**
+     * Default marshaller to fall back to if connecting with this zip marshaller fails.
+     *
+     * @return Marshaller to fallback.
+     */
+    public GridClientMarshaller defaultMarshaller() {
+        return dfltMarsh;
+    }
+
+    /**
+     * Zips bytes.
+     *
+     * @param input Input bytes.
+     * @return Zipped byte array.
+     * @throws IOException If failed.
+     */
+    public static byte[] zipBytes(byte[] input) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(DFLT_BUFFER_SIZE);
+
+        try (ZipOutputStream zos = new ZipOutputStream(baos)) {
+            ZipEntry entry = new ZipEntry("");
+
+            try {
+                entry.setSize(input.length);
+
+                zos.putNextEntry(entry);
+                zos.write(input);
+            }
+            finally {
+                zos.closeEntry();
+            }
+        }
+
+        return baos.toByteArray();
+    }
+
+    /**
+     * Unzip bytes.
+     *
+     * @param input Zipped bytes.
+     * @return Unzipped byte array.
+     * @throws IOException If failed.
+     */
+    private static byte[] unzipBytes(byte[] input) throws IOException {
+        ByteArrayInputStream bais = new ByteArrayInputStream(input);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(DFLT_BUFFER_SIZE);
+
+        try(ZipInputStream zis = new ZipInputStream(bais)) {
+            zis.getNextEntry();
+
+            byte[] buf = new byte[DFLT_BUFFER_SIZE];
+
+            int len = zis.read(buf);
+
+            while (len > 0) {
+                baos.write(buf, 0, len);
+
+                len = zis.read(buf);
+            }
+        }
+
+        return baos.toByteArray();
+    }
+    /** {@inheritDoc} */
+    @Override public ByteBuffer marshal(Object obj, int off) throws IOException {
+        try {
+            if (!(obj instanceof GridClientMessage))
+                throw new IOException("Message serialization of given type is not supported: " +
+                    obj.getClass().getName());
+
+            byte[] marshBytes = opMarsh.marshal(obj);
+
+            boolean zip = marshBytes.length > 512;
+
+            byte[] bytes = zip ? zipBytes(marshBytes) : marshBytes;
+
+            ByteBuffer buf = ByteBuffer.allocate(off + bytes.length + 1);
+
+            buf.position(off);
+            buf.put((byte)(zip ? 1 : 0));
+            buf.put(bytes);
+            buf.flip();
+
+            return buf;
+        }
+        catch (IgniteCheckedException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T unmarshal(byte[] bytes) throws IOException {
+        try {
+            boolean unzip = bytes[0] > 0;
+
+            byte[] marshBytes = Arrays.copyOfRange(bytes, 1, bytes.length);
+
+            return opMarsh.unmarshal(unzip ? unzipBytes(marshBytes) : marshBytes, null);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IOException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/96c599cf/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java
index 02b63ad..6bcea09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.client.router.impl;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteLogger;
@@ -29,6 +30,7 @@ import org.apache.ignite.internal.client.GridClientFutureListener;
 import org.apache.ignite.internal.client.marshaller.GridClientMarshaller;
 import org.apache.ignite.internal.client.marshaller.jdk.GridClientJdkMarshaller;
 import org.apache.ignite.internal.client.marshaller.optimized.GridClientOptimizedMarshaller;
+import org.apache.ignite.internal.client.marshaller.optimized.GridClientZipOptimizedMarshaller;
 import org.apache.ignite.internal.processors.rest.client.message.GridClientHandshakeRequest;
 import org.apache.ignite.internal.processors.rest.client.message.GridClientHandshakeResponse;
 import org.apache.ignite.internal.processors.rest.client.message.GridClientMessage;
@@ -39,6 +41,7 @@ import org.apache.ignite.internal.processors.rest.client.message.GridRouterRespo
 import org.apache.ignite.internal.util.nio.GridNioServerListener;
 import org.apache.ignite.internal.util.nio.GridNioSession;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.PluginProvider;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.MARSHALLER;
@@ -78,7 +81,11 @@ public abstract class GridTcpRouterNioListenerAdapter implements GridNioServerLi
 
         marshMap = new HashMap<>();
 
-        marshMap.put(GridClientOptimizedMarshaller.ID, new GridClientOptimizedMarshaller(U.allPluginProviders()));
+        List<PluginProvider> providers = U.allPluginProviders();
+        GridClientOptimizedMarshaller optdMarsh = new GridClientOptimizedMarshaller(providers);
+
+        marshMap.put(GridClientOptimizedMarshaller.ID, optdMarsh);
+        marshMap.put(GridClientZipOptimizedMarshaller.ID, new GridClientZipOptimizedMarshaller(optdMarsh, providers));
         marshMap.put(GridClientJdkMarshaller.ID, new GridClientJdkMarshaller());
 
         init();
@@ -213,4 +220,4 @@ public abstract class GridTcpRouterNioListenerAdapter implements GridNioServerLi
 
         return res;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/96c599cf/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequest.java
index 3790dd0..4e1ba91 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequest.java
@@ -66,7 +66,7 @@ public class GridClientHandshakeRequest extends GridClientAbstractMessage {
      * @param marshId Marshaller ID.
      */
     public void marshallerId(byte marshId) {
-        assert marshId >= 0 && marshId <= 2;
+        assert marshId >= 0 && marshId <= 3;
 
         this.marshId = marshId;
     }
@@ -104,4 +104,4 @@ public class GridClientHandshakeRequest extends GridClientAbstractMessage {
     @Override public String toString() {
         return getClass().getSimpleName() + " [arr=" + Arrays.toString(arr) + ']';
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/96c599cf/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java
index bf177cf..2cfdb75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java
@@ -250,14 +250,17 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli
                             GridNioFuture<?> sf = ses.send(res);
 
                             // Check if send failed.
-                            if (sf.isDone())
-                                try {
-                                    sf.get();
-                                }
-                                catch (Exception e) {
-                                    U.error(log, "Failed to process client request [ses=" + ses + ", msg=" + msg + ']',
-                                        e);
+                            sf.listen(new CI1<IgniteInternalFuture<?>>() {
+                                @Override public void apply(IgniteInternalFuture<?> fut) {
+                                    try {
+                                        fut.get();
+                                    }
+                                    catch (IgniteCheckedException e) {
+                                        U.error(log, "Failed to process client request [ses=" + ses +
+                                            ", msg=" + msg + ']', e);
+                                    }
                                 }
+                            });
                         }
                     });
                 else
@@ -360,4 +363,4 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli
     @Override public void onSessionIdleTimeout(GridNioSession ses) {
         ses.close();
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/96c599cf/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
index a4a51ea..6338fcc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.client.marshaller.GridClientMarshaller;
 import org.apache.ignite.internal.client.marshaller.jdk.GridClientJdkMarshaller;
 import org.apache.ignite.internal.client.marshaller.optimized.GridClientOptimizedMarshaller;
+import org.apache.ignite.internal.client.marshaller.optimized.GridClientZipOptimizedMarshaller;
 import org.apache.ignite.internal.client.ssl.GridSslContextFactory;
 import org.apache.ignite.internal.processors.rest.GridRestProtocolHandler;
 import org.apache.ignite.internal.processors.rest.client.message.GridClientMessage;
@@ -48,6 +49,7 @@ import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.plugin.PluginProvider;
 import org.apache.ignite.spi.IgnitePortProtocol;
 import org.jetbrains.annotations.Nullable;
 
@@ -170,8 +172,12 @@ public class GridTcpRestProtocol extends GridRestProtocolAdapter {
 
         Map<Byte, GridClientMarshaller> marshMap = new HashMap<>();
 
-        marshMap.put(GridClientOptimizedMarshaller.ID,
-            new GridClientOptimizedMarshaller(new ArrayList<>(ctx.plugins().allProviders())));
+        ArrayList<PluginProvider> providers = new ArrayList<>(ctx.plugins().allProviders());
+
+        GridClientOptimizedMarshaller optMarsh = new GridClientOptimizedMarshaller(providers);
+
+        marshMap.put(GridClientOptimizedMarshaller.ID, optMarsh);
+        marshMap.put(GridClientZipOptimizedMarshaller.ID, new GridClientZipOptimizedMarshaller(optMarsh, providers));
         marshMap.put(GridClientJdkMarshaller.ID, new GridClientJdkMarshaller());
 
         lsnr.marshallers(marshMap);
@@ -291,4 +297,4 @@ public class GridTcpRestProtocol extends GridRestProtocolAdapter {
     @Override protected String getPortPropertyName() {
         return IgniteNodeAttributes.ATTR_REST_TCP_PORT;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/96c599cf/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index a32f04f..9fd5e69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -1572,8 +1572,7 @@ public class GridNioServer<T> {
                     throw e;
                 }
                 catch (Exception e) {
-                    if (!closed)
-                        U.warn(log, "Failed to process selector key (will close): " + ses, e);
+                    U.warn(log, "Failed to process selector key (will close): " + ses, e);
 
                     close(ses, new GridNioException(e));
                 }
@@ -1640,9 +1639,10 @@ public class GridNioServer<T> {
                 try {
                     long writeTimeout0 = writeTimeout;
 
+                    boolean opWrite = key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE) != 0;
+
                     // If we are writing and timeout passed.
-                    if (key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE) != 0 &&
-                        now - ses.lastSendTime() > writeTimeout0) {
+                    if (opWrite && now - ses.lastSendTime() > writeTimeout0) {
                         filterChain.onSessionWriteTimeout(ses);
 
                         // Update timestamp to avoid multiple notifications within one timeout interval.
@@ -1653,7 +1653,7 @@ public class GridNioServer<T> {
 
                     long idleTimeout0 = idleTimeout;
 
-                    if (now - ses.lastReceiveTime() > idleTimeout0 && now - ses.lastSendScheduleTime() > idleTimeout0) {
+                    if (!opWrite && now - ses.lastReceiveTime() > idleTimeout0 && now - ses.lastSendScheduleTime() > idleTimeout0) {
                         filterChain.onSessionIdleTimeout(ses);
 
                         // Update timestamp to avoid multiple notifications within one timeout interval.


[3/4] ignite git commit: Refactor collecting partitions in VisorCache into separate task VisorCachePartitionsTask.

Posted by ak...@apache.org.
Refactor collecting partitions in VisorCache into separate task VisorCachePartitionsTask.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fad73bf5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fad73bf5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fad73bf5

Branch: refs/heads/master
Commit: fad73bf570760e697e20ea19c00c18bc5d6cfc6e
Parents: 96c599c
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Tue Jun 14 09:37:29 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Tue Jun 14 09:37:29 2016 +0700

----------------------------------------------------------------------
 .../ignite/internal/visor/cache/VisorCache.java |  56 +------
 .../visor/cache/VisorCachePartition.java        |  89 +++++++++++
 .../visor/cache/VisorCachePartitions.java       |  88 +++++++++++
 .../visor/cache/VisorCachePartitionsTask.java   | 152 +++++++++++++++++++
 .../internal/visor/cache/VisorCacheV3.java      |  68 +--------
 5 files changed, 339 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/fad73bf5/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java
index 1be7af8..f06813f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java
@@ -18,24 +18,18 @@
 package org.apache.ignite.internal.visor.cache;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Set;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
@@ -97,10 +91,10 @@ public class VisorCache implements Serializable {
     /** Number of partitions. */
     private int partitions;
 
-    /** Primary partitions IDs with sizes. */
+    /** @deprecated Needed only for backward compatibility. */
     private Collection<IgnitePair<Integer>> primaryPartitions;
 
-    /** Backup partitions IDs with sizes. */
+    /** @deprecated Needed only for backward compatibility. */
     private Collection<IgnitePair<Integer>> backupPartitions;
 
     /** Cache metrics. */
@@ -162,53 +156,11 @@ public class VisorCache implements Serializable {
 
                     partitionsMap = new GridDhtPartitionMap(map2.nodeId(), map2.updateSequence(), map2.map());
                 }
-
-                List<GridDhtLocalPartition> parts = top.localPartitions();
-
-                primaryPartitions = new ArrayList<>(parts.size());
-                backupPartitions = new ArrayList<>(parts.size());
-
-                for (GridDhtLocalPartition part : parts) {
-                    int p = part.id();
-
-                    int sz = part.size();
-
-                    // Pass -1 as topology version in order not to wait for topology version.
-                    if (part.primary(AffinityTopologyVersion.NONE))
-                        primaryPartitions.add(new IgnitePair<>(p, sz));
-                    else if (part.state() == GridDhtPartitionState.OWNING && part.backup(AffinityTopologyVersion.NONE))
-                        backupPartitions.add(new IgnitePair<>(p, sz));
-                }
-            }
-            else {
-                // Old way of collecting partitions info.
-                ClusterNode node = ignite.cluster().localNode();
-
-                int[] pp = ca.affinity().primaryPartitions(node);
-
-                primaryPartitions= new ArrayList<>(pp.length);
-
-                for (int p : pp) {
-                    Set set = ca.entrySet(p);
-
-                    primaryPartitions.add(new IgnitePair<>(p, set != null ? set.size() : 0));
-                }
-
-                int[] bp = ca.affinity().backupPartitions(node);
-
-                backupPartitions = new ArrayList<>(bp.length);
-
-                for (int p : bp) {
-                    Set set = ca.entrySet(p);
-
-                    backupPartitions.add(new IgnitePair<>(p, set != null ? set.size() : 0));
-                }
             }
         }
 
         size = ca.size();
         nearSize = ca.nearSize();
-
         dynamicDeploymentId = ca.context().dynamicDeploymentId();
         dhtSize = size - nearSize;
         primarySize = ca.primarySize();
@@ -401,14 +353,14 @@ public class VisorCache implements Serializable {
     }
 
     /**
-     * @return Primary partitions IDs with sizes.
+     * @deprecated Needed only for backward compatibility.
      */
     public Collection<IgnitePair<Integer>> primaryPartitions() {
         return primaryPartitions;
     }
 
     /**
-     * @return Backup partitions IDs with sizes.
+     * @deprecated Needed only for backward compatibility.
      */
     public Collection<IgnitePair<Integer>> backupPartitions() {
         return backupPartitions;

http://git-wip-us.apache.org/repos/asf/ignite/blob/fad73bf5/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartition.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartition.java
new file mode 100644
index 0000000..5909c1a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartition.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.cache;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Data transfer object for information about keys in cache partition.
+ */
+public class VisorCachePartition implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private int part;
+
+    /** */
+    private int heap;
+
+    /** */
+    private long offheap;
+
+    /** */
+    private long swap;
+
+    /**
+     * Full constructor.
+     *
+     * @param part Partition id.
+     * @param heap Number of keys in heap.
+     * @param offheap Number of keys in offheap.
+     * @param swap Number of keys in swap.
+     */
+    public VisorCachePartition(int part, int heap, long offheap, long swap) {
+        this.part = part;
+        this.heap = heap;
+        this.offheap = offheap;
+        this.swap = swap;
+    }
+
+    /**
+     * @return Partition id.
+     */
+    public int partition() {
+        return part;
+    }
+
+    /**
+     * @return Number of keys in heap.
+     */
+    public int heap() {
+        return heap;
+    }
+
+    /**
+     * @return Number of keys in offheap.
+     */
+    public long offheap() {
+        return offheap;
+    }
+
+    /**
+     * @return Number of keys in swap.
+     */
+    public long swap() {
+        return swap;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(VisorCachePartition.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/fad73bf5/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitions.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitions.java
new file mode 100644
index 0000000..4634fa6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitions.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.cache;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Data transfer object for information about cache partitions.
+ */
+public class VisorCachePartitions implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private List<VisorCachePartition> primary;
+
+    /** */
+    private List<VisorCachePartition> backup;
+
+    /**
+     * Default constructor.
+     */
+    public VisorCachePartitions() {
+        primary = new ArrayList<>();
+        backup = new ArrayList<>();
+    }
+
+    /**
+     * Add primary partition descriptor.
+     *
+     * @param part Partition id.
+     * @param heap Number of primary keys in heap.
+     * @param offheap Number of primary keys in offheap.
+     * @param swap Number of primary keys in swap.
+     */
+    public void addPrimary(int part, int heap, long offheap, long swap) {
+       primary.add(new VisorCachePartition(part, heap, offheap, swap));
+    }
+
+    /**
+     * Add backup partition descriptor.
+     *
+     * @param part Partition id.
+     * @param heap Number of backup keys in heap.
+     * @param offheap Number of backup keys in offheap.
+     * @param swap Number of backup keys in swap.
+     */
+    public void addBackup(int part, int heap, long offheap, long swap) {
+       backup.add(new VisorCachePartition(part, heap, offheap, swap));
+    }
+
+    /**
+     * @return List of primary partitions.
+     */
+    public List<VisorCachePartition> primary() {
+        return primary;
+    }
+
+    /**
+     * @return Get list of backup partitions.
+     */
+    public List<VisorCachePartition> backup() {
+        return backup;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(VisorCachePartitions.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/fad73bf5/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitionsTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitionsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitionsTask.java
new file mode 100644
index 0000000..80836c6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitionsTask.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.cache;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheSwapManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.visor.VisorJob;
+import org.apache.ignite.internal.visor.VisorMultiNodeTask;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.visor.util.VisorTaskUtils.log;
+import static org.apache.ignite.internal.visor.util.VisorTaskUtils.escapeName;
+
+/**
+ * Task that collects the distribution of keys across cache partitions.
+ */
+@GridInternal
+public class VisorCachePartitionsTask extends VisorMultiNodeTask<String, Map<UUID, VisorCachePartitions>, VisorCachePartitions> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Override protected VisorCachePartitionsJob job(String arg) {
+        return new VisorCachePartitionsJob(arg, debug);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override protected Map<UUID, VisorCachePartitions> reduce0(List<ComputeJobResult> results) {
+        Map<UUID, VisorCachePartitions> parts = new HashMap<>();
+
+        for (ComputeJobResult res : results) {
+            if (res.getException() != null)
+                throw res.getException();
+
+            parts.put(res.getNode().id(), (VisorCachePartitions)res.getData());
+        }
+
+        return parts;
+    }
+
+    /**
+     * Job that collects the distribution of keys across local cache partitions of a node.
+     */
+    private static class VisorCachePartitionsJob extends VisorJob<String, VisorCachePartitions> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * Create job with given argument.
+         *
+         * @param cacheName Cache name.
+         * @param debug Debug flag.
+         */
+        private VisorCachePartitionsJob(String cacheName, boolean debug) {
+            super(cacheName, debug);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected VisorCachePartitions run(final String cacheName) throws IgniteException {
+            if (debug)
+                log(ignite.log(), "Collecting partitions for cache: " + escapeName(cacheName));
+
+            VisorCachePartitions parts = new VisorCachePartitions();
+
+            GridCacheAdapter ca = ignite.context().cache().internalCache(cacheName);
+
+            // Cache was not started.
+            if (ca == null || !ca.context().started())
+                return parts;
+
+            CacheConfiguration cfg = ca.configuration();
+
+            CacheMode mode = cfg.getCacheMode();
+
+            boolean partitioned = (mode == CacheMode.PARTITIONED || mode == CacheMode.REPLICATED)
+                && ca.context().affinityNode();
+
+            if (partitioned) {
+                GridCacheSwapManager swap = ca.context().swap();
+
+                GridDhtCacheAdapter dca = null;
+
+                if (ca instanceof GridNearCacheAdapter)
+                    dca = ((GridNearCacheAdapter)ca).dht();
+                else if (ca instanceof GridDhtCacheAdapter)
+                    dca = (GridDhtCacheAdapter)ca;
+
+                if (dca != null) {
+                    GridDhtPartitionTopology top = dca.topology();
+
+                    List<GridDhtLocalPartition> locParts = top.localPartitions();
+
+                    try {
+                        for (GridDhtLocalPartition part : locParts) {
+                            int p = part.id();
+
+                            int sz = part.size();
+
+                            // Pass AffinityTopologyVersion.NONE in order not to wait for topology version.
+                            if (part.primary(AffinityTopologyVersion.NONE))
+                                parts.addPrimary(p, sz, swap.offheapEntriesCount(p), swap.swapEntriesCount(p));
+                            else if (part.state() == GridDhtPartitionState.OWNING && part.backup(AffinityTopologyVersion.NONE))
+                                parts.addBackup(p, sz, swap.offheapEntriesCount(p), swap.swapEntriesCount(p));
+                        }
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw new IgniteException("Failed to collect keys distribution in partitions", e);
+                    }
+                }
+            }
+
+            return parts;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(VisorCachePartitionsJob.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/fad73bf5/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheV3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheV3.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheV3.java
index bd9a3ce..fab37e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheV3.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheV3.java
@@ -17,90 +17,34 @@
 
 package org.apache.ignite.internal.visor.cache;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
-import org.apache.ignite.internal.processors.cache.GridCacheSwapManager;
 import org.apache.ignite.internal.util.lang.GridTuple3;
-import org.apache.ignite.internal.util.lang.IgnitePair;
 
 /**
  * Data transfer object for {@link IgniteCache}.
+ *
+ * @deprecated Needed only for backward compatibility.
  */
 public class VisorCacheV3 extends VisorCacheV2 {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Primary partitions IDs with offheap and swap entries count. */
+    /** @deprecated Needed only for backward compatibility. */
     private Collection<GridTuple3<Integer, Long, Long>> primaryPartsOffheapSwap;
 
-    /** Backup partitions IDs with offheap and swap entries count. */
+    /** @deprecated Needed only for backward compatibility. */
     private Collection<GridTuple3<Integer, Long, Long>> backupPartsOffheapSwap;
 
-    /** {@inheritDoc} */
-    @Override public VisorCache from(IgniteEx ignite, String cacheName, int sample) throws IgniteCheckedException {
-        VisorCache c = super.from(ignite, cacheName, sample);
-
-        if (c != null && c instanceof VisorCacheV3) {
-            VisorCacheV3 cacheV3 = (VisorCacheV3)c;
-
-            GridCacheAdapter ca = ignite.context().cache().internalCache(cacheName);
-
-            // Process only started caches.
-            if (ca != null && ca.context().started()) {
-                GridCacheSwapManager swap = ca.context().swap();
-
-                cacheV3.primaryPartsOffheapSwap = new ArrayList<>(c.primaryPartitions().size());
-
-                for (IgnitePair<Integer> part: c.primaryPartitions()) {
-                    int p = part.get1();
-
-                    cacheV3.primaryPartsOffheapSwap.add(new GridTuple3<>(p, swap.offheapEntriesCount(p), swap.swapEntriesCount(p)));
-                }
-
-                cacheV3.backupPartsOffheapSwap = new ArrayList<>(c.backupPartitions().size());
-
-                for (IgnitePair<Integer> part: c.backupPartitions()) {
-                    int p = part.get1();
-
-                    cacheV3.backupPartsOffheapSwap.add(new GridTuple3<>(p, swap.offheapEntriesCount(p), swap.swapEntriesCount(p)));
-                }
-            }
-        }
-
-        return c;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected VisorCache initHistory(VisorCache c) {
-        super.initHistory(c);
-
-        if (c instanceof VisorCacheV3) {
-            ((VisorCacheV3)c).primaryPartsOffheapSwap = Collections.emptyList();
-            ((VisorCacheV3)c).backupPartsOffheapSwap = Collections.emptyList();
-        }
-
-        return c;
-    }
-
-    /** {@inheritDoc} */
-    @Override public VisorCache history() {
-        return initHistory(new VisorCacheV3());
-    }
-
     /**
-     * @return Collection with primary partitions IDs and offheap and swap entries count.
+     * @deprecated Needed only for backward compatibility.
      */
     public Collection<GridTuple3<Integer, Long, Long>> primaryPartitionsOffheapSwap() {
         return primaryPartsOffheapSwap;
     }
 
     /**
-     * @return Collection with backup partitions IDs and offheap and swap entries count.
+     * @deprecated Needed only for backward compatibility.
      */
     public Collection<GridTuple3<Integer, Long, Long>> backupPartitionsOffheapSwap() {
         return backupPartsOffheapSwap;