You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2020/03/30 09:17:13 UTC
[ignite] branch master updated: IGNITE-12788 Adds cluster fully rebalanced state metric. (#7561)
This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 87d5526 IGNITE-12788 Adds cluster fully rebalanced state metric. (#7561)
87d5526 is described below
commit 87d552662408ed701171d7c21cced9141a323c63
Author: Petrov Mikhail <32...@users.noreply.github.com>
AuthorDate: Mon Mar 30 12:16:51 2020 +0300
IGNITE-12788 Adds cluster fully rebalanced state metric. (#7561)
---
.../cache/GridCachePartitionExchangeManager.java | 17 ++
.../preloader/GridDhtPartitionsExchangeFuture.java | 7 +-
.../processors/metric/GridMetricManager.java | 6 +
.../spi/discovery/ClusterRebalancedMetricTest.java | 187 +++++++++++++++++++++
.../ignite/testsuites/IgniteUtilSelfTestSuite.java | 2 +
5 files changed, 217 insertions(+), 2 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index f5faab4..5ef4419 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -112,6 +112,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.metric.impl.BooleanMetricImpl;
import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl;
import org.apache.ignite.internal.processors.query.schema.SchemaNodeLeaveExchangeWorkerTask;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
@@ -167,11 +168,13 @@ import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVer
import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap.PARTIAL_COUNTERS_MAP_SINCE;
import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.nextDumpTimeout;
import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.DFLT_PRELOAD_RESEND_TIMEOUT;
+import static org.apache.ignite.internal.processors.metric.GridMetricManager.CLUSTER_METRICS;
import static org.apache.ignite.internal.processors.metric.GridMetricManager.PME_DURATION;
import static org.apache.ignite.internal.processors.metric.GridMetricManager.PME_DURATION_HISTOGRAM;
import static org.apache.ignite.internal.processors.metric.GridMetricManager.PME_METRICS;
import static org.apache.ignite.internal.processors.metric.GridMetricManager.PME_OPS_BLOCKED_DURATION;
import static org.apache.ignite.internal.processors.metric.GridMetricManager.PME_OPS_BLOCKED_DURATION_HISTOGRAM;
+import static org.apache.ignite.internal.processors.metric.GridMetricManager.REBALANCED;
/**
* Partition exchange manager.
@@ -287,6 +290,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** Delay before rebalancing code is start executing after exchange completion. For tests only. */
private volatile long rebalanceDelay;
+ /** Metric that shows whether cluster is in fully rebalanced state. */
+ private volatile BooleanMetricImpl rebalanced;
+
/** Discovery listener. */
private final DiscoveryEventListener discoLsnr = new DiscoveryEventListener() {
@Override public void onEvent(DiscoveryEvent evt, DiscoCache cache) {
@@ -490,6 +496,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
durationHistogram = mreg.findMetric(PME_DURATION_HISTOGRAM);
blockingDurationHistogram = mreg.findMetric(PME_OPS_BLOCKED_DURATION_HISTOGRAM);
+
+ MetricRegistry clusterReg = cctx.kernalContext().metric().registry(CLUSTER_METRICS);
+
+ rebalanced = clusterReg.booleanMetric(REBALANCED,
+ "True if the cluster has achieved fully rebalanced state. Note that an inactive cluster always has" +
+ " this metric in False regardless of the real partitions state.");
}
/**
@@ -2858,6 +2870,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
return blockingDurationHistogram;
}
+ /** @return Metric that shows whether cluster is in fully rebalanced state. */
+ public BooleanMetricImpl clusterRebalancedMetric() {
+ return rebalanced;
+ }
+
/**
* Exchange future thread. All exchanges happen only by one thread and next
* exchange will not start until previous one completes.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 28bf6dc..7993d67 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2392,9 +2392,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (super.onDone(res, err)) {
afterLsnrCompleteFut.onDone();
- if (err == null)
+ if (err == null) {
updateDurationHistogram(System.currentTimeMillis() - initTime);
+ cctx.exchange().clusterRebalancedMetric().value(rebalanced);
+ }
+
if (log.isInfoEnabled()) {
log.info("Completed partition exchange [localNode=" + cctx.localNodeId() +
", exchange=" + (log.isDebugEnabled() ? this : shortInfo()) + ", topVer=" + topologyVersion() + "]");
@@ -3640,7 +3643,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
GridDhtPartitionsFullMessage msg = createPartitionsMessage(true,
minVer.compareToIgnoreTimestamp(PARTIAL_COUNTERS_MAP_SINCE) >= 0);
- if (!cctx.affinity().rebalanceRequired())
+ if (!cctx.affinity().rebalanceRequired() && !deactivateCluster())
msg.rebalanced(true);
// Lost partition detection should be done after full message is prepared otherwise in case of IGNORE policy
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java
index 56886c0..2ffa3e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java
@@ -135,6 +135,9 @@ public class GridMetricManager extends GridManagerAdapter<MetricExporterSpi> imp
/** Partition map exchange metrics prefix. */
public static final String PME_METRICS = "pme";
+ /** Cluster metrics prefix. */
+ public static final String CLUSTER_METRICS = "cluster";
+
/** Transaction metrics prefix. */
public static final String TX_METRICS = "tx";
@@ -177,6 +180,9 @@ public class GridMetricManager extends GridManagerAdapter<MetricExporterSpi> imp
/** Histogram of blocking PME durations metric name. */
public static final String PME_OPS_BLOCKED_DURATION_HISTOGRAM = "CacheOperationsBlockedDurationHistogram";
+ /** Whether cluster is in fully rebalanced state metric name. */
+ public static final String REBALANCED = "Rebalanced";
+
/** JVM interface to memory consumption info */
private static final MemoryMXBean mem = ManagementFactory.getMemoryMXBean();
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/ClusterRebalancedMetricTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/ClusterRebalancedMetricTest.java
new file mode 100644
index 0000000..8c1755c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/ClusterRebalancedMetricTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery;
+
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.metric.GridMetricManager;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.spi.metric.BooleanMetric;
+import org.apache.ignite.testframework.junits.GridAbstractTest;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.Ignition.allGrids;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.INACTIVE;
+import static org.apache.ignite.internal.processors.metric.GridMetricManager.CLUSTER_METRICS;
+import static org.apache.ignite.internal.processors.metric.GridMetricManager.REBALANCED;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Tests {@link GridMetricManager#REBALANCED} metric.
+ * <p>
+ * Every node in this test is configured with a {@link TestRecordingCommunicationSpi}, the default cache with
+ * one backup, and the {@code INACTIVE} cluster state on start. The same scenario is executed twice: once for an
+ * in-memory cluster and once with native persistence enabled for the default data region.
+ */
+public class ClusterRebalancedMetricTest extends GridCommonAbstractTest {
+ /** Whether node starts with persistence enabled. */
+ private boolean persistenceEnabled;
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("rawtypes")
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+ .setPersistenceEnabled(persistenceEnabled)));
+
+ cfg.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME)
+ .setBackups(1));
+
+ cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+ cfg.setClusterStateOnStart(INACTIVE);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids(true);
+
+ cleanPersistenceDir();
+ }
+
+ /**
+ * Tests {@link GridMetricManager#REBALANCED} metric in case of in-memory cluster.
+ * <p>
+ * Relies on {@code persistenceEnabled} keeping its default value of {@code false}.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testInMemoryClusterRebalancedMetric() throws Exception {
+ checkClusterRebalancedMetric();
+ }
+
+ /**
+ * Tests {@link GridMetricManager#REBALANCED} metric in case of cluster with native persistence enabled.
+ * <p>
+ * The flag is raised before any node is started so that it is picked up by
+ * {@link #getConfiguration(String)}.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testPersistenceClusterRebalancedMetric() throws Exception {
+ persistenceEnabled = true;
+
+ checkClusterRebalancedMetric();
+ }
+
+ /**
+ * Checks {@link GridMetricManager#REBALANCED} metric value.
+ * <p>
+ * The scenario, with the metric value expected after each step:
+ * <ol>
+ * <li>Start server node 0 (cluster is configured to start inactive): {@code false}.</li>
+ * <li>Activate the cluster: {@code true}.</li>
+ * <li>Put one entry into the default cache and start client node 1: {@code true}.</li>
+ * <li>Start server node 2 with its demand messages for the default cache blocked. With persistence
+ * enabled the metric is expected to stay {@code true} until the baseline topology is explicitly set to
+ * all server nodes.</li>
+ * <li>Wait until a demand message is actually blocked: {@code false}.</li>
+ * <li>Release the blocked messages: {@code true}.</li>
+ * <li>Deactivate the cluster: {@code false}.</li>
+ * <li>Activate the cluster again: {@code true}.</li>
+ * <li>Stop node 2: {@code true}.</li>
+ * </ol>
+ * The order of the steps matters: each assertion depends on the cluster state produced by the previous ones.
+ *
+ * @throws Exception If failed.
+ */
+ private void checkClusterRebalancedMetric() throws Exception {
+ IgniteEx ignite = startGrid(0);
+
+ assertMetric(false);
+
+ ignite.cluster().state(ACTIVE);
+
+ assertMetric(true);
+
+ ignite.cache(DEFAULT_CACHE_NAME).put("key", "val");
+
+ startClientGrid(1);
+
+ assertMetric(true);
+
+ TestRecordingCommunicationSpi spi = startGridWithRebalanceBlocked(2);
+
+ if (persistenceEnabled) {
+ assertMetric(true);
+
+ ignite.cluster().setBaselineTopology(ignite.cluster().forServers().nodes());
+ }
+
+ spi.waitForBlocked();
+
+ assertMetric(false);
+
+ spi.stopBlock();
+
+ assertMetric(true);
+
+ ignite.cluster().state(INACTIVE);
+
+ assertMetric(false);
+
+ ignite.cluster().state(ACTIVE);
+
+ assertMetric(true);
+
+ stopGrid(2);
+
+ assertMetric(true);
+ }
+
+ /**
+ * Checks that {@link GridMetricManager#REBALANCED} metric is set to {@code exp} on all cluster nodes.
+ * <p>
+ * The check is not instantaneous: it polls via {@code waitForCondition} for up to {@code getTestTimeout()}
+ * and fails only if the expected value is not observed on every instance returned by
+ * {@code Ignition.allGrids()} within that time. The metric is looked up in the {@code CLUSTER_METRICS}
+ * registry of each instance.
+ *
+ * @param exp Expected metric value.
+ * @throws Exception If failed.
+ */
+ private void assertMetric(boolean exp) throws Exception {
+ assertTrue(waitForCondition(() -> allGrids().stream().allMatch(ignite -> {
+ BooleanMetric rebalancedMetric = ((IgniteEx) ignite)
+ .context()
+ .metric()
+ .registry(CLUSTER_METRICS)
+ .findMetric(REBALANCED);
+
+ return exp == rebalancedMetric.value();
+ }), getTestTimeout()));
+ }
+
+ /**
+ * Starts node with blocked ability to demand {@link GridAbstractTest#DEFAULT_CACHE_NAME} partitions
+ * from other nodes.
+ * <p>
+ * The blocking predicate is installed on the communication SPI taken from the node configuration before the
+ * node is started. Only {@link GridDhtPartitionDemandMessage} instances whose group ID equals the cache ID
+ * of the default cache are blocked; all other messages pass through. The caller is responsible for
+ * releasing the block through the returned SPI.
+ *
+ * @param idx Index of the node to be started.
+ * @return Communication SPI instance of the node that was started.
+ * @throws Exception If failed.
+ */
+ private TestRecordingCommunicationSpi startGridWithRebalanceBlocked(int idx) throws Exception {
+ IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(idx));
+
+ TestRecordingCommunicationSpi spi = (TestRecordingCommunicationSpi) cfg.getCommunicationSpi();
+
+ spi.blockMessages((node, msg) -> {
+ if (!(msg instanceof GridDhtPartitionDemandMessage))
+ return false;
+
+ GridDhtPartitionDemandMessage demandMsg = (GridDhtPartitionDemandMessage) msg;
+
+ return CU.cacheId(DEFAULT_CACHE_NAME) == demandMsg.groupId();
+ });
+
+ startGrid(cfg);
+
+ return spi;
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
index c6d3705..d5de277 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
@@ -45,6 +45,7 @@ import org.apache.ignite.lang.GridByteArrayListSelfTest;
import org.apache.ignite.spi.discovery.ClusterMetricsSelfTest;
import org.apache.ignite.spi.discovery.ClusterMetricsSnapshotSerializeCompatibilityTest;
import org.apache.ignite.spi.discovery.ClusterMetricsSnapshotSerializeSelfTest;
+import org.apache.ignite.spi.discovery.ClusterRebalancedMetricTest;
import org.apache.ignite.thread.GridThreadPoolExecutorServiceSelfTest;
import org.apache.ignite.thread.GridThreadTest;
import org.apache.ignite.thread.IgniteThreadPoolSizeTest;
@@ -112,6 +113,7 @@ import org.junit.runners.Suite;
ClusterMetricsSnapshotSerializeSelfTest.class,
ClusterMetricsSnapshotSerializeCompatibilityTest.class,
ClusterMetricsSelfTest.class,
+ ClusterRebalancedMetricTest.class,
// Unsafe.
GridUnsafeMemorySelfTest.class,