You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2016/06/14 02:54:06 UTC
[1/4] ignite git commit: Add public API check for partitions count.
Repository: ignite
Updated Branches:
refs/heads/master 8e4fc4c32 -> 442bd6190
Add public API check for partitions count.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/80d19f04
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/80d19f04
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/80d19f04
Branch: refs/heads/master
Commit: 80d19f04a793fcf2009985b449ec3f058649ab4e
Parents: 4f8ba17
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Mon Jun 13 18:17:14 2016 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Jun 13 18:17:14 2016 -0700
----------------------------------------------------------------------
.../ignite/cache/affinity/fair/FairAffinityFunction.java | 2 ++
.../cache/affinity/rendezvous/RendezvousAffinityFunction.java | 2 ++
.../org/apache/ignite/configuration/CacheConfiguration.java | 3 +++
.../ignite/internal/processors/cache/GridCacheProcessor.java | 4 ++++
.../cache/distributed/dht/preloader/GridDhtPartitionMap2.java | 7 +++++--
5 files changed, 16 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/80d19f04/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java
index b6b14ec..105efab 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java
@@ -198,6 +198,8 @@ public class FairAffinityFunction implements AffinityFunction {
* @param parts Total number of partitions.
*/
public void setPartitions(int parts) {
+ A.ensure(parts <= CacheConfiguration.MAX_PARTITIONS_COUNT, "parts <= " + CacheConfiguration.MAX_PARTITIONS_COUNT);
+
this.parts = parts;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/80d19f04/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
index 990eba1..aa8680c 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
@@ -222,6 +222,8 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
* @param parts Total number of partitions.
*/
public void setPartitions(int parts) {
+ A.ensure(parts <= CacheConfiguration.MAX_PARTITIONS_COUNT, "parts <= " + CacheConfiguration.MAX_PARTITIONS_COUNT);
+
this.parts = parts;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/80d19f04/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 5a234de..3408834 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -100,6 +100,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** */
private static final long serialVersionUID = 0L;
+ /** Maximum number of partitions. */
+ public static final int MAX_PARTITIONS_COUNT = 0x4000;
+
/** Default size of rebalance thread pool. */
@Deprecated
public static final int DFLT_REBALANCE_THREAD_POOL_SIZE = 2;
http://git-wip-us.apache.org/repos/asf/ignite/blob/80d19f04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 6625093..37c3cf1 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -379,6 +379,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
U.warn(log, "AffinityFunction configuration parameter will be ignored for local cache [cacheName=" +
U.maskName(cc.getName()) + ']');
+ if (cc.getAffinity().partitions() > CacheConfiguration.MAX_PARTITIONS_COUNT)
+ throw new IgniteCheckedException("Cannot have more than " + CacheConfiguration.MAX_PARTITIONS_COUNT +
+ " partitions [cacheName=" + cc.getName() + ", partitions=" + cc.getAffinity().partitions() + ']');
+
if (cc.getRebalanceMode() != CacheRebalanceMode.NONE)
assertParameter(cc.getRebalanceBatchSize() > 0, "rebalanceBatchSize > 0");
http://git-wip-us.apache.org/repos/asf/ignite/blob/80d19f04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
index 54dfb68..15b5a2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
@@ -25,6 +25,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+
+import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -196,7 +198,8 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E
assert updateSeq >= old : "Invalid update sequence [cur=" + old + ", new=" + updateSeq + ']';
this.updateSeq = updateSeq;
- this.top = topVer;
+
+ top = topVer;
return old;
}
@@ -231,7 +234,7 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E
int ordinal = entry.getValue().ordinal();
assert ordinal == (ordinal & 0x3);
- assert entry.getKey() == (entry.getKey() & 0x3FFF);
+ assert entry.getKey() < CacheConfiguration.MAX_PARTITIONS_COUNT : entry.getKey();
int coded = (ordinal << 14) | entry.getKey();
[4/4] ignite git commit: Merge branch 'gridgain-7.6.1' of
https://github.com/gridgain/apache-ignite
Posted by ak...@apache.org.
Merge branch 'gridgain-7.6.1' of https://github.com/gridgain/apache-ignite
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/442bd619
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/442bd619
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/442bd619
Branch: refs/heads/master
Commit: 442bd6190d38c9d3ab5c476775611fc78f5b2f43
Parents: 8e4fc4c fad73bf
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Tue Jun 14 09:45:24 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Tue Jun 14 09:45:24 2016 +0700
----------------------------------------------------------------------
.../GridClientConnectionManagerAdapter.java | 25 ++-
.../connection/GridClientNioTcpConnection.java | 3 +
.../GridClientOptimizedMarshaller.java | 4 +-
.../GridClientZipOptimizedMarshaller.java | 167 +++++++++++++++++++
.../impl/GridTcpRouterNioListenerAdapter.java | 11 +-
.../message/GridClientHandshakeRequest.java | 4 +-
.../protocols/tcp/GridTcpRestNioListener.java | 19 ++-
.../rest/protocols/tcp/GridTcpRestProtocol.java | 12 +-
.../ignite/internal/util/nio/GridNioServer.java | 10 +-
.../ignite/internal/visor/cache/VisorCache.java | 56 +------
.../visor/cache/VisorCachePartition.java | 89 ++++++++++
.../visor/cache/VisorCachePartitions.java | 88 ++++++++++
.../visor/cache/VisorCachePartitionsTask.java | 152 +++++++++++++++++
.../internal/visor/cache/VisorCacheV3.java | 68 +-------
14 files changed, 569 insertions(+), 139 deletions(-)
----------------------------------------------------------------------
[2/4] ignite git commit: Implemented GridClientOptimizedMarshaller
that zip content.
Posted by ak...@apache.org.
Implemented GridClientOptimizedMarshaller that zip content.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/96c599cf
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/96c599cf
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/96c599cf
Branch: refs/heads/master
Commit: 96c599cf177321f4dd2a402f696fe4f0cafd9224
Parents: 80d19f0
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Tue Jun 14 09:34:34 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Tue Jun 14 09:34:34 2016 +0700
----------------------------------------------------------------------
.../GridClientConnectionManagerAdapter.java | 25 ++-
.../connection/GridClientNioTcpConnection.java | 3 +
.../GridClientOptimizedMarshaller.java | 4 +-
.../GridClientZipOptimizedMarshaller.java | 167 +++++++++++++++++++
.../impl/GridTcpRouterNioListenerAdapter.java | 11 +-
.../message/GridClientHandshakeRequest.java | 4 +-
.../protocols/tcp/GridTcpRestNioListener.java | 19 ++-
.../rest/protocols/tcp/GridTcpRestProtocol.java | 12 +-
.../ignite/internal/util/nio/GridNioServer.java | 10 +-
9 files changed, 230 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/96c599cf/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
index 1bea3cc..6ea7c22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
@@ -47,6 +47,8 @@ import org.apache.ignite.internal.client.GridClientProtocol;
import org.apache.ignite.internal.client.GridServerUnreachableException;
import org.apache.ignite.internal.client.impl.GridClientFutureAdapter;
import org.apache.ignite.internal.client.impl.GridClientThreadFactory;
+import org.apache.ignite.internal.client.marshaller.GridClientMarshaller;
+import org.apache.ignite.internal.client.marshaller.optimized.GridClientZipOptimizedMarshaller;
import org.apache.ignite.internal.client.util.GridClientStripedLock;
import org.apache.ignite.internal.client.util.GridClientUtils;
import org.apache.ignite.internal.processors.rest.client.message.GridClientHandshakeResponse;
@@ -460,9 +462,26 @@ public abstract class GridClientConnectionManagerAdapter implements GridClientCo
GridClientConnection conn;
if (cfg.getProtocol() == GridClientProtocol.TCP) {
- conn = new GridClientNioTcpConnection(srv, clientId, addr, sslCtx, pingExecutor,
- cfg.getConnectTimeout(), cfg.getPingInterval(), cfg.getPingTimeout(),
- cfg.isTcpNoDelay(), cfg.getMarshaller(), marshId, top, cred, keepBinariesThreadLocal());
+ GridClientMarshaller marsh = cfg.getMarshaller();
+
+ try {
+ conn = new GridClientNioTcpConnection(srv, clientId, addr, sslCtx, pingExecutor,
+ cfg.getConnectTimeout(), cfg.getPingInterval(), cfg.getPingTimeout(),
+ cfg.isTcpNoDelay(), marsh, marshId, top, cred, keepBinariesThreadLocal());
+ }
+ catch (GridClientException e) {
+ if (marsh instanceof GridClientZipOptimizedMarshaller) {
+ log.warning("Failed to connect with GridClientZipOptimizedMarshaller," +
+ " trying to fallback to default marshaller: " + e);
+
+ conn = new GridClientNioTcpConnection(srv, clientId, addr, sslCtx, pingExecutor,
+ cfg.getConnectTimeout(), cfg.getPingInterval(), cfg.getPingTimeout(),
+ cfg.isTcpNoDelay(), ((GridClientZipOptimizedMarshaller)marsh).defaultMarshaller(), marshId,
+ top, cred, keepBinariesThreadLocal());
+ }
+ else
+ throw e;
+ }
}
else
throw new GridServerUnreachableException("Failed to create client (protocol is not supported): " +
http://git-wip-us.apache.org/repos/asf/ignite/blob/96c599cf/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
index cfcb07f..8937504 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.client.impl.GridClientNodeMetricsAdapter;
import org.apache.ignite.internal.client.marshaller.GridClientMarshaller;
import org.apache.ignite.internal.client.marshaller.jdk.GridClientJdkMarshaller;
import org.apache.ignite.internal.client.marshaller.optimized.GridClientOptimizedMarshaller;
+import org.apache.ignite.internal.client.marshaller.optimized.GridClientZipOptimizedMarshaller;
import org.apache.ignite.internal.processors.rest.client.message.GridClientAuthenticationRequest;
import org.apache.ignite.internal.processors.rest.client.message.GridClientCacheRequest;
import org.apache.ignite.internal.processors.rest.client.message.GridClientHandshakeRequest;
@@ -243,6 +244,8 @@ public class GridClientNioTcpConnection extends GridClientConnection {
if (marshId != null)
req.marshallerId(marshId);
// marsh != null.
+ else if (marsh instanceof GridClientZipOptimizedMarshaller)
+ req.marshallerId(GridClientZipOptimizedMarshaller.ID);
else if (marsh instanceof GridClientOptimizedMarshaller)
req.marshallerId(GridClientOptimizedMarshaller.ID);
else if (marsh instanceof GridClientJdkMarshaller)
http://git-wip-us.apache.org/repos/asf/ignite/blob/96c599cf/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java
index 4bc1dac..a112736 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java
@@ -38,7 +38,7 @@ public class GridClientOptimizedMarshaller implements GridClientMarshaller {
public static final byte ID = 1;
/** Optimized marshaller. */
- private final OptimizedMarshaller opMarsh;
+ protected final OptimizedMarshaller opMarsh;
/**
* Default constructor.
@@ -136,4 +136,4 @@ public class GridClientOptimizedMarshaller implements GridClientMarshaller {
throw new UnsupportedOperationException();
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/96c599cf/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientZipOptimizedMarshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientZipOptimizedMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientZipOptimizedMarshaller.java
new file mode 100644
index 0000000..d9ce60e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientZipOptimizedMarshaller.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.marshaller.optimized;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+import java.util.zip.ZipOutputStream;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.client.marshaller.GridClientMarshaller;
+import org.apache.ignite.internal.processors.rest.client.message.GridClientMessage;
+import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
+import org.apache.ignite.plugin.PluginProvider;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Wrapper, that adapts {@link OptimizedMarshaller} to {@link GridClientMarshaller} interface.
+ */
+public class GridClientZipOptimizedMarshaller extends GridClientOptimizedMarshaller {
+ /** ID. */
+ public static final byte ID = 3;
+
+ /** Default buffer size. */
+ private static final int DFLT_BUFFER_SIZE = 4096;
+
+ /** Default client marshaller to fallback. */
+ private final GridClientMarshaller dfltMarsh;
+
+ /**
+ * Constructor.
+ *
+ * @param dfltMarsh Marshaller to fallback to.
+ * @param plugins Plugins.
+ */
+ public GridClientZipOptimizedMarshaller(GridClientMarshaller dfltMarsh, @Nullable List<PluginProvider> plugins) {
+ super(plugins);
+
+ assert dfltMarsh!= null;
+
+ this.dfltMarsh = dfltMarsh;
+ }
+
+ /**
+ * Default marshaller that will be used in case of backward compatibility.
+ *
+ * @return Marshaller to fallback.
+ */
+ public GridClientMarshaller defaultMarshaller() {
+ return dfltMarsh;
+ }
+
+ /**
+ * Zips bytes.
+ *
+ * @param input Input bytes.
+ * @return Zipped byte array.
+ * @throws IOException If failed.
+ */
+ public static byte[] zipBytes(byte[] input) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(DFLT_BUFFER_SIZE);
+
+ try (ZipOutputStream zos = new ZipOutputStream(baos)) {
+ ZipEntry entry = new ZipEntry("");
+
+ try {
+ entry.setSize(input.length);
+
+ zos.putNextEntry(entry);
+ zos.write(input);
+ }
+ finally {
+ zos.closeEntry();
+ }
+ }
+
+ return baos.toByteArray();
+ }
+
+ /**
+ * Unzip bytes.
+ *
+ * @param input Zipped bytes.
+ * @return Unzipped byte array.
+ * @throws IOException
+ */
+ private static byte[] unzipBytes(byte[] input) throws IOException {
+ ByteArrayInputStream bais = new ByteArrayInputStream(input);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(DFLT_BUFFER_SIZE);
+
+ try(ZipInputStream zis = new ZipInputStream(bais)) {
+ zis.getNextEntry();
+
+ byte[] buf = new byte[DFLT_BUFFER_SIZE];
+
+ int len = zis.read(buf);
+
+ while (len > 0) {
+ baos.write(buf, 0, len);
+
+ len = zis.read(buf);
+ }
+ }
+
+ return baos.toByteArray();
+ }
+ /** {@inheritDoc} */
+ @Override public ByteBuffer marshal(Object obj, int off) throws IOException {
+ try {
+ if (!(obj instanceof GridClientMessage))
+ throw new IOException("Message serialization of given type is not supported: " +
+ obj.getClass().getName());
+
+ byte[] marshBytes = opMarsh.marshal(obj);
+
+ boolean zip = marshBytes.length > 512;
+
+ byte[] bytes = zip ? zipBytes(marshBytes) : marshBytes;
+
+ ByteBuffer buf = ByteBuffer.allocate(off + bytes.length + 1);
+
+ buf.position(off);
+ buf.put((byte)(zip ? 1 : 0));
+ buf.put(bytes);
+ buf.flip();
+
+ return buf;
+ }
+ catch (IgniteCheckedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> T unmarshal(byte[] bytes) throws IOException {
+ try {
+ boolean unzip = bytes[0] > 0;
+
+ byte[] marshBytes = Arrays.copyOfRange(bytes, 1, bytes.length);
+
+ return opMarsh.unmarshal(unzip ? unzipBytes(marshBytes) : marshBytes, null);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IOException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/96c599cf/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java
index 02b63ad..6bcea09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.client.router.impl;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteLogger;
@@ -29,6 +30,7 @@ import org.apache.ignite.internal.client.GridClientFutureListener;
import org.apache.ignite.internal.client.marshaller.GridClientMarshaller;
import org.apache.ignite.internal.client.marshaller.jdk.GridClientJdkMarshaller;
import org.apache.ignite.internal.client.marshaller.optimized.GridClientOptimizedMarshaller;
+import org.apache.ignite.internal.client.marshaller.optimized.GridClientZipOptimizedMarshaller;
import org.apache.ignite.internal.processors.rest.client.message.GridClientHandshakeRequest;
import org.apache.ignite.internal.processors.rest.client.message.GridClientHandshakeResponse;
import org.apache.ignite.internal.processors.rest.client.message.GridClientMessage;
@@ -39,6 +41,7 @@ import org.apache.ignite.internal.processors.rest.client.message.GridRouterRespo
import org.apache.ignite.internal.util.nio.GridNioServerListener;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.PluginProvider;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.MARSHALLER;
@@ -78,7 +81,11 @@ public abstract class GridTcpRouterNioListenerAdapter implements GridNioServerLi
marshMap = new HashMap<>();
- marshMap.put(GridClientOptimizedMarshaller.ID, new GridClientOptimizedMarshaller(U.allPluginProviders()));
+ List<PluginProvider> providers = U.allPluginProviders();
+ GridClientOptimizedMarshaller optdMarsh = new GridClientOptimizedMarshaller(providers);
+
+ marshMap.put(GridClientOptimizedMarshaller.ID, optdMarsh);
+ marshMap.put(GridClientZipOptimizedMarshaller.ID, new GridClientZipOptimizedMarshaller(optdMarsh, providers));
marshMap.put(GridClientJdkMarshaller.ID, new GridClientJdkMarshaller());
init();
@@ -213,4 +220,4 @@ public abstract class GridTcpRouterNioListenerAdapter implements GridNioServerLi
return res;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/96c599cf/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequest.java
index 3790dd0..4e1ba91 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequest.java
@@ -66,7 +66,7 @@ public class GridClientHandshakeRequest extends GridClientAbstractMessage {
* @param marshId Marshaller ID.
*/
public void marshallerId(byte marshId) {
- assert marshId >= 0 && marshId <= 2;
+ assert marshId >= 0 && marshId <= 3;
this.marshId = marshId;
}
@@ -104,4 +104,4 @@ public class GridClientHandshakeRequest extends GridClientAbstractMessage {
@Override public String toString() {
return getClass().getSimpleName() + " [arr=" + Arrays.toString(arr) + ']';
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/96c599cf/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java
index bf177cf..2cfdb75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java
@@ -250,14 +250,17 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli
GridNioFuture<?> sf = ses.send(res);
// Check if send failed.
- if (sf.isDone())
- try {
- sf.get();
- }
- catch (Exception e) {
- U.error(log, "Failed to process client request [ses=" + ses + ", msg=" + msg + ']',
- e);
+ sf.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ try {
+ fut.get();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to process client request [ses=" + ses +
+ ", msg=" + msg + ']', e);
+ }
}
+ });
}
});
else
@@ -360,4 +363,4 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli
@Override public void onSessionIdleTimeout(GridNioSession ses) {
ses.close();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/96c599cf/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
index a4a51ea..6338fcc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.client.marshaller.GridClientMarshaller;
import org.apache.ignite.internal.client.marshaller.jdk.GridClientJdkMarshaller;
import org.apache.ignite.internal.client.marshaller.optimized.GridClientOptimizedMarshaller;
+import org.apache.ignite.internal.client.marshaller.optimized.GridClientZipOptimizedMarshaller;
import org.apache.ignite.internal.client.ssl.GridSslContextFactory;
import org.apache.ignite.internal.processors.rest.GridRestProtocolHandler;
import org.apache.ignite.internal.processors.rest.client.message.GridClientMessage;
@@ -48,6 +49,7 @@ import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.plugin.PluginProvider;
import org.apache.ignite.spi.IgnitePortProtocol;
import org.jetbrains.annotations.Nullable;
@@ -170,8 +172,12 @@ public class GridTcpRestProtocol extends GridRestProtocolAdapter {
Map<Byte, GridClientMarshaller> marshMap = new HashMap<>();
- marshMap.put(GridClientOptimizedMarshaller.ID,
- new GridClientOptimizedMarshaller(new ArrayList<>(ctx.plugins().allProviders())));
+ ArrayList<PluginProvider> providers = new ArrayList<>(ctx.plugins().allProviders());
+
+ GridClientOptimizedMarshaller optMarsh = new GridClientOptimizedMarshaller(providers);
+
+ marshMap.put(GridClientOptimizedMarshaller.ID, optMarsh);
+ marshMap.put(GridClientZipOptimizedMarshaller.ID, new GridClientZipOptimizedMarshaller(optMarsh, providers));
marshMap.put(GridClientJdkMarshaller.ID, new GridClientJdkMarshaller());
lsnr.marshallers(marshMap);
@@ -291,4 +297,4 @@ public class GridTcpRestProtocol extends GridRestProtocolAdapter {
@Override protected String getPortPropertyName() {
return IgniteNodeAttributes.ATTR_REST_TCP_PORT;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/96c599cf/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index a32f04f..9fd5e69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -1572,8 +1572,7 @@ public class GridNioServer<T> {
throw e;
}
catch (Exception e) {
- if (!closed)
- U.warn(log, "Failed to process selector key (will close): " + ses, e);
+ U.warn(log, "Failed to process selector key (will close): " + ses, e);
close(ses, new GridNioException(e));
}
@@ -1640,9 +1639,10 @@ public class GridNioServer<T> {
try {
long writeTimeout0 = writeTimeout;
+ boolean opWrite = key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE) != 0;
+
// If we are writing and timeout passed.
- if (key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE) != 0 &&
- now - ses.lastSendTime() > writeTimeout0) {
+ if (opWrite && now - ses.lastSendTime() > writeTimeout0) {
filterChain.onSessionWriteTimeout(ses);
// Update timestamp to avoid multiple notifications within one timeout interval.
@@ -1653,7 +1653,7 @@ public class GridNioServer<T> {
long idleTimeout0 = idleTimeout;
- if (now - ses.lastReceiveTime() > idleTimeout0 && now - ses.lastSendScheduleTime() > idleTimeout0) {
+ if (!opWrite && now - ses.lastReceiveTime() > idleTimeout0 && now - ses.lastSendScheduleTime() > idleTimeout0) {
filterChain.onSessionIdleTimeout(ses);
// Update timestamp to avoid multiple notifications within one timeout interval.
[3/4] ignite git commit: Refactor collecting partitions in VisorCache
into separate task VisorCachePartitionsTask.
Posted by ak...@apache.org.
Refactor collecting partitions in VisorCache into separate task VisorCachePartitionsTask.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fad73bf5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fad73bf5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fad73bf5
Branch: refs/heads/master
Commit: fad73bf570760e697e20ea19c00c18bc5d6cfc6e
Parents: 96c599c
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Tue Jun 14 09:37:29 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Tue Jun 14 09:37:29 2016 +0700
----------------------------------------------------------------------
.../ignite/internal/visor/cache/VisorCache.java | 56 +------
.../visor/cache/VisorCachePartition.java | 89 +++++++++++
.../visor/cache/VisorCachePartitions.java | 88 +++++++++++
.../visor/cache/VisorCachePartitionsTask.java | 152 +++++++++++++++++++
.../internal/visor/cache/VisorCacheV3.java | 68 +--------
5 files changed, 339 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/fad73bf5/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java
index 1be7af8..f06813f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java
@@ -18,24 +18,18 @@
package org.apache.ignite.internal.visor.cache;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
-import java.util.List;
import java.util.Set;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
@@ -97,10 +91,10 @@ public class VisorCache implements Serializable {
/** Number of partitions. */
private int partitions;
- /** Primary partitions IDs with sizes. */
+ /** @deprecated Needed only for backward compatibility. */
private Collection<IgnitePair<Integer>> primaryPartitions;
- /** Backup partitions IDs with sizes. */
+ /** @deprecated Needed only for backward compatibility. */
private Collection<IgnitePair<Integer>> backupPartitions;
/** Cache metrics. */
@@ -162,53 +156,11 @@ public class VisorCache implements Serializable {
partitionsMap = new GridDhtPartitionMap(map2.nodeId(), map2.updateSequence(), map2.map());
}
-
- List<GridDhtLocalPartition> parts = top.localPartitions();
-
- primaryPartitions = new ArrayList<>(parts.size());
- backupPartitions = new ArrayList<>(parts.size());
-
- for (GridDhtLocalPartition part : parts) {
- int p = part.id();
-
- int sz = part.size();
-
- // Pass -1 as topology version in order not to wait for topology version.
- if (part.primary(AffinityTopologyVersion.NONE))
- primaryPartitions.add(new IgnitePair<>(p, sz));
- else if (part.state() == GridDhtPartitionState.OWNING && part.backup(AffinityTopologyVersion.NONE))
- backupPartitions.add(new IgnitePair<>(p, sz));
- }
- }
- else {
- // Old way of collecting partitions info.
- ClusterNode node = ignite.cluster().localNode();
-
- int[] pp = ca.affinity().primaryPartitions(node);
-
- primaryPartitions= new ArrayList<>(pp.length);
-
- for (int p : pp) {
- Set set = ca.entrySet(p);
-
- primaryPartitions.add(new IgnitePair<>(p, set != null ? set.size() : 0));
- }
-
- int[] bp = ca.affinity().backupPartitions(node);
-
- backupPartitions = new ArrayList<>(bp.length);
-
- for (int p : bp) {
- Set set = ca.entrySet(p);
-
- backupPartitions.add(new IgnitePair<>(p, set != null ? set.size() : 0));
- }
}
}
size = ca.size();
nearSize = ca.nearSize();
-
dynamicDeploymentId = ca.context().dynamicDeploymentId();
dhtSize = size - nearSize;
primarySize = ca.primarySize();
@@ -401,14 +353,14 @@ public class VisorCache implements Serializable {
}
/**
- * @return Primary partitions IDs with sizes.
+ * @deprecated Needed only for backward compatibility.
*/
public Collection<IgnitePair<Integer>> primaryPartitions() {
return primaryPartitions;
}
/**
- * @return Backup partitions IDs with sizes.
+ * @deprecated Needed only for backward compatibility.
*/
public Collection<IgnitePair<Integer>> backupPartitions() {
return backupPartitions;
http://git-wip-us.apache.org/repos/asf/ignite/blob/fad73bf5/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartition.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartition.java
new file mode 100644
index 0000000..5909c1a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartition.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.cache;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Data transfer object for information about keys in cache partition.
+ */
+public class VisorCachePartition implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private int part;
+
+ /** */
+ private int heap;
+
+ /** */
+ private long offheap;
+
+ /** */
+ private long swap;
+
+ /**
+ * Full constructor.
+ *
+ * @param part Partition id.
+ * @param heap Number of keys in heap.
+ * @param offheap Number of keys in offheap.
+ * @param swap Number of keys in swap.
+ */
+ public VisorCachePartition(int part, int heap, long offheap, long swap) {
+ this.part = part;
+ this.heap = heap;
+ this.offheap = offheap;
+ this.swap = swap;
+ }
+
+ /**
+ * @return Partition id.
+ */
+ public int partition() {
+ return part;
+ }
+
+ /**
+ * @return Number of keys in heap.
+ */
+ public int heap() {
+ return heap;
+ }
+
+ /**
+ * @return Number of keys in offheap.
+ */
+ public long offheap() {
+ return offheap;
+ }
+
+ /**
+ * @return Number of keys in swap.
+ */
+ public long swap() {
+ return swap;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(VisorCachePartition.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fad73bf5/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitions.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitions.java
new file mode 100644
index 0000000..4634fa6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitions.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.cache;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Data transfer object for information about cache partitions.
+ */
+public class VisorCachePartitions implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private List<VisorCachePartition> primary;
+
+ /** */
+ private List<VisorCachePartition> backup;
+
+ /**
+ * Default constructor.
+ */
+ public VisorCachePartitions() {
+ primary = new ArrayList<>();
+ backup = new ArrayList<>();
+ }
+
+ /**
+ * Add primary partition descriptor.
+ *
+ * @param part Partition id.
+ * @param heap Number of primary keys in heap.
+ * @param offheap Number of primary keys in offheap.
+ * @param swap Number of primary keys in swap.
+ */
+ public void addPrimary(int part, int heap, long offheap, long swap) {
+ primary.add(new VisorCachePartition(part, heap, offheap, swap));
+ }
+
+ /**
+ * Add backup partition descriptor.
+ *
+ * @param part Partition id.
+ * @param heap Number of backup keys in heap.
+ * @param offheap Number of backup keys in offheap.
+ * @param swap Number of backup keys in swap.
+ */
+ public void addBackup(int part, int heap, long offheap, long swap) {
+ backup.add(new VisorCachePartition(part, heap, offheap, swap));
+ }
+
+ /**
+ * @return Get list of primary partitions.
+ */
+ public List<VisorCachePartition> primary() {
+ return primary;
+ }
+
+ /**
+ * @return Get list of backup partitions.
+ */
+ public List<VisorCachePartition> backup() {
+ return backup;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(VisorCachePartitions.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fad73bf5/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitionsTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitionsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitionsTask.java
new file mode 100644
index 0000000..80836c6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitionsTask.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.cache;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheSwapManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.visor.VisorJob;
+import org.apache.ignite.internal.visor.VisorMultiNodeTask;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.visor.util.VisorTaskUtils.log;
+import static org.apache.ignite.internal.visor.util.VisorTaskUtils.escapeName;
+
+/**
+ * Task that collect keys distribution in partitions.
+ */
+@GridInternal
+public class VisorCachePartitionsTask extends VisorMultiNodeTask<String, Map<UUID, VisorCachePartitions>, VisorCachePartitions> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override protected VisorCachePartitionsJob job(String arg) {
+ return new VisorCachePartitionsJob(arg, debug);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override protected Map<UUID, VisorCachePartitions> reduce0(List<ComputeJobResult> results) {
+ Map<UUID, VisorCachePartitions> parts = new HashMap<>();
+
+ for (ComputeJobResult res : results) {
+ if (res.getException() != null)
+ throw res.getException();
+
+ parts.put(res.getNode().id(), (VisorCachePartitions)res.getData());
+ }
+
+ return parts;
+ }
+
+ /**
+ * Job that collect cache metrics from node.
+ */
+ private static class VisorCachePartitionsJob extends VisorJob<String, VisorCachePartitions> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Create job with given argument.
+ *
+ * @param cacheName Cache name.
+ * @param debug Debug flag.
+ */
+ private VisorCachePartitionsJob(String cacheName, boolean debug) {
+ super(cacheName, debug);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected VisorCachePartitions run(final String cacheName) throws IgniteException {
+ if (debug)
+ log(ignite.log(), "Collecting partitions for cache: " + escapeName(cacheName));
+
+ VisorCachePartitions parts = new VisorCachePartitions();
+
+ GridCacheAdapter ca = ignite.context().cache().internalCache(cacheName);
+
+ // Cache was not started.
+ if (ca == null || !ca.context().started())
+ return parts;
+
+ CacheConfiguration cfg = ca.configuration();
+
+ CacheMode mode = cfg.getCacheMode();
+
+ boolean partitioned = (mode == CacheMode.PARTITIONED || mode == CacheMode.REPLICATED)
+ && ca.context().affinityNode();
+
+ if (partitioned) {
+ GridCacheSwapManager swap = ca.context().swap();
+
+ GridDhtCacheAdapter dca = null;
+
+ if (ca instanceof GridNearCacheAdapter)
+ dca = ((GridNearCacheAdapter)ca).dht();
+ else if (ca instanceof GridDhtCacheAdapter)
+ dca = (GridDhtCacheAdapter)ca;
+
+ if (dca != null) {
+ GridDhtPartitionTopology top = dca.topology();
+
+ List<GridDhtLocalPartition> locParts = top.localPartitions();
+
+ try {
+ for (GridDhtLocalPartition part : locParts) {
+ int p = part.id();
+
+ int sz = part.size();
+
+ // Pass -1 as topology version in order not to wait for topology version.
+ if (part.primary(AffinityTopologyVersion.NONE))
+ parts.addPrimary(p, sz, swap.offheapEntriesCount(p), swap.swapEntriesCount(p));
+ else if (part.state() == GridDhtPartitionState.OWNING && part.backup(AffinityTopologyVersion.NONE))
+ parts.addBackup(p, sz, swap.offheapEntriesCount(p), swap.swapEntriesCount(p));
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException("Failed to collect keys distribution in partitions", e);
+ }
+ }
+ }
+
+ return parts;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(VisorCachePartitionsJob.class, this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fad73bf5/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheV3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheV3.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheV3.java
index bd9a3ce..fab37e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheV3.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheV3.java
@@ -17,90 +17,34 @@
package org.apache.ignite.internal.visor.cache;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
-import org.apache.ignite.internal.processors.cache.GridCacheSwapManager;
import org.apache.ignite.internal.util.lang.GridTuple3;
-import org.apache.ignite.internal.util.lang.IgnitePair;
/**
* Data transfer object for {@link IgniteCache}.
+ *
+ * @deprecated Needed only for backward compatibility.
*/
public class VisorCacheV3 extends VisorCacheV2 {
/** */
private static final long serialVersionUID = 0L;
- /** Primary partitions IDs with offheap and swap entries count. */
+ /** @deprecated Needed only for backward compatibility. */
private Collection<GridTuple3<Integer, Long, Long>> primaryPartsOffheapSwap;
- /** Backup partitions IDs with offheap and swap entries count. */
+ /** @deprecated Needed only for backward compatibility. */
private Collection<GridTuple3<Integer, Long, Long>> backupPartsOffheapSwap;
- /** {@inheritDoc} */
- @Override public VisorCache from(IgniteEx ignite, String cacheName, int sample) throws IgniteCheckedException {
- VisorCache c = super.from(ignite, cacheName, sample);
-
- if (c != null && c instanceof VisorCacheV3) {
- VisorCacheV3 cacheV3 = (VisorCacheV3)c;
-
- GridCacheAdapter ca = ignite.context().cache().internalCache(cacheName);
-
- // Process only started caches.
- if (ca != null && ca.context().started()) {
- GridCacheSwapManager swap = ca.context().swap();
-
- cacheV3.primaryPartsOffheapSwap = new ArrayList<>(c.primaryPartitions().size());
-
- for (IgnitePair<Integer> part: c.primaryPartitions()) {
- int p = part.get1();
-
- cacheV3.primaryPartsOffheapSwap.add(new GridTuple3<>(p, swap.offheapEntriesCount(p), swap.swapEntriesCount(p)));
- }
-
- cacheV3.backupPartsOffheapSwap = new ArrayList<>(c.backupPartitions().size());
-
- for (IgnitePair<Integer> part: c.backupPartitions()) {
- int p = part.get1();
-
- cacheV3.backupPartsOffheapSwap.add(new GridTuple3<>(p, swap.offheapEntriesCount(p), swap.swapEntriesCount(p)));
- }
- }
- }
-
- return c;
- }
-
- /** {@inheritDoc} */
- @Override protected VisorCache initHistory(VisorCache c) {
- super.initHistory(c);
-
- if (c instanceof VisorCacheV3) {
- ((VisorCacheV3)c).primaryPartsOffheapSwap = Collections.emptyList();
- ((VisorCacheV3)c).backupPartsOffheapSwap = Collections.emptyList();
- }
-
- return c;
- }
-
- /** {@inheritDoc} */
- @Override public VisorCache history() {
- return initHistory(new VisorCacheV3());
- }
-
/**
- * @return Collection with primary partitions IDs and offheap and swap entries count.
+ * @deprecated Needed only for backward compatibility.
*/
public Collection<GridTuple3<Integer, Long, Long>> primaryPartitionsOffheapSwap() {
return primaryPartsOffheapSwap;
}
/**
- * @return Collection with backup partitions IDs and offheap and swap entries count.
+ * @deprecated Needed only for backward compatibility.
*/
public Collection<GridTuple3<Integer, Long, Long>> backupPartitionsOffheapSwap() {
return backupPartsOffheapSwap;