You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2019/03/29 12:36:18 UTC

[ignite] branch master updated: IGNITE-11598 Added ability to have different rebalance thread pool size on different nodes in cluster - Fixes #6357.

This is an automated email from the ASF dual-hosted git repository.

agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 73361b6  IGNITE-11598 Added ability to have different rebalance thread pool size on different nodes in cluster - Fixes #6357.
73361b6 is described below

commit 73361b6a09cc6f95bcbed694e8f5222b2cf1e7b8
Author: mstepachev <ma...@gmail.com>
AuthorDate: Fri Mar 29 15:24:18 2019 +0300

    IGNITE-11598 Added ability to have different rebalance thread pool size on different nodes in cluster - Fixes #6357.
    
    Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
 .../org/apache/ignite/internal/IgniteFeatures.java |  5 +-
 .../processors/cache/GridCacheProcessor.java       | 10 ++-
 .../dht/preloader/GridDhtPartitionDemander.java    | 20 +++--
 .../cache/CacheRebalanceConfigValidationTest.java  | 57 --------------
 .../RebalanceWithDifferentThreadPoolSizeTest.java  | 92 ++++++++++++++++++++++
 .../ignite/testsuites/IgniteBasicTestSuite.java    |  4 +-
 6 files changed, 120 insertions(+), 68 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
index 9c23222..98b08ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
@@ -40,7 +40,10 @@ public enum IgniteFeatures {
     DISTRIBUTED_METASTORAGE(2),
 
     /** Data paket compression. */
-    DATA_PACKET_COMPRESSION(3);
+    DATA_PACKET_COMPRESSION(3),
+
+    /** Support of different rebalance thread pool sizes on different nodes. */
+    DIFFERENT_REBALANCE_POOL_SIZE(4);
 
     /**
      * Unique feature identifier.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 3c5d2df..e17847a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -67,6 +67,7 @@ import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteComponentType;
+import org.apache.ignite.internal.IgniteFeatures;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteNodeAttributes;
@@ -1100,11 +1101,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @throws IgniteCheckedException if check failed.
      */
     private void checkConsistency() throws IgniteCheckedException {
-        for (ClusterNode n : ctx.discovery().remoteNodes()) {
+        Collection<ClusterNode> rmtNodes = ctx.discovery().remoteNodes();
+
+        boolean changeablePoolSize = IgniteFeatures.allNodesSupports(rmtNodes, IgniteFeatures.DIFFERENT_REBALANCE_POOL_SIZE);
+
+        for (ClusterNode n : rmtNodes) {
             if (Boolean.TRUE.equals(n.attribute(ATTR_CONSISTENCY_CHECK_SKIPPED)))
                 continue;
 
-            checkRebalanceConfiguration(n);
+            if(!changeablePoolSize)
+                checkRebalanceConfiguration(n);
 
             checkTransactionConfiguration(n);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 61f1e06..93602db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -37,6 +38,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -447,13 +449,20 @@ public class GridDhtPartitionDemander {
 
         final CacheConfiguration cfg = grp.config();
 
-        int totalStripes = ctx.gridConfig().getRebalanceThreadPoolSize();
+        int locStripes = ctx.gridConfig().getRebalanceThreadPoolSize();
 
         for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assignments.entrySet()) {
             final ClusterNode node = e.getKey();
 
             GridDhtPartitionDemandMessage d = e.getValue();
 
+            int rmtStripes = Optional.ofNullable((Integer) node.attribute(IgniteNodeAttributes.ATTR_REBALANCE_POOL_SIZE))
+                .orElse(1);
+
+            int rmtTotalStripes = rmtStripes <= locStripes ? rmtStripes : locStripes;
+
+            int stripes = rmtTotalStripes;
+
             final IgniteDhtDemandedPartitionsMap parts;
             synchronized (fut) { // Synchronized to prevent consistency issues in case of parallel cancellation.
                 if (fut.isDone())
@@ -462,12 +471,11 @@ public class GridDhtPartitionDemander {
                 parts = fut.remaining.get(node.id());
 
                 U.log(log, "Prepared rebalancing [grp=" + grp.cacheOrGroupName()
-                        + ", mode=" + cfg.getRebalanceMode() + ", supplier=" + node.id() + ", partitionsCount=" + parts.size()
-                        + ", topVer=" + fut.topologyVersion() + ", parallelism=" + totalStripes + "]");
+                    + ", mode=" + cfg.getRebalanceMode() + ", supplier=" + node.id() + ", partitionsCount=" + parts.size()
+                    + ", topVer=" + fut.topologyVersion() + ", localParallelism=" + locStripes
+                    + ", rmtParallelism=" + rmtStripes + ", parallelism=" + rmtTotalStripes + "]");
             }
 
-            int stripes = totalStripes;
-
             final List<IgniteDhtDemandedPartitionsMap> stripePartitions = new ArrayList<>(stripes);
             for (int i = 0; i < stripes; i++)
                 stripePartitions.add(new IgniteDhtDemandedPartitionsMap());
@@ -485,7 +493,7 @@ public class GridDhtPartitionDemander {
             for (int i = 0; it.hasNext(); i++)
                 stripePartitions.get(i % stripes).addFull(it.next());
 
-            for (int stripe = 0; stripe < totalStripes; stripe++) {
+            for (int stripe = 0; stripe < rmtTotalStripes; stripe++) {
                 if (!stripePartitions.get(stripe).isEmpty()) {
                     // Create copy of demand message with new striped partitions map.
                     final GridDhtPartitionDemandMessage demandMsg = d.withNewPartitionsMap(stripePartitions.get(stripe));
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalanceConfigValidationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalanceConfigValidationTest.java
deleted file mode 100644
index 5338b53..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalanceConfigValidationTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.junit.Test;
-
-/**
- *
- */
-public class CacheRebalanceConfigValidationTest extends GridCommonAbstractTest {
-    /** Rebalance pool size. */
-    private int rebalancePoolSize;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
-
-        cfg.setRebalanceThreadPoolSize(rebalancePoolSize);
-
-        return cfg;
-    }
-
-    /**
-     * Checks that node is not allowed to join to cluster if has different value of {@link IgniteConfiguration#rebalanceThreadPoolSize}.
-     *
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testParameterConsistency() throws Exception {
-        rebalancePoolSize = 2;
-
-        startGrid(0);
-
-        rebalancePoolSize = 1;
-
-        GridTestUtils.assertThrows(log, () -> startGrid(1), IgniteCheckedException.class, "Rebalance configuration mismatch");
-    }
-}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/RebalanceWithDifferentThreadPoolSizeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/RebalanceWithDifferentThreadPoolSizeTest.java
new file mode 100644
index 0000000..623af7d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/RebalanceWithDifferentThreadPoolSizeTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Test rebalance with different thread pool size for nodes.
+ */
+public class RebalanceWithDifferentThreadPoolSizeTest extends GridCommonAbstractTest {
+    /** Ip finder. */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Rebalance pool size. */
+    private int rebalancePoolSize;
+
+    /** Entries count. */
+    private static final int ENTRIES_COUNT = 3000;
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "cache1";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoverySpi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
+        discoverySpi.setIpFinder(ipFinder);
+
+        cfg.setRebalanceThreadPoolSize(rebalancePoolSize);
+
+        cfg.setCacheConfiguration(new CacheConfiguration(CACHE_NAME)
+            .setRebalanceMode(CacheRebalanceMode.SYNC) // Wait for rebalance to finish before node start completes.
+            .setAffinity(new RendezvousAffinityFunction(false, 32)));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
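+    /** Starts nodes with rebalance thread pool sizes 1, 2 and 5 and checks that all three server nodes are alive. */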
+    @Test
+    public void shouldFinishRebalanceWhenConnectedNewNodeWithHigherThreadPoolSize() throws Exception {
+        rebalancePoolSize = 1;
+
+        IgniteEx ex = startGrid(0);
+
+        ex.cluster().active(true);
+
+        IgniteCache<Integer, Integer> cache = ex.cache(CACHE_NAME);
+        for (int i = 0; i < ENTRIES_COUNT; i++)
+            cache.put(i, i);
+
+        rebalancePoolSize = 2;
+
+        startGrid(1);
+
+        rebalancePoolSize = 5;
+
+        startGrid(2);
+
+        assertEquals(3, ex.context().discovery().aliveServerNodes().size());
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 40b3f22..0525487 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -50,7 +50,6 @@ import org.apache.ignite.internal.processors.affinity.GridAffinityProcessorRende
 import org.apache.ignite.internal.processors.affinity.GridHistoryAffinityAssignmentTest;
 import org.apache.ignite.internal.processors.affinity.GridHistoryAffinityAssignmentTestNoOptimization;
 import org.apache.ignite.internal.processors.cache.CacheLocalGetSerializationTest;
-import org.apache.ignite.internal.processors.cache.CacheRebalanceConfigValidationTest;
 import org.apache.ignite.internal.processors.cache.GridLocalIgniteSerializationTest;
 import org.apache.ignite.internal.processors.cache.GridProjectionForCachesOnDaemonNodeSelfTest;
 import org.apache.ignite.internal.processors.cache.IgniteDaemonNodeMarshallerCacheTest;
@@ -58,6 +57,7 @@ import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheClassNam
 import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheClientRequestsMappingOnMissTest;
 import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheConcurrentReadWriteTest;
 import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheFSRestoreTest;
+import org.apache.ignite.internal.processors.cache.RebalanceWithDifferentThreadPoolSizeTest;
 import org.apache.ignite.internal.processors.cache.SetTxTimeoutOnPartitionMapExchangeTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteRejectConnectOnNodeStopTest;
 import org.apache.ignite.internal.processors.cache.transactions.AtomicOperationsInTxTest;
@@ -206,7 +206,7 @@ import org.junit.runners.Suite;
 
     AtomicOperationsInTxTest.class,
 
-    CacheRebalanceConfigValidationTest.class,
+    RebalanceWithDifferentThreadPoolSizeTest.class,
 
     ListeningTestLoggerTest.class,