You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2017/11/30 10:14:36 UTC
ignite git commit: IGNITE-6871 Implement new JMX metrics for
partitions map monitoring
Repository: ignite
Updated Branches:
refs/heads/master 35e621fec -> 6933f7b57
IGNITE-6871 Implement new JMX metrics for partitions map monitoring
Signed-off-by: Anton Vinogradov <av...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6933f7b5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6933f7b5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6933f7b5
Branch: refs/heads/master
Commit: 6933f7b57e15714c3fbf455ca205726eb706cd34
Parents: 35e621f
Author: Aleksey Plekhanov <Pl...@gmail.com>
Authored: Thu Nov 30 13:14:18 2017 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Thu Nov 30 13:14:18 2017 +0300
----------------------------------------------------------------------
.../processors/cache/CacheGroupContext.java | 13 +
.../cache/CacheGroupMetricsMXBeanImpl.java | 250 ++++++++++++++++++
.../processors/cache/GridCacheProcessor.java | 26 +-
.../ignite/mxbean/CacheGroupMetricsMXBean.java | 134 ++++++++++
.../cache/CacheGroupMetricsMBeanTest.java | 254 +++++++++++++++++++
.../IgniteCacheMetricsSelfTestSuite.java | 2 +
6 files changed, 678 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6933f7b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index ad4bbe3..6cd39ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -56,6 +56,7 @@ import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.mxbean.CacheGroupMetricsMXBean;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheMode.LOCAL;
@@ -143,6 +144,9 @@ public class CacheGroupContext {
/** */
private boolean qryEnabled;
+ /** MXBean. */
+ private CacheGroupMetricsMXBean mxBean;
+
/**
* @param grpId Group ID.
* @param ctx Context.
@@ -193,6 +197,8 @@ public class CacheGroupContext {
log = ctx.kernalContext().log(getClass());
caches = new ArrayList<>();
+
+ mxBean = new CacheGroupMetricsMXBeanImpl(this);
}
/**
@@ -975,6 +981,13 @@ public class CacheGroupContext {
preldr.onReconnected();
}
+ /**
+ * Gets the cache group metrics MXBean held by this cache group context.
+ *
+ * @return Value of the {@code mxBean} field.
+ */
+ public CacheGroupMetricsMXBean mxBean() {
+ return mxBean;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return "CacheGroupContext [grp=" + cacheOrGroupName() + ']';
http://git-wip-us.apache.org/repos/asf/ignite/blob/6933f7b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMXBeanImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMXBeanImpl.java
new file mode 100644
index 0000000..eb8e7ac
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMXBeanImpl.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
+import org.apache.ignite.mxbean.CacheGroupMetricsMXBean;
+
+/**
+ * Management bean that provides access to {@link CacheGroupContext}.
+ */
+public class CacheGroupMetricsMXBeanImpl implements CacheGroupMetricsMXBean {
+    /** Cache group context. */
+    private final CacheGroupContext ctx;
+
+    /** Interface describing a predicate of two integers. */
+    private interface IntBiPredicate {
+        /**
+         * Predicate body.
+         *
+         * @param targetVal Target value.
+         * @param nextVal Next comparable value.
+         * @return {@code true} if {@code nextVal} must replace {@code targetVal} as the current result.
+         */
+        boolean apply(int targetVal, int nextVal);
+    }
+
+    /**
+     * Creates MBean.
+     *
+     * @param ctx Cache group context.
+     */
+    public CacheGroupMetricsMXBeanImpl(CacheGroupContext ctx) {
+        this.ctx = ctx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getCacheGroupId() {
+        return ctx.groupId();
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<String> getCaches() {
+        List<String> caches = new ArrayList<>(ctx.caches().size());
+
+        for (GridCacheContext cache : ctx.caches())
+            caches.add(cache.name());
+
+        // Sorted to make the reported list independent of the order of caches in the context.
+        Collections.sort(caches);
+
+        return caches;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getBackups() {
+        return ctx.config().getBackups();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getPartitions() {
+        return ctx.topology().partitions();
+    }
+
+    /**
+     * Calculates the number of partition copies (nodes holding the partition in OWNING state) for all partitions
+     * of this cache group and filters values by the predicate.
+     *
+     * @param pred Predicate deciding whether the count of the next partition replaces the current result.
+     * @return Value selected by the predicate, {@code -1} if the cache group has no partitions.
+     */
+    private int numberOfPartitionCopies(IntBiPredicate pred) {
+        int parts = ctx.topology().partitions();
+
+        GridDhtPartitionFullMap partFullMap = ctx.topology().partitionMap(false);
+
+        int res = -1;
+
+        for (int part = 0; part < parts; part++) {
+            int cnt = 0;
+
+            for (Map.Entry<UUID, GridDhtPartitionMap> entry : partFullMap.entrySet()) {
+                if (entry.getValue().get(part) == GridDhtPartitionState.OWNING)
+                    cnt++;
+            }
+
+            // The first partition always initializes the result, the predicate is consulted for the rest.
+            if (part == 0 || pred.apply(res, cnt))
+                res = cnt;
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getMinimumNumberOfPartitionCopies() {
+        return numberOfPartitionCopies(new IntBiPredicate() {
+            @Override public boolean apply(int targetVal, int nextVal) {
+                return nextVal < targetVal;
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getMaximumNumberOfPartitionCopies() {
+        return numberOfPartitionCopies(new IntBiPredicate() {
+            @Override public boolean apply(int targetVal, int nextVal) {
+                return nextVal > targetVal;
+            }
+        });
+    }
+
+    /**
+     * Count of partitions with a given state in the partition map of a single node.
+     *
+     * @param partMap Node partition map, may be {@code null} when the full map has no entry for the node.
+     * @param state State.
+     * @return Partitions count, {@code 0} for a {@code null} map.
+     */
+    private int partitionsCountByState(GridDhtPartitionMap partMap, GridDhtPartitionState state) {
+        // Map.get() returns null for an unknown node id: report no partitions instead of failing with NPE.
+        if (partMap == null)
+            return 0;
+
+        int parts = ctx.topology().partitions();
+
+        int cnt = 0;
+
+        for (int part = 0; part < parts; part++)
+            if (partMap.get(part) == state)
+                cnt++;
+
+        return cnt;
+    }
+
+    /**
+     * Count of partitions with a given state on the node.
+     *
+     * @param nodeId Node id.
+     * @param state State.
+     * @return Partitions count, {@code 0} if the partition map has no entry for the node.
+     */
+    private int nodePartitionsCountByState(UUID nodeId, GridDhtPartitionState state) {
+        return partitionsCountByState(ctx.topology().partitionMap(false).get(nodeId), state);
+    }
+
+    /**
+     * Count of partitions with a given state in the entire cluster.
+     *
+     * @param state State.
+     * @return Sum of per-node partition counts over the nodes of {@code partitionMap(true)}.
+     */
+    private int clusterPartitionsCountByState(GridDhtPartitionState state) {
+        // Node maps are read from a single snapshot fetched once, not re-fetched for every node.
+        GridDhtPartitionFullMap partFullMap = ctx.topology().partitionMap(false);
+
+        int cnt = 0;
+
+        for (UUID nodeId : ctx.topology().partitionMap(true).keySet())
+            cnt += partitionsCountByState(partFullMap.get(nodeId), state);
+
+        return cnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getLocalNodeOwningPartitionsCount() {
+        return nodePartitionsCountByState(ctx.shared().localNodeId(), GridDhtPartitionState.OWNING);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getLocalNodeMovingPartitionsCount() {
+        return nodePartitionsCountByState(ctx.shared().localNodeId(), GridDhtPartitionState.MOVING);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getClusterOwningPartitionsCount() {
+        return clusterPartitionsCountByState(GridDhtPartitionState.OWNING);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getClusterMovingPartitionsCount() {
+        return clusterPartitionsCountByState(GridDhtPartitionState.MOVING);
+    }
+
+    /**
+     * Gets partitions allocation map with a given state.
+     *
+     * @param state State.
+     * @return Partitions allocation map: partition number to the set of ids (as strings) of nodes holding the
+     *      partition in the given state. Every partition has an entry, possibly with an empty set.
+     */
+    private Map<Integer, Set<String>> clusterPartitionsMapByState(GridDhtPartitionState state) {
+        int parts = ctx.topology().partitions();
+
+        GridDhtPartitionFullMap partFullMap = ctx.topology().partitionMap(false);
+
+        // Linked map keeps partitions in ascending order.
+        Map<Integer, Set<String>> partsMap = new LinkedHashMap<>();
+
+        for (int part = 0; part < parts; part++) {
+            Set<String> partNodesSet = new HashSet<>();
+
+            for (Map.Entry<UUID, GridDhtPartitionMap> entry : partFullMap.entrySet()) {
+                if (entry.getValue().get(part) == state)
+                    partNodesSet.add(entry.getKey().toString());
+            }
+
+            partsMap.put(part, partNodesSet);
+        }
+
+        return partsMap;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<Integer, Set<String>> getOwningPartitionsAllocationMap() {
+        return clusterPartitionsMapByState(GridDhtPartitionState.OWNING);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<Integer, Set<String>> getMovingPartitionsAllocationMap() {
+        return clusterPartitionsMapByState(GridDhtPartitionState.MOVING);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<Integer, List<String>> getAffinityPartitionsAssignmentMap() {
+        AffinityAssignment assignment = ctx.affinity().cachedAffinity(AffinityTopologyVersion.NONE);
+
+        int part = 0;
+
+        // Linked map keeps partitions in ascending order.
+        Map<Integer, List<String>> assignmentMap = new LinkedHashMap<>();
+
+        for (List<ClusterNode> partAssignment : assignment.assignment()) {
+            List<String> partNodeIds = new ArrayList<>(partAssignment.size());
+
+            for (ClusterNode node : partAssignment)
+                partNodeIds.add(node.id().toString());
+
+            assignmentMap.put(part, partNodeIds);
+
+            part++;
+        }
+
+        return assignmentMap;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6933f7b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index a052150..569b40b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -87,9 +87,9 @@ import org.apache.ignite.internal.processors.cache.dr.GridCacheDrManager;
import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter;
import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
import org.apache.ignite.internal.processors.cache.local.atomic.GridLocalAtomicCache;
+import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
-import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
@@ -141,6 +141,7 @@ import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.mxbean.CacheGroupMetricsMXBean;
import org.apache.ignite.mxbean.IgniteMBeanAware;
import org.apache.ignite.spi.IgniteNodeValidationResult;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
@@ -220,6 +221,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** Internal cache names. */
private final Set<String> internalCaches;
+    /** MBean group for cache group metrics. */
+    private static final String CACHE_GRP_METRICS_MBEAN_GRP = "Cache groups";
+
/**
* @param ctx Kernal context.
*/
@@ -582,6 +586,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
for (Object obj : grp.configuredUserObjects())
cleanup(cfg, obj, false);
+
+ if (!grp.systemCache()) {
+ try {
+ ctx.config().getMBeanServer().unregisterMBean(U.makeMBeanName(ctx.igniteInstanceName(),
+ CACHE_GRP_METRICS_MBEAN_GRP, grp.cacheOrGroupName()));
+ }
+ catch (Throwable e) {
+ U.error(log, "Failed to unregister MBean for cache group: " + grp.name(), e);
+ }
+ }
}
/**
@@ -1909,6 +1923,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
CacheGroupContext old = cacheGrps.put(desc.groupId(), grp);
+ if (!grp.systemCache()) {
+ try {
+ U.registerMBean(ctx.config().getMBeanServer(), ctx.igniteInstanceName(), CACHE_GRP_METRICS_MBEAN_GRP,
+ grp.cacheOrGroupName(), grp.mxBean(), CacheGroupMetricsMXBean.class);
+ }
+ catch (Throwable e) {
+ U.error(log, "Failed to register MBean for cache group: " + grp.name(), e);
+ }
+ }
+
assert old == null : old.name();
return grp;
http://git-wip-us.apache.org/repos/asf/ignite/blob/6933f7b5/modules/core/src/main/java/org/apache/ignite/mxbean/CacheGroupMetricsMXBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/CacheGroupMetricsMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/CacheGroupMetricsMXBean.java
new file mode 100644
index 0000000..db548a3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/mxbean/CacheGroupMetricsMXBean.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.mxbean;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+
+/**
+ * This interface defines a JMX view on {@link CacheGroupContext}.
+ */
+@MXBeanDescription("MBean that provides access to cache group descriptor.")
+public interface CacheGroupMetricsMXBean {
+ /**
+ * Gets cache group id.
+ *
+ * @return Cache group id.
+ */
+ @MXBeanDescription("Cache group id.")
+ public int getCacheGroupId();
+
+ /**
+ * Gets list of cache names of this cache group.
+ *
+ * @return List of cache names.
+ */
+ @MXBeanDescription("List of caches.")
+ public List<String> getCaches();
+
+ /**
+ * Gets count of backups configured for this cache group.
+ *
+ * @return Count of backups.
+ */
+ @MXBeanDescription("Count of backups configured for cache group.")
+ public int getBackups();
+
+ /**
+ * Gets count of partitions for this cache group.
+ *
+ * @return Count of partitions.
+ */
+ @MXBeanDescription("Count of partitions for cache group.")
+ public int getPartitions();
+
+ /**
+ * Calculates the minimum number of partition copies over all partitions of this cache group.
+ *
+ * @return Minimum number of copies.
+ */
+ @MXBeanDescription("Minimum number of partition copies for all partitions of this cache group.")
+ public int getMinimumNumberOfPartitionCopies();
+
+ /**
+ * Calculates the maximum number of partition copies over all partitions of this cache group.
+ *
+ * @return Maximum number of copies.
+ */
+ @MXBeanDescription("Maximum number of partition copies for all partitions of this cache group.")
+ public int getMaximumNumberOfPartitionCopies();
+
+ /**
+ * Gets count of partitions with state OWNING for this cache group located on this node.
+ *
+ * @return Partitions count.
+ */
+ @MXBeanDescription("Count of partitions with state OWNING for this cache group located on this node.")
+ public int getLocalNodeOwningPartitionsCount();
+
+ /**
+ * Gets count of partitions with state MOVING for this cache group located on this node.
+ *
+ * @return Partitions count.
+ */
+ @MXBeanDescription("Count of partitions with state MOVING for this cache group located on this node.")
+ public int getLocalNodeMovingPartitionsCount();
+
+ /**
+ * Gets count of partitions with state OWNING for this cache group in the entire cluster.
+ *
+ * @return Partitions count.
+ */
+ @MXBeanDescription("Count of partitions for this cache group in the entire cluster with state OWNING.")
+ public int getClusterOwningPartitionsCount();
+
+ /**
+ * Gets count of partitions with state MOVING for this cache group in the entire cluster.
+ *
+ * @return Partitions count.
+ */
+ @MXBeanDescription("Count of partitions for this cache group in the entire cluster with state MOVING.")
+ public int getClusterMovingPartitionsCount();
+
+ /**
+ * Gets allocation map of partitions with state OWNING in the cluster.
+ *
+ * @return Map from partition number to the set of nodes where the partition is located.
+ */
+ @MXBeanDescription("Allocation map of partitions with state OWNING in the cluster.")
+ public Map<Integer, Set<String>> getOwningPartitionsAllocationMap();
+
+ /**
+ * Gets allocation map of partitions with state MOVING in the cluster.
+ *
+ * @return Map from partition number to the set of nodes where the partition is located.
+ */
+ @MXBeanDescription("Allocation map of partitions with state MOVING in the cluster.")
+ public Map<Integer, Set<String>> getMovingPartitionsAllocationMap();
+
+ /**
+ * Gets affinity partitions assignment map.
+ *
+ * @return Map from partition number to list of nodes. The first node in the list is where the PRIMARY partition is
+ * assigned, the other nodes in the list are where the BACKUP partitions are assigned.
+ */
+ @MXBeanDescription("Affinity partitions assignment map.")
+ public Map<Integer, List<String>> getAffinityPartitionsAssignmentMap();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6933f7b5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMBeanTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMBeanTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMBeanTest.java
new file mode 100644
index 0000000..4769e63
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMBeanTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.mxbean.CacheGroupMetricsMXBean;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Cache group JMX metrics test.
+ */
+public class CacheGroupMetricsMBeanTest extends GridCommonAbstractTest implements Serializable {
+    /**
+     * Affinity function with 10 partitions. Partition {@code p} is assigned to {@code p % nodes + 1} consecutive
+     * nodes of the current topology snapshot, which gives partitions a different number of copies.
+     */
+    private static class RoundRobinVariableSizeAffinityFunction implements AffinityFunction {
+        /** {@inheritDoc} */
+        @Override public void reset() {
+            // No-op
+        }
+
+        /** {@inheritDoc} */
+        @Override public int partitions() {
+            return 10;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int partition(Object key) {
+            // hashCode() may be negative and so may the remainder. Its magnitude is below partitions(), so
+            // Math.abs() cannot overflow and always yields a valid partition number.
+            return Math.abs(key.hashCode() % partitions());
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
+            List<ClusterNode> nodes = affCtx.currentTopologySnapshot();
+
+            List<List<ClusterNode>> assignmentParts = new ArrayList<>(partitions());
+
+            for (int part = 0; part < partitions(); part++) {
+                // Total number of copies (primary included) for this partition.
+                int copies = part % nodes.size() + 1;
+
+                List<ClusterNode> assignmentNodes = new ArrayList<>(copies);
+
+                for (int copy = 0; copy < copies; copy++)
+                    assignmentNodes.add(nodes.get((part + part / nodes.size() + copy) % nodes.size()));
+
+                assignmentParts.add(assignmentNodes);
+            }
+
+            return assignmentParts;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void removeNode(UUID nodeId) {
+            // No-op
+        }
+    }
+
+    /**
+     * Partition assignment for cache1 with given affinity function on 3 nodes
+     * (rows are partitions, columns are nodes, P is primary, B is backup):
+     *
+     * <pre>
+     *  P/N 0 1 2
+     *  ---------
+     *  0 | P
+     *  1 |   P B
+     *  2 | B B P
+     *  3 |   P
+     *  4 | B   P
+     *  5 | P B B
+     *  6 |     P
+     *  7 | P B
+     *  8 | B P B
+     *  9 | P
+     * </pre>
+     *
+     * Each row of the array lists node indexes of a partition, the primary node first.
+     */
+    private static final int[][] assignmentMapArr =
+        new int[][] {{0}, {1, 2}, {2, 0, 1}, {1}, {2, 0}, {0, 1, 2}, {2}, {0, 1}, {1, 2, 0}, {0}};
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        CacheConfiguration cCfg1 = new CacheConfiguration()
+            .setName("cache1")
+            .setGroupName("group1")
+            .setCacheMode(CacheMode.PARTITIONED)
+            .setBackups(3)
+            .setAffinity(new RoundRobinVariableSizeAffinityFunction());
+
+        CacheConfiguration cCfg2 = new CacheConfiguration()
+            .setName("cache2")
+            .setGroupName("group2")
+            .setCacheMode(CacheMode.REPLICATED);
+
+        CacheConfiguration cCfg3 = new CacheConfiguration()
+            .setName("cache3")
+            .setGroupName("group2")
+            .setCacheMode(CacheMode.REPLICATED);
+
+        // No group name: the MBean of this cache is looked up by the cache name in the test.
+        CacheConfiguration cCfg4 = new CacheConfiguration()
+            .setName("cache4")
+            .setCacheMode(CacheMode.PARTITIONED);
+
+        cfg.setCacheConfiguration(cCfg1, cCfg2, cCfg3, cCfg4);
+
+        return cfg;
+    }
+
+    /**
+     * Gets CacheGroupMetricsMXBean for given node and group name. Fails the test if the MBean is not registered.
+     *
+     * @param nodeIdx Node index.
+     * @param cacheOrGrpName Cache group name.
+     * @return MBean instance.
+     * @throws MalformedObjectNameException If the MBean object name cannot be built.
+     */
+    private CacheGroupMetricsMXBean mxBean(int nodeIdx, String cacheOrGrpName) throws MalformedObjectNameException {
+        ObjectName mbeanName = U.makeMBeanName(getTestIgniteInstanceName(nodeIdx), "Cache groups", cacheOrGrpName);
+
+        MBeanServer mbeanSrv = ManagementFactory.getPlatformMBeanServer();
+
+        if (!mbeanSrv.isRegistered(mbeanName))
+            fail("MBean is not registered: " + mbeanName.getCanonicalName());
+
+        return MBeanServerInvocationHandler.newProxyInstance(mbeanSrv, mbeanName, CacheGroupMetricsMXBean.class,
+            true);
+    }
+
+    /**
+     * Converts array, containing partitions allocation to map from partitions to set of nodes.
+     *
+     * @param arr Array of node indexes per partition, {@code null} row means no nodes for the partition.
+     * @return Map from partitions to set of node ids.
+     */
+    private Map<Integer, Set<String>> arrayToAllocationMap(int[][] arr) {
+        Map<Integer, Set<String>> res = new LinkedHashMap<>();
+
+        for (int part = 0; part < arr.length; part++) {
+            Set<String> nodeSet = new HashSet<>();
+
+            if (arr[part] != null)
+                for (int node = 0; node < arr[part].length; node++)
+                    nodeSet.add(grid(arr[part][node]).localNode().id().toString());
+
+            res.put(part, nodeSet);
+        }
+
+        return res;
+    }
+
+    /**
+     * Converts array, containing affinity assignment to map from partitions to list of nodes.
+     *
+     * @param arr Array of node indexes per partition, {@code null} row means no nodes for the partition.
+     * @return Map from partitions to list of node ids, in the order given by the array.
+     */
+    private Map<Integer, List<String>> arrayToAssignmentMap(int[][] arr) {
+        Map<Integer, List<String>> res = new LinkedHashMap<>();
+
+        for (int part = 0; part < arr.length; part++) {
+            List<String> nodeList = new ArrayList<>();
+
+            if (arr[part] != null)
+                for (int node = 0; node < arr[part].length; node++)
+                    nodeList.add(grid(arr[part][node]).localNode().id().toString());
+
+            res.put(part, nodeList);
+        }
+
+        return res;
+    }
+
+    /**
+     * Checks cache group metrics on a stable 3-node topology and moving partitions after one node is stopped.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCacheGroupMetrics() throws Exception {
+        startGrid(0);
+        startGrid(1);
+        startGrid(2);
+
+        awaitPartitionMapExchange(true, false, null);
+
+        CacheGroupMetricsMXBean mxBean0Grp1 = mxBean(0, "group1");
+        CacheGroupMetricsMXBean mxBean0Grp2 = mxBean(0, "group2");
+        CacheGroupMetricsMXBean mxBean0Grp3 = mxBean(0, "cache4");
+        CacheGroupMetricsMXBean mxBean1Grp1 = mxBean(1, "group1");
+        CacheGroupMetricsMXBean mxBean2Grp1 = mxBean(2, "group1");
+
+        assertEquals(1, mxBean0Grp1.getMinimumNumberOfPartitionCopies());
+        assertEquals(3, mxBean0Grp1.getMaximumNumberOfPartitionCopies());
+
+        // 19 is the total number of entries in assignmentMapArr, 7 + 6 + 6 is its split by node.
+        assertEquals(0, mxBean0Grp1.getClusterMovingPartitionsCount());
+        assertEquals(19, mxBean0Grp1.getClusterOwningPartitionsCount());
+
+        assertEquals(7, mxBean0Grp1.getLocalNodeOwningPartitionsCount());
+        assertEquals(6, mxBean1Grp1.getLocalNodeOwningPartitionsCount());
+        assertEquals(6, mxBean2Grp1.getLocalNodeOwningPartitionsCount());
+
+        assertEquals(F.asList("cache1"), mxBean0Grp1.getCaches());
+        assertEquals(F.asList("cache2", "cache3"), mxBean0Grp2.getCaches());
+        assertEquals(F.asList("cache4"), mxBean0Grp3.getCaches());
+
+        assertEquals(arrayToAssignmentMap(assignmentMapArr), mxBean0Grp1.getAffinityPartitionsAssignmentMap());
+        assertEquals(arrayToAllocationMap(assignmentMapArr), mxBean0Grp1.getOwningPartitionsAllocationMap());
+        assertEquals(arrayToAllocationMap(new int[10][]), mxBean0Grp1.getMovingPartitionsAllocationMap());
+
+        try (IgniteDataStreamer<Integer, Integer> st = grid(0).dataStreamer("cache1")) {
+            for (int i = 0; i < 50_000; i++)
+                st.addData(i, i);
+        }
+
+        stopGrid(2);
+
+        // Check moving partitions while rebalancing.
+        assertFalse(arrayToAllocationMap(new int[10][]).equals(mxBean0Grp1.getMovingPartitionsAllocationMap()));
+
+        assertTrue(mxBean0Grp1.getLocalNodeMovingPartitionsCount() > 0);
+        assertTrue(mxBean0Grp1.getClusterMovingPartitionsCount() > 0);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6933f7b5/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java
index 6a2fcea..610c7cc 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java
@@ -18,6 +18,7 @@
package org.apache.ignite.testsuites;
import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.CacheGroupMetricsMBeanTest;
import org.apache.ignite.internal.processors.cache.CacheGroupsMetricsRebalanceTest;
import org.apache.ignite.internal.processors.cache.CacheMetricsForClusterGroupSelfTest;
import org.apache.ignite.internal.processors.cache.CacheValidatorMetricsTest;
@@ -63,6 +64,7 @@ public class IgniteCacheMetricsSelfTestSuite extends TestSuite {
suite.addTestSuite(CacheGroupsMetricsRebalanceTest.class);
+ suite.addTestSuite(CacheGroupMetricsMBeanTest.class);
suite.addTestSuite(CacheValidatorMetricsTest.class);
// Cluster wide metrics.