You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2020/03/30 09:17:13 UTC

[ignite] branch master updated: IGNITE-12788 Adds cluster fully rebalanced state metric. (#7561)

This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 87d5526  IGNITE-12788 Adds cluster fully rebalanced state metric. (#7561)
87d5526 is described below

commit 87d552662408ed701171d7c21cced9141a323c63
Author: Petrov Mikhail <32...@users.noreply.github.com>
AuthorDate: Mon Mar 30 12:16:51 2020 +0300

    IGNITE-12788 Adds cluster fully rebalanced state metric. (#7561)
---
 .../cache/GridCachePartitionExchangeManager.java   |  17 ++
 .../preloader/GridDhtPartitionsExchangeFuture.java |   7 +-
 .../processors/metric/GridMetricManager.java       |   6 +
 .../spi/discovery/ClusterRebalancedMetricTest.java | 187 +++++++++++++++++++++
 .../ignite/testsuites/IgniteUtilSelfTestSuite.java |   2 +
 5 files changed, 217 insertions(+), 2 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index f5faab4..5ef4419 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -112,6 +112,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.metric.impl.BooleanMetricImpl;
 import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl;
 import org.apache.ignite.internal.processors.query.schema.SchemaNodeLeaveExchangeWorkerTask;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
@@ -167,11 +168,13 @@ import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVer
 import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap.PARTIAL_COUNTERS_MAP_SINCE;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.nextDumpTimeout;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.DFLT_PRELOAD_RESEND_TIMEOUT;
+import static org.apache.ignite.internal.processors.metric.GridMetricManager.CLUSTER_METRICS;
 import static org.apache.ignite.internal.processors.metric.GridMetricManager.PME_DURATION;
 import static org.apache.ignite.internal.processors.metric.GridMetricManager.PME_DURATION_HISTOGRAM;
 import static org.apache.ignite.internal.processors.metric.GridMetricManager.PME_METRICS;
 import static org.apache.ignite.internal.processors.metric.GridMetricManager.PME_OPS_BLOCKED_DURATION;
 import static org.apache.ignite.internal.processors.metric.GridMetricManager.PME_OPS_BLOCKED_DURATION_HISTOGRAM;
+import static org.apache.ignite.internal.processors.metric.GridMetricManager.REBALANCED;
 
 /**
  * Partition exchange manager.
@@ -287,6 +290,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /** Delay before rebalancing code is start executing after exchange completion. For tests only. */
     private volatile long rebalanceDelay;
 
+    /** Metric that shows whether cluster is in fully rebalanced state. */
+    private volatile BooleanMetricImpl rebalanced;
+
     /** Discovery listener. */
     private final DiscoveryEventListener discoLsnr = new DiscoveryEventListener() {
         @Override public void onEvent(DiscoveryEvent evt, DiscoCache cache) {
@@ -490,6 +496,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         durationHistogram = mreg.findMetric(PME_DURATION_HISTOGRAM);
         blockingDurationHistogram = mreg.findMetric(PME_OPS_BLOCKED_DURATION_HISTOGRAM);
+
+        MetricRegistry clusterReg = cctx.kernalContext().metric().registry(CLUSTER_METRICS);
+
+        rebalanced = clusterReg.booleanMetric(REBALANCED,
+            "True if the cluster has achieved fully rebalanced state. Note that an inactive cluster always has" +
+            " this metric in False regardless of the real partitions state.");
     }
 
     /**
@@ -2858,6 +2870,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         return blockingDurationHistogram;
     }
 
+    /** @return Metric showing whether cluster is fully rebalanced; {@code null} until the metric is registered. */
+    public BooleanMetricImpl clusterRebalancedMetric() {
+        return rebalanced;
+    }
+
     /**
      * Exchange future thread. All exchanges happen only by one thread and next
      * exchange will not start until previous one completes.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 28bf6dc..7993d67 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2392,9 +2392,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         if (super.onDone(res, err)) {
             afterLsnrCompleteFut.onDone();
 
-            if (err == null)
+            if (err == null) {
                 updateDurationHistogram(System.currentTimeMillis() - initTime);
 
+                cctx.exchange().clusterRebalancedMetric().value(rebalanced);
+            }
+
             if (log.isInfoEnabled()) {
                 log.info("Completed partition exchange [localNode=" + cctx.localNodeId() +
                     ", exchange=" + (log.isDebugEnabled() ? this : shortInfo()) + ", topVer=" + topologyVersion() + "]");
@@ -3640,7 +3643,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             GridDhtPartitionsFullMessage msg = createPartitionsMessage(true,
                 minVer.compareToIgnoreTimestamp(PARTIAL_COUNTERS_MAP_SINCE) >= 0);
 
-            if (!cctx.affinity().rebalanceRequired())
+            if (!cctx.affinity().rebalanceRequired() && !deactivateCluster())
                 msg.rebalanced(true);
 
             // Lost partition detection should be done after full message is prepared otherwise in case of IGNORE policy
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java
index 56886c0..2ffa3e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java
@@ -135,6 +135,9 @@ public class GridMetricManager extends GridManagerAdapter<MetricExporterSpi> imp
     /** Partition map exchange metrics prefix. */
     public static final String PME_METRICS = "pme";
 
+    /** Cluster metrics prefix. */
+    public static final String CLUSTER_METRICS = "cluster";
+
     /** Transaction metrics prefix. */
     public static final String TX_METRICS = "tx";
 
@@ -177,6 +180,9 @@ public class GridMetricManager extends GridManagerAdapter<MetricExporterSpi> imp
     /** Histogram of blocking PME durations metric name. */
     public static final String PME_OPS_BLOCKED_DURATION_HISTOGRAM = "CacheOperationsBlockedDurationHistogram";
 
+    /** Name of the metric that shows whether the cluster is in a fully rebalanced state. */
+    public static final String REBALANCED = "Rebalanced";
+
     /** JVM interface to memory consumption info */
     private static final MemoryMXBean mem = ManagementFactory.getMemoryMXBean();
 
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/ClusterRebalancedMetricTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/ClusterRebalancedMetricTest.java
new file mode 100644
index 0000000..8c1755c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/ClusterRebalancedMetricTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery;
+
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.metric.GridMetricManager;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.spi.metric.BooleanMetric;
+import org.apache.ignite.testframework.junits.GridAbstractTest;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.Ignition.allGrids;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.INACTIVE;
+import static org.apache.ignite.internal.processors.metric.GridMetricManager.CLUSTER_METRICS;
+import static org.apache.ignite.internal.processors.metric.GridMetricManager.REBALANCED;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Tests how the {@link GridMetricManager#REBALANCED} metric reacts to activation, node joins and rebalancing.
+ */
+public class ClusterRebalancedMetricTest extends GridCommonAbstractTest {
+    /** Whether nodes are started with persistence enabled; read in {@link #getConfiguration(String)}. */
+    private boolean persistenceEnabled;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("rawtypes")
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setPersistenceEnabled(persistenceEnabled)));
+
+        cfg.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME)
+            .setBackups(1));
+
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); // Allows blocking demand messages.
+        cfg.setClusterStateOnStart(INACTIVE); // Test expects the metric to be false until activation.
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids(true);
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * Tests {@link GridMetricManager#REBALANCED} metric in case of in-memory cluster.
+     */
+    @Test
+    public void testInMemoryClusterRebalancedMetric() throws Exception {
+        checkClusterRebalancedMetric();
+    }
+
+    /**
+     * Tests {@link GridMetricManager#REBALANCED} metric in case of cluster with native persistence enabled.
+     */
+    @Test
+    public void testPersistenceClusterRebalancedMetric() throws Exception {
+        persistenceEnabled = true;
+
+        checkClusterRebalancedMetric();
+    }
+
+    /**
+     * Checks the metric across activation, node joins, blocked rebalance and deactivation.
+     */
+    private void checkClusterRebalancedMetric() throws Exception {
+        IgniteEx ignite = startGrid(0);
+
+        assertMetric(false); // Node was started in INACTIVE state.
+
+        ignite.cluster().state(ACTIVE);
+
+        assertMetric(true); // Active cluster with a single server node.
+
+        ignite.cache(DEFAULT_CACHE_NAME).put("key", "val");
+
+        startClientGrid(1);
+
+        assertMetric(true); // Expected to stay true after a client node joins.
+
+        TestRecordingCommunicationSpi spi = startGridWithRebalanceBlocked(2);
+
+        if (persistenceEnabled) {
+            assertMetric(true); // Expected to stay true until the baseline is changed below.
+
+            ignite.cluster().setBaselineTopology(ignite.cluster().forServers().nodes());
+        }
+
+        spi.waitForBlocked(); // Presumably returns once a demand message has been blocked - TODO confirm.
+
+        assertMetric(false); // Expected false while demand messages of node 2 are blocked.
+
+        spi.stopBlock();
+
+        assertMetric(true); // Expected true after blocking is stopped.
+
+        ignite.cluster().state(INACTIVE);
+
+        assertMetric(false); // Expected false for an inactive cluster.
+
+        ignite.cluster().state(ACTIVE);
+
+        assertMetric(true); // Expected true again after reactivation.
+
+        stopGrid(2);
+
+        assertMetric(true); // Expected to stay true after server node 2 is stopped.
+    }
+
+    /**
+     * Waits until {@link GridMetricManager#REBALANCED} equals {@code exp} on all started nodes, fails otherwise.
+     */
+    private void assertMetric(boolean exp) throws Exception {
+        assertTrue(waitForCondition(() -> allGrids().stream().allMatch(ignite -> {
+            BooleanMetric rebalancedMetric = ((IgniteEx) ignite)
+                .context()
+                .metric()
+                .registry(CLUSTER_METRICS)
+                .findMetric(REBALANCED);
+
+            return exp == rebalancedMetric.value();
+        }), getTestTimeout()));
+    }
+
+    /**
+     * Starts node with blocked ability to demand {@link GridAbstractTest#DEFAULT_CACHE_NAME} partitions
+     * from other nodes.
+     *
+     * @param idx Index of the node to be started.
+     * @return Communication SPI instance of the node that was started.
+     */
+    private TestRecordingCommunicationSpi startGridWithRebalanceBlocked(int idx) throws Exception {
+        IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(idx));
+
+        TestRecordingCommunicationSpi spi = (TestRecordingCommunicationSpi) cfg.getCommunicationSpi();
+
+        spi.blockMessages((node, msg) -> {
+            if (!(msg instanceof GridDhtPartitionDemandMessage))
+                return false; // Only demand messages are candidates for blocking.
+
+            GridDhtPartitionDemandMessage demandMsg = (GridDhtPartitionDemandMessage) msg;
+
+            return CU.cacheId(DEFAULT_CACHE_NAME) == demandMsg.groupId();
+        });
+
+        startGrid(cfg); // Blocking predicate is installed before the node starts.
+
+        return spi;
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
index c6d3705..d5de277 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
@@ -45,6 +45,7 @@ import org.apache.ignite.lang.GridByteArrayListSelfTest;
 import org.apache.ignite.spi.discovery.ClusterMetricsSelfTest;
 import org.apache.ignite.spi.discovery.ClusterMetricsSnapshotSerializeCompatibilityTest;
 import org.apache.ignite.spi.discovery.ClusterMetricsSnapshotSerializeSelfTest;
+import org.apache.ignite.spi.discovery.ClusterRebalancedMetricTest;
 import org.apache.ignite.thread.GridThreadPoolExecutorServiceSelfTest;
 import org.apache.ignite.thread.GridThreadTest;
 import org.apache.ignite.thread.IgniteThreadPoolSizeTest;
@@ -112,6 +113,7 @@ import org.junit.runners.Suite;
     ClusterMetricsSnapshotSerializeSelfTest.class,
     ClusterMetricsSnapshotSerializeCompatibilityTest.class,
     ClusterMetricsSelfTest.class,
+    ClusterRebalancedMetricTest.class,
 
     // Unsafe.
     GridUnsafeMemorySelfTest.class,