You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/10/08 16:21:59 UTC

[ignite-3] branch ignite-3.0.0-alpha3 updated: IGNITE-15491 Added setBaseline method and naive rebalance. Fixes #379

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch ignite-3.0.0-alpha3
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/ignite-3.0.0-alpha3 by this push:
     new 1147ff3  IGNITE-15491 Added setBaseline method and naive rebalance. Fixes #379
1147ff3 is described below

commit 1147ff33e7f1daa941819548eb87e55d766b9e60
Author: Kirill Gusakov <kg...@gmail.com>
AuthorDate: Fri Oct 8 19:20:18 2021 +0300

    IGNITE-15491 Added setBaseline method and naive rebalance. Fixes #379
    
    Signed-off-by: Slava Koptilin <sl...@gmail.com>
    (cherry picked from commit dd45a40733022ef342e45d0646e27f1b4c518588)
---
 .../src/main/java/org/apache/ignite/Ignite.java    |  21 ++
 .../ignite/internal/client/TcpIgniteClient.java    |   8 +
 .../org/apache/ignite/client/fakes/FakeIgnite.java |   8 +
 .../java/org/apache/ignite/internal/raft/Loza.java |  82 +++++++-
 .../raft/jraft/rpc/impl/RaftGroupServiceImpl.java  |  42 +++-
 .../internal/runner/app/ITBaselineChangesTest.java | 168 ++++++++++++++++
 .../org/apache/ignite/internal/app/IgniteImpl.java |   6 +
 .../apache/ignite/internal/table/TableImpl.java    |  12 ++
 .../internal/table/distributed/TableManager.java   | 212 ++++++++++++++++++++-
 .../distributed/storage/InternalTableImpl.java     |  10 +
 10 files changed, 560 insertions(+), 9 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/Ignite.java b/modules/api/src/main/java/org/apache/ignite/Ignite.java
index 7d3cbbd..1eda994 100644
--- a/modules/api/src/main/java/org/apache/ignite/Ignite.java
+++ b/modules/api/src/main/java/org/apache/ignite/Ignite.java
@@ -17,8 +17,11 @@
 
 package org.apache.ignite;
 
+import java.util.Set;
+import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.table.manager.IgniteTables;
 import org.apache.ignite.tx.IgniteTransactions;
+import org.jetbrains.annotations.ApiStatus.Experimental;
 
 /**
  * Ignite node interface. Main entry-point for all Ignite APIs.
@@ -44,4 +47,22 @@ public interface Ignite extends AutoCloseable {
      * @return Ignite transactions.
      */
     IgniteTransactions transactions();
+
+    /**
+     * Sets new baseline nodes for table assignments.
+     *
+     * <p>The current implementation has significant restrictions:
+     * <ul><li>Only alive nodes can be a part of the new baseline.
+     * If any of the passed nodes is not alive, an {@link IgniteException} with an appropriate message is thrown.</li>
+     * <li>Potentially it can be a long operation, and the current
+     * synchronous changePeers-based implementation can't handle this issue well.</li>
+     * <li>No recovery logic is supported: if setBaseline fails, the cluster can be left in an arbitrary state.</li></ul>
+     *
+     * <p>TODO: IGNITE-14209 issues above must be fixed.
+     *
+     * @param baselineNodes Names of baseline nodes.
+     * @throws IgniteException If {@code baselineNodes} is null or empty, or if any node is not alive.
+     */
+    @Experimental
+    void setBaseline(Set<String> baselineNodes);
 }
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
index 0ffb12b..ed9bf1d 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.client;
 
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BiFunction;
 
@@ -101,6 +102,13 @@ public class TcpIgniteClient implements IgniteClient {
         return null;
     }
 
+    /**
+     * {@inheritDoc} Not supported by this client: always throws {@link UnsupportedOperationException}.
+     */
+    @Override public void setBaseline(Set<String> baselineNodes) {
+        throw new UnsupportedOperationException("setBaseline is not supported by TcpIgniteClient");
+    }
+
     /** {@inheritDoc} */
     @Override public void close() throws Exception {
         ch.close();
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
index 45a94f6..e46ac6d 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.client.fakes;
 
+import java.util.Set;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.processors.query.calcite.QueryProcessor;
 import org.apache.ignite.table.manager.IgniteTables;
@@ -50,6 +51,13 @@ public class FakeIgnite implements Ignite {
         return null;
     }
 
+    /**
+     * {@inheritDoc} Not supported by this fake: always throws {@link UnsupportedOperationException}.
+     */
+    @Override public void setBaseline(Set<String> baselineNodes) {
+        throw new UnsupportedOperationException("setBaseline is not supported by FakeIgnite");
+    }
+
     /** {@inheritDoc} */
     @Override public void close() {
         // No-op.
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index df2ea1d..04427e5 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.raft;
 
 import java.nio.file.Path;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
@@ -30,6 +31,7 @@ import org.apache.ignite.internal.raft.server.RaftServer;
 import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.raft.client.Peer;
@@ -38,11 +40,15 @@ import org.apache.ignite.raft.client.service.RaftGroupService;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
 import org.apache.ignite.raft.jraft.util.Utils;
+import org.jetbrains.annotations.ApiStatus.Experimental;
 
 /**
  * Best raft manager ever since 1982.
  */
 public class Loza implements IgniteComponent {
+    /** Logger of this class (created for {@code Loza} so that records are attributed to the raft manager). */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(Loza.class);
+
     /** Factory. */
     private static final RaftMessagesFactory FACTORY = new RaftMessagesFactory();
 
@@ -55,7 +61,12 @@ public class Loza implements IgniteComponent {
     private static final int CLIENT_POOL_SIZE = Math.min(Utils.cpus() * 3, 20);
 
     /** Timeout. */
-    private static final int TIMEOUT = 1000;
+    // TODO: IGNITE-15705 Correct value should be investigated
+    private static final int TIMEOUT = 10000;
+
+    /** Network timeout. */
+    // TODO: IGNITE-15705 Correct value should be investigated
+    private static final int NETWORK_TIMEOUT = 3000;
 
     /** Retry delay. */
     private static final int DELAY = 200;
@@ -103,15 +114,23 @@ public class Loza implements IgniteComponent {
      * Creates a raft group service providing operations on a raft group.
      * If {@code nodes} contains the current node, then raft group starts on the current node.
      *
+     * IMPORTANT: DON'T USE. This method should be used only for long-running changePeers requests, until
+     * IGNITE-14209 is fixed with a stable solution.
+     *
      * @param groupId Raft group id.
      * @param nodes Raft group nodes.
      * @param lsnrSupplier Raft group listener supplier.
+     * @param clientTimeout Client retry timeout.
+     * @param networkTimeout Client network timeout.
      * @return Future representing pending completion of the operation.
      */
+    @Experimental
     public CompletableFuture<RaftGroupService> prepareRaftGroup(
         String groupId,
         List<ClusterNode> nodes,
-        Supplier<RaftGroupListener> lsnrSupplier) {
+        Supplier<RaftGroupListener> lsnrSupplier,
+        int clientTimeout,
+        int networkTimeout) {
         assert !nodes.isEmpty();
 
         List<Peer> peers = nodes.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
@@ -125,7 +144,8 @@ public class Loza implements IgniteComponent {
             groupId,
             clusterNetSvc,
             FACTORY,
-            TIMEOUT,
+            clientTimeout,
+            networkTimeout,
             peers,
             true,
             DELAY,
@@ -134,6 +154,23 @@ public class Loza implements IgniteComponent {
     }
 
     /**
+     * Creates a raft group service providing operations on a raft group, using the default timeouts.
+     * If {@code nodes} contains the current node, then raft group starts on the current node.
+     *
+     * @param groupId Raft group id.
+     * @param nodes Raft group nodes.
+     * @param lsnrSupplier Raft group listener supplier.
+     * @return Future representing pending completion of the operation.
+     */
+    public CompletableFuture<RaftGroupService> prepareRaftGroup(
+        String groupId,
+        List<ClusterNode> nodes,
+        Supplier<RaftGroupListener> lsnrSupplier) {
+        // Delegate to the full overload with TIMEOUT as the client timeout and NETWORK_TIMEOUT as the network timeout.
+        return prepareRaftGroup(groupId, nodes, lsnrSupplier, TIMEOUT, NETWORK_TIMEOUT);
+    }
+
+    /**
      * Stops a raft group on the current node if {@code nodes} contains the current node.
      *
      * @param groupId Raft group id.
@@ -147,4 +184,43 @@ public class Loza implements IgniteComponent {
         if (nodes.stream().anyMatch(n -> locNodeName.equals(n.name())))
             raftServer.stopRaftGroup(groupId);
     }
+
+    /**
+     * Creates a raft group service providing operations on a raft group, with the same timeouts as the default prepareRaftGroup.
+     * If {@code deltaNodes} contains the current node, the raft group is started on it (a start failure is only logged).
+     * @param groupId Raft group id.
+     * @param nodes Full set of raft group nodes.
+     * @param deltaNodes New raft group nodes.
+     * @param lsnrSupplier Raft group listener supplier.
+     * @return Future representing pending completion of the operation.
+     */
+    public CompletableFuture<RaftGroupService> updateRaftGroup(
+        String groupId,
+        Collection<ClusterNode> nodes,
+        Collection<ClusterNode> deltaNodes,
+        Supplier<RaftGroupListener> lsnrSupplier
+    ) {
+        assert !nodes.isEmpty();
+
+        List<Peer> peers = nodes.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
+
+        String locNodeName = clusterNetSvc.topologyService().localMember().name();
+
+        if (deltaNodes.stream().anyMatch(n -> locNodeName.equals(n.name()))) {
+            if (!raftServer.startRaftGroup(groupId, lsnrSupplier.get(), peers))
+                LOG.error("Failed to start raft group on node " + locNodeName);
+        }
+
+        return RaftGroupServiceImpl.start(
+            groupId,
+            clusterNetSvc,
+            FACTORY,
+            TIMEOUT,
+            NETWORK_TIMEOUT,
+            peers,
+            true,
+            DELAY,
+            executor
+        );
+    }
 }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
index 081f02d..4b847a0 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
@@ -73,6 +73,9 @@ public class RaftGroupServiceImpl implements RaftGroupService {
     /** */
     private volatile long timeout;
 
+    /** Timeout for network calls. */
+    private final long networkTimeout;
+
     /** */
     private final String groupId;
 
@@ -114,6 +117,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
         ClusterService cluster,
         RaftMessagesFactory factory,
         int timeout,
+        int networkTimeout,
         List<Peer> peers,
         Peer leader,
         long retryDelay,
@@ -124,6 +128,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
         this.learners = Collections.emptyList();
         this.factory = factory;
         this.timeout = timeout;
+        this.networkTimeout = networkTimeout;
         this.groupId = groupId;
         this.retryDelay = retryDelay;
         this.leader = leader;
@@ -137,6 +142,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
      * @param cluster Cluster service.
      * @param factory Message factory.
      * @param timeout Timeout.
+     * @param netTimeout Network call timeout.
      * @param peers List of all peers.
      * @param getLeader {@code True} to get the group's leader upon service creation.
      * @param retryDelay Retry delay.
@@ -148,12 +154,13 @@ public class RaftGroupServiceImpl implements RaftGroupService {
         ClusterService cluster,
         RaftMessagesFactory factory,
         int timeout,
+        int netTimeout,
         List<Peer> peers,
         boolean getLeader,
         long retryDelay,
         ScheduledExecutorService executor
     ) {
-        var service = new RaftGroupServiceImpl(groupId, cluster, factory, timeout, peers, null, retryDelay, executor);
+        var service = new RaftGroupServiceImpl(groupId, cluster, factory, timeout, netTimeout, peers, null, retryDelay, executor);
 
         if (!getLeader)
             return CompletableFuture.completedFuture(service);
@@ -166,6 +173,32 @@ public class RaftGroupServiceImpl implements RaftGroupService {
         });
     }
 
+    /**
+     * Starts raft group service, using {@code timeout} for network calls as well.
+     *
+     * @param groupId Raft group id.
+     * @param cluster Cluster service.
+     * @param factory Message factory.
+     * @param timeout Timeout; the same value is also passed as the network call timeout.
+     * @param peers List of all peers.
+     * @param getLeader {@code True} to get the group's leader upon service creation.
+     * @param retryDelay Retry delay.
+     * @param executor Executor for retrying requests.
+     * @return Future representing pending completion of the operation.
+     */
+    public static CompletableFuture<RaftGroupService> start(
+        String groupId,
+        ClusterService cluster,
+        RaftMessagesFactory factory,
+        int timeout,
+        List<Peer> peers,
+        boolean getLeader,
+        long retryDelay,
+        ScheduledExecutorService executor
+    ) {
+        return start(groupId, cluster, factory, timeout, timeout, peers, getLeader, retryDelay, executor);
+    }
+
     /** {@inheritDoc} */
     @Override public @NotNull String groupId() {
         return groupId;
@@ -456,7 +489,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
             return;
         }
 
-        CompletableFuture<?> fut0 = cluster.messagingService().invoke(peer.address(), (NetworkMessage) req, timeout);
+        CompletableFuture<?> fut0 = cluster.messagingService().invoke(peer.address(), (NetworkMessage) req, networkTimeout);
 
         fut0.whenComplete(new BiConsumer<Object, Throwable>() {
             @Override public void accept(Object resp, Throwable err) {
@@ -487,7 +520,10 @@ public class RaftGroupServiceImpl implements RaftGroupService {
                             return null;
                         }, retryDelay, TimeUnit.MILLISECONDS);
                     }
-                    else if (resp0.errorCode() == RaftError.EPERM.getNumber()) {
+                    else if (resp0.errorCode() == RaftError.EPERM.getNumber() ||
+                        // TODO: IGNITE-15706
+                        resp0.errorCode() == RaftError.UNKNOWN.getNumber() ||
+                        resp0.errorCode() == RaftError.EINTERNAL.getNumber()) {
                         if (resp0.leaderId() == null) {
                             executor.schedule(() -> {
                                 sendWithRetry(randomNode(), req, stopTime, fut);
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ITBaselineChangesTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ITBaselineChangesTest.java
new file mode 100644
index 0000000..c4fd2f3
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ITBaselineChangesTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import com.google.common.collect.Lists;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.schema.SchemaBuilders;
+import org.apache.ignite.schema.definition.ColumnType;
+import org.apache.ignite.schema.definition.TableDefinition;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Integration test for baseline changes.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class ITBaselineChangesTest {
+    /** Start network port for test nodes. */
+    private static final int BASE_PORT = 3344;
+
+    /** Nodes bootstrap configuration. */
+    private final Map<String, String> initClusterNodes = new LinkedHashMap<>();
+
+    /** Started cluster nodes, in the order they were started. */
+    private final List<Ignite> clusterNodes = new ArrayList<>();
+
+    /** Work directory; each node is started in the subdirectory named after it. */
+    @WorkDirectory
+    private Path workDir;
+
+    /** Prepares bootstrap configurations for three nodes that all use node 0 as the metastorage node. */
+    @BeforeEach
+    void setUp(TestInfo testInfo) {
+        String node0Name = testNodeName(testInfo, BASE_PORT);
+        String node1Name = testNodeName(testInfo, BASE_PORT + 1);
+        String node2Name = testNodeName(testInfo, BASE_PORT + 2);
+
+        initClusterNodes.put(
+            node0Name,
+            buildConfig(node0Name, 0)
+        );
+
+        initClusterNodes.put(
+            node1Name,
+            buildConfig(node0Name, 1)
+        );
+
+        initClusterNodes.put(
+            node2Name,
+            buildConfig(node0Name, 2)
+        );
+    }
+
+    /** Closes all started nodes in reverse start order. */
+    @AfterEach
+    void tearDown() throws Exception {
+        IgniteUtils.closeAll(Lists.reverse(clusterNodes));
+    }
+
+    /**
+     * Checks that a record inserted before a baseline change is readable from a newly added node after old nodes are stopped.
+     */
+    @Test
+    void testBaselineExtending(TestInfo testInfo) {
+        initClusterNodes.forEach((nodeName, configStr) ->
+            clusterNodes.add(IgnitionManager.start(nodeName, configStr, workDir.resolve(nodeName)))
+        );
+
+        assertEquals(3, clusterNodes.size());
+
+        // Create table on node 0.
+        TableDefinition schTbl1 = SchemaBuilders.tableBuilder("PUBLIC", "tbl1").columns(
+            SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
+            SchemaBuilders.column("val", ColumnType.INT32).asNullable().build()
+        ).withPrimaryKey("key").build();
+
+        clusterNodes.get(0).tables().createTable(schTbl1.canonicalName(), tblCh ->
+            SchemaConfigurationConverter.convert(schTbl1, tblCh)
+                .changeReplicas(5)
+                .changePartitions(1)
+        );
+
+        // Put data on node 1.
+        Table tbl1 = clusterNodes.get(1).tables().table(schTbl1.canonicalName());
+        RecordView<Tuple> recView1 = tbl1.recordView();
+
+        recView1.insert(Tuple.create().set("key", 1L).set("val", 111));
+
+        var metaStoreNode = clusterNodes.get(0);
+
+        var node3Name = testNodeName(testInfo, nodePort(3));
+        var node4Name = testNodeName(testInfo, nodePort(4));
+
+        // Start 2 new nodes after the table has been created and populated.
+        var node3 = IgnitionManager.start(
+            node3Name, buildConfig(metaStoreNode.name(), 3), workDir.resolve(node3Name));
+
+        clusterNodes.add(node3);
+
+        var node4 = IgnitionManager.start(
+            node4Name, buildConfig(metaStoreNode.name(), 4), workDir.resolve(node4Name));
+
+        clusterNodes.add(node4);
+
+        // Update baseline to nodes 0, 3 and 4 (zero-based indices): the metastorage node plus the two new nodes.
+        metaStoreNode.setBaseline(Set.of(metaStoreNode.name(), node3Name, node4Name));
+
+        IgnitionManager.stop(clusterNodes.get(1).name());
+        IgnitionManager.stop(clusterNodes.get(2).name());
+
+        Table tbl4 = node4.tables().table(schTbl1.canonicalName());
+
+        final Tuple keyTuple1 = Tuple.create().set("key", 1L);
+
+        assertEquals(1, (Long) tbl4.recordView().get(keyTuple1).value("key"));
+    }
+
+    /** Builds a node bootstrap configuration from the metastorage node name and the port derived from the node index. */
+    private String buildConfig(String metastoreNodeName, int nodeIdx) {
+        return "{\n" +
+            "  node.metastorageNodes: [ \"" + metastoreNodeName + "\" ],\n" +
+            "  network: {\n" +
+            "    port: " + nodePort(nodeIdx) + "\n" +
+            "    netClusterNodes: [ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n" +
+            "  }\n" +
+            "}";
+    }
+
+    /** Returns the network port of the node with the given index: {@code BASE_PORT + nodeIdx}. */
+    private int nodePort(int nodeIdx) {
+        return BASE_PORT + nodeIdx;
+    }
+}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index f68ab1e..113e9cd 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.Ignite;
@@ -343,6 +344,11 @@ public class IgniteImpl implements Ignite {
         return name;
     }
 
+    /** {@inheritDoc} Delegates to {@code distributedTblMgr}. */
+    @Override public void setBaseline(Set<String> baselineNodes) {
+        distributedTblMgr.setBaseline(baselineNodes);
+    }
+
     /**
      * @return Node configuration.
      */
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index 5247e34..22f9b9c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -19,7 +19,9 @@ package org.apache.ignite.internal.table;
 
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.raft.client.service.RaftGroupService;
 import org.apache.ignite.schema.definition.SchemaManagementMode;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.RecordView;
@@ -110,4 +112,14 @@ public class TableImpl implements Table {
     public void schemaMode(SchemaManagementMode schemaMode) {
         this.tbl.schema(schemaMode);
     }
+
+    /**
+     * Updates internal table raft group service for given partition.
+     * NOTE: the internal table is cast to {@link InternalTableImpl}; any other implementation fails with a ClassCastException.
+     * @param p Partition.
+     * @param raftGrpSvc Raft group service.
+     */
+    public void updateInternalTableRaftGroupService(int p, RaftGroupService raftGrpSvc) {
+        ((InternalTableImpl)tbl).updateInternalTableRaftGroupService(p, raftGrpSvc);
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 73d4bdc..a23c426 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -33,6 +34,9 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
 import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
 import org.apache.ignite.configuration.schemas.store.DataStorageConfiguration;
@@ -81,6 +85,11 @@ import org.apache.ignite.lang.LoggerMessageHelper;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.ReadCommand;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.apache.ignite.raft.client.service.RaftGroupListener;
 import org.apache.ignite.raft.client.service.RaftGroupService;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.manager.IgniteTables;
@@ -142,7 +151,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
     //TODO: IGNITE-15161 These should go into TableImpl instances.
     /** Instances of table storages that need to be stopped on component stop. */
-    private final Set<TableStorage> tableStorages = ConcurrentHashMap.newKeySet();
+    private final Map<IgniteUuid, TableStorage> tableStorages = new ConcurrentHashMap<>();
 
     /**
      * Creates a new table manager.
@@ -232,6 +241,51 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                         }
                     });
 
+                ((ExtendedTableConfiguration)tablesCfg.tables().get(ctx.newValue().name())).assignments().
+                    listen(assignmentsCtx -> {
+                        List<List<ClusterNode>> oldAssignments =
+                            (List<List<ClusterNode>>)ByteUtils.fromBytes(assignmentsCtx.oldValue());
+
+                        List<List<ClusterNode>> newAssignments =
+                            (List<List<ClusterNode>>)ByteUtils.fromBytes(assignmentsCtx.newValue());
+
+                        CompletableFuture<?>[] futures = new CompletableFuture<?>[oldAssignments.size()];
+
+                        // TODO: IGNITE-15554 Add logic for assignment recalculation in case of partitions or replicas changes
+                        // TODO: Until IGNITE-15554 is implemented it's safe to iterate over partitions and replicas cause there will
+                        // TODO: be exact same amount of partitions and replicas for both old and new assignments
+                        for (int i = 0; i < oldAssignments.size(); i++) {
+                            final int p = i;
+
+                            List<ClusterNode> oldPartitionAssignment = oldAssignments.get(p);
+                            List<ClusterNode> newPartitionAssignment = newAssignments.get(p);
+
+                            var toAdd = new HashSet<>(newPartitionAssignment);
+                            var toRemove = new HashSet<>(oldPartitionAssignment);
+
+                            toAdd.removeAll(oldPartitionAssignment);
+                            toRemove.removeAll(newPartitionAssignment);
+
+                            // Create new raft nodes according to new assignments.
+                            futures[i] = raftMgr.updateRaftGroup(
+                                raftGroupName(tblId, p),
+                                newPartitionAssignment,
+                                toAdd,
+                                () -> new PartitionListener(tableStorages.get(tblId).getOrCreatePartition(p))
+                            )
+                                .thenAccept(
+                                    updatedRaftGroupService -> tables.get(ctx.newValue().name()).updateInternalTableRaftGroupService(p, updatedRaftGroupService)
+                                ).thenRun(() -> raftMgr.stopRaftGroup(raftGroupName(tblId, p), new ArrayList<>(toRemove))
+                                ).exceptionally(th -> {
+                                        LOG.error("Failed to update raft groups one the node", th);
+                                        return null;
+                                    }
+                                );
+                        }
+
+                        return CompletableFuture.allOf(futures);
+                    });
+
                 createTableLocally(
                     ctx.newValue().name(),
                     IgniteUuid.fromString(((ExtendedTableView)ctx.newValue()).id()),
@@ -275,7 +329,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
     /** {@inheritDoc} */
     @Override public void stop() {
-        for (TableStorage tableStorage : tableStorages) {
+        for (TableStorage tableStorage : tableStorages.values()) {
             try {
                 tableStorage.stop();
             }
@@ -334,7 +388,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
         tableStorage.start();
 
-        tableStorages.add(tableStorage);
+        tableStorages.put(tblId, tableStorage);
 
         for (int p = 0; p < partitions; p++) {
             int partId = p;
@@ -871,4 +925,156 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
     private boolean isTableConfigured(String name) {
         return tableNamesConfigured().contains(name);
     }
+
+    /**
+     * Moves all tables to a new baseline that consists of the nodes with the given names.
+     * <p>
+     * Every requested name is first checked against the nodes reported by {@code baselineMgr.nodes()}.
+     * Assignments are then recalculated for the whole current membership and, only if the requested
+     * baseline differs from that membership, recalculated once more for the requested nodes.
+     *
+     * @param nodes Names of the nodes that should form the new baseline.
+     * @throws IgniteException If {@code nodes} is {@code null} or empty, or if it contains a name
+     *      that does not belong to any current cluster member.
+     * @see Ignite#setBaseline(Set)
+     */
+    public void setBaseline(Set<String> nodes) {
+        if (nodes == null || nodes.isEmpty())
+            throw new IgniteException("New baseline can't be null or empty");
+
+        Set<ClusterNode> clusterMembers = new HashSet<>(baselineMgr.nodes());
+
+        Set<String> memberNames = new HashSet<>();
+
+        for (ClusterNode member : clusterMembers)
+            memberNames.add(member.name());
+
+        // Reject the request as soon as a name that is not a cluster member is met.
+        for (String requestedName : nodes) {
+            if (memberNames.contains(requestedName))
+                continue;
+
+            throw new IgniteException("Node '" + requestedName + "' not in current network cluster membership. " +
+                " Adding not alive nodes is not supported yet.");
+        }
+
+        Set<ClusterNode> requestedBaseline = new HashSet<>();
+
+        for (ClusterNode member : clusterMembers) {
+            if (nodes.contains(member.name()))
+                requestedBaseline.add(member);
+        }
+
+        // First step: spread assignments over every current cluster member.
+        updateAssignments(clusterMembers);
+
+        if (requestedBaseline.equals(clusterMembers))
+            return;
+
+        // Second step: narrow assignments down to the requested nodes.
+        updateAssignments(requestedBaseline);
+    }
+
+    /**
+     * Recalculates assignments of all currently configured tables for the given set of nodes and
+     * blocks the calling thread until both the configuration change and the raft peer changes
+     * it triggers are finished.
+     * This approach has known issues, see {@link Ignite#setBaseline(Set)}.
+     *
+     * @param clusterNodes Set of nodes for assignment.
+     */
+    private void updateAssignments(Set<ClusterNode> clusterNodes) {
+        CompletableFuture<Void> doneFut = new CompletableFuture<>();
+
+        // Deferred raft topology updates; they are started only after the configuration change completes.
+        List<Supplier<CompletableFuture<Void>>> peerChangeTasks = new ArrayList<>();
+
+        tablesCfg.tables().change(tbls -> {
+            // Start from an empty queue on each run of the closure
+            // (presumably the closure may be invoked more than once - TODO confirm).
+            peerChangeTasks.clear();
+
+            for (int idx = 0; idx < tbls.size(); idx++) {
+                tbls.createOrUpdate(tbls.get(idx).name(), tblChange -> {
+                    ExtendedTableChange extChange = (ExtendedTableChange)tblChange;
+
+                    byte[] storedAssignments = extChange.assignments();
+
+                    List<List<ClusterNode>> newAssignments = AffinityUtils.calculateAssignments(
+                        clusterNodes,
+                        extChange.partitions(),
+                        extChange.replicas());
+
+                    // A table whose assignments stay the same needs neither a config nor a raft update.
+                    if (newAssignments.equals(ByteUtils.fromBytes(storedAssignments)))
+                        return;
+
+                    extChange.changeAssignments(ByteUtils.toBytes(newAssignments));
+
+                    peerChangeTasks.add(() -> updateRaftTopology(
+                        (List<List<ClusterNode>>)ByteUtils.fromBytes(storedAssignments),
+                        newAssignments,
+                        IgniteUuid.fromString(extChange.id())));
+                });
+            }
+        })
+            .thenCompose(ignored -> CompletableFuture.allOf(
+                peerChangeTasks.stream().map(Supplier::get).toArray(CompletableFuture[]::new)))
+            .whenComplete((res, err) -> {
+                if (err == null)
+                    doneFut.complete(null);
+                else
+                    doneFut.completeExceptionally(err);
+            });
+
+        doneFut.join();
+    }
+
+    /**
+     * Update raft groups of table partitions to new peers list.
+     * <p>
+     * For every partition a raft group service is prepared over the old assignment and is then asked
+     * to change its peers to the nodes of the new assignment. A failed partition update is logged
+     * and does not fail the returned future.
+     *
+     * @param oldAssignments Old assignment.
+     * @param newAssignments New assignment.
+     * @param tblId Table ID.
+     * @return Future, which completes, when update finished.
+     */
+    private CompletableFuture<Void> updateRaftTopology(
+        List<List<ClusterNode>> oldAssignments,
+        List<List<ClusterNode>> newAssignments,
+        IgniteUuid tblId) {
+        CompletableFuture<?>[] futures = new CompletableFuture<?>[oldAssignments.size()];
+
+        // TODO: IGNITE-15554 Add logic for assignment recalculation in case of partitions or replicas changes
+        // TODO: Until IGNITE-15554 is implemented it's safe to iterate over partitions and replicas cause there will
+        // TODO: be exact same amount of partitions and replicas for both old and new assignments
+        for (int i = 0; i < oldAssignments.size(); i++) {
+            final int p = i;
+
+            List<ClusterNode> oldPartitionAssignment = oldAssignments.get(p);
+            List<ClusterNode> newPartitionAssignment = newAssignments.get(p);
+
+            futures[i] = raftMgr.prepareRaftGroup(
+                raftGroupName(tblId, p),
+                oldPartitionAssignment,
+                // Listener whose callbacks are all empty (presumably only the group service is needed
+                // here to issue the peers change - TODO confirm).
+                () -> new RaftGroupListener() {
+                    @Override public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
+                        // No-op.
+                    }
+
+                    @Override public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
+                        // No-op.
+                    }
+
+                    @Override public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
+                        // No-op.
+                    }
+
+                    @Override public boolean onSnapshotLoad(Path path) {
+                        return false;
+                    }
+
+                    @Override public void onShutdown() {
+                        // No-op.
+                    }
+                },
+                // NOTE(review): presumably a timeout and a delay in milliseconds - confirm against prepareRaftGroup.
+                60000,
+                10000
+            )
+                .thenCompose(raftGrpSvc -> raftGrpSvc.changePeers(
+                    newPartitionAssignment.stream().map(n -> new Peer(n.address())).collect(Collectors.toList())))
+                .exceptionally(th -> {
+                    LOG.error("Failed to update raft peers for group " + raftGroupName(tblId, p) +
+                        " from " + oldPartitionAssignment + " to " + newPartitionAssignment, th);
+
+                    // The failure is only logged, so the combined future still completes normally.
+                    return null;
+                });
+        }
+
+        return CompletableFuture.allOf(futures);
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index eab54c3..3bb5c15 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -374,6 +374,16 @@ public class InternalTableImpl implements InternalTable {
                 });
     }
 
+    /**
+     * Updates internal table raft group service for given partition.
+     * <p>
+     * The given service is stored in {@code partitionMap} under the partition number.
+     *
+     * @param p Partition.
+     * @param raftGrpSvc Raft group service.
+     */
+    public void updateInternalTableRaftGroupService(int p, RaftGroupService raftGrpSvc) {
+        // NOTE(review): a service previously mapped to this partition, if any, is not stopped here -
+        // confirm that the caller takes care of it.
+        partitionMap.put(p, raftGrpSvc);
+    }
+
     /** Partition scan publisher. */
     private class PartitionScanPublisher implements Publisher<BinaryRow> {
         /** {@link Publisher<BinaryRow>} that relatively notifies about partition rows.  */