You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/05/13 14:50:26 UTC

[ignite-3] branch main updated: IGNITE-14237 Implemented affinity calculation based on events. Fixes #120

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 865a066  IGNITE-14237 Implemented affinity calculation based on events. Fixes #120
865a066 is described below

commit 865a066a7f431ef5455533f642a42862d40308b3
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Thu May 13 17:49:50 2021 +0300

    IGNITE-14237 Implemented affinity calculation based on events. Fixes #120
    
    Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
 .../ignite/internal/affinity/AffinityManager.java  | 170 ++++++-------
 .../internal/affinity/event/AffinityEvent.java     |  31 +++
 .../affinity/event/AffinityEventParameters.java    |  64 +++++
 .../ignite/internal/baseline/BaselineManager.java  |   1 +
 .../internal/metastorage/MetaStorageManager.java   |  40 ++-
 .../apache/ignite/internal/app/IgnitionImpl.java   |   3 +-
 modules/table/pom.xml                              |  11 +-
 .../internal/table/distributed/TableManager.java   | 279 ++++++++++-----------
 8 files changed, 350 insertions(+), 249 deletions(-)

diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
index 4619b11..6572a43 100644
--- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
+++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
@@ -18,14 +18,16 @@
 package org.apache.ignite.internal.affinity;
 
 import java.nio.charset.StandardCharsets;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import org.apache.ignite.configuration.internal.ConfigurationManager;
-import org.apache.ignite.configuration.schemas.runner.ClusterConfiguration;
-import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
 import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.internal.affinity.event.AffinityEvent;
+import org.apache.ignite.internal.affinity.event.AffinityEventParameters;
 import org.apache.ignite.internal.baseline.BaselineManager;
+import org.apache.ignite.internal.manager.Producer;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.vault.VaultManager;
@@ -36,120 +38,73 @@ import org.apache.ignite.metastorage.common.Key;
 import org.apache.ignite.metastorage.common.Operations;
 import org.apache.ignite.metastorage.common.WatchEvent;
 import org.apache.ignite.metastorage.common.WatchListener;
+import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.NotNull;
 
 /**
  * Affinity manager is responsible for affinity function related logic including calculating affinity assignments.
  */
-public class AffinityManager {
+public class AffinityManager extends Producer<AffinityEvent, AffinityEventParameters> {
     /** The logger. */
     private static final IgniteLogger LOG = IgniteLogger.forClass(AffinityManager.class);
 
-    /** Tables prefix for the metasorage. */
-    private static final String INTERNAL_PREFIX = "internal.tables.";
+    /** Internal prefix for the metasorage. */
+    private static final String INTERNAL_PREFIX = "internal.tables.assignment.";
 
     /**
-     * MetaStorage manager in order to watch private distributed affinity specific configuration,
-     * cause ConfigurationManger handles only public configuration.
+     * MetaStorage manager in order to watch private distributed affinity specific configuration, cause
+     * ConfigurationManger handles only public configuration.
      */
     private final MetaStorageManager metaStorageMgr;
 
-    /** Configuration manager in order to handle and listen affinity specific configuration.*/
+    /** Configuration manager in order to handle and listen affinity specific configuration. */
     private final ConfigurationManager configurationMgr;
 
     /** Baseline manager. */
     private final BaselineManager baselineMgr;
 
     /** Vault manager. */
-    private final VaultManager vaultManager;
-
-    /** Affinity calculate subscription future. */
-    private CompletableFuture<Long> affinityCalculateSubscriptionFut = null;
+    private final VaultManager vaultMgr;
 
     /**
+     * Creates a new affinity manager.
+     *
      * @param configurationMgr Configuration module.
      * @param metaStorageMgr Meta storage service.
+     * @param baselineMgr Baseline manager.
+     * @param vaultMgr Vault manager.
      */
     public AffinityManager(
         ConfigurationManager configurationMgr,
         MetaStorageManager metaStorageMgr,
         BaselineManager baselineMgr,
-        VaultManager vaultManager
+        VaultManager vaultMgr
     ) {
         this.configurationMgr = configurationMgr;
         this.metaStorageMgr = metaStorageMgr;
         this.baselineMgr = baselineMgr;
-        this.vaultManager = vaultManager;
-
-        String localNodeName = configurationMgr.configurationRegistry().getConfiguration(NodeConfiguration.KEY)
-            .name().value();
-
-        configurationMgr.configurationRegistry().getConfiguration(ClusterConfiguration.KEY)
-            .metastorageNodes().listen(ctx -> {
-                if (ctx.newValue() != null) {
-                    if (MetaStorageManager.hasMetastorageLocally(localNodeName, ctx.newValue()))
-                        subscribeToAssignmentCalculation();
-                    else
-                        unsubscribeFromAssignmentCalculation();
-                }
-            return CompletableFuture.completedFuture(null);
-        });
+        this.vaultMgr = vaultMgr;
 
-        String[] metastorageMembers = configurationMgr.configurationRegistry().getConfiguration(NodeConfiguration.KEY)
-            .metastorageNodes().value();
+        metaStorageMgr.registerWatchByPrefix(new Key(INTERNAL_PREFIX), new WatchListener() {
+            @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
+                for (WatchEvent evt : events) {
+                    String tabIdVal = evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length());
 
-        if (MetaStorageManager.hasMetastorageLocally(localNodeName, metastorageMembers))
-            subscribeToAssignmentCalculation();
-    }
+                    UUID tblId = UUID.fromString(tabIdVal);
 
-    /**
-     * Subscribes to meta storage members update.
-     */
-    private void subscribeToAssignmentCalculation() {
-        assert affinityCalculateSubscriptionFut == null : "Affinity calculation already subscribed";
+                    if (evt.newEntry().value() == null) {
+                        assert evt.oldEntry().value() != null : "Previous assignment is unknown";
 
-        String tableInternalPrefix = INTERNAL_PREFIX + "assignment.";
+                        List<List<ClusterNode>> assignment = (List<List<ClusterNode>>)ByteUtils.fromBytes(
+                            evt.oldEntry().value());
 
-        affinityCalculateSubscriptionFut = metaStorageMgr.registerWatchByPrefix(new Key(tableInternalPrefix), new WatchListener() {
-            @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
-                for (WatchEvent evt : events) {
-                    byte[] assignmentVal = evt.newEntry().value();
-
-                    if (assignmentVal != null && assignmentVal.length == 0) {
-                        String keyTail = evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length());
-
-                        String placeholderValue = keyTail.substring(0, keyTail.indexOf('.'));
-
-                        UUID tblId = UUID.fromString(placeholderValue);
-
-                        try {
-                            String name = new String(vaultManager.get(ByteArray.fromString(INTERNAL_PREFIX + tblId.toString())).get().value(), StandardCharsets.UTF_8);
-
-                            int partitions = configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
-                                .tables().get(name).partitions().value();
-                            int replicas = configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
-                                .tables().get(name).replicas().value();
-
-                            var key = evt.newEntry().key();
-                            metaStorageMgr.invoke(
-                                Conditions.key(key).value().eq(assignmentVal),
-                                Operations.put(key, ByteUtils.toBytes(
-                                    RendezvousAffinityFunction.assignPartitions(
-                                        baselineMgr.nodes(),
-                                        partitions,
-                                        replicas,
-                                        false,
-                                        null
-                                    ))),
-                                Operations.noop());
-
-                            LOG.info("Affinity manager calculated assignment for the table [name={}, tblId={}]",
-                                name, tblId);
-                        }
-                        catch (InterruptedException | ExecutionException e) {
-                            LOG.error("Failed to initialize affinity [key={}]",
-                                evt.newEntry().key().toString(), e);
-                        }
+                        onEvent(AffinityEvent.REMOVED, new AffinityEventParameters(tblId, assignment), null);
+                    }
+                    else {
+                        List<List<ClusterNode>> assignment = (List<List<ClusterNode>>)ByteUtils.fromBytes(
+                            evt.newEntry().value());
+
+                        onEvent(AffinityEvent.CALCULATED, new AffinityEventParameters(tblId, assignment), null);
                     }
                 }
 
@@ -163,21 +118,50 @@ public class AffinityManager {
     }
 
     /**
-     * Unsubscribes a listener form the affinity calculation.
+     * Calculates an assignment for a table which was specified by id.
+     *
+     * @param tblId Table identifier.
+     * @return A future which will complete when the assignment is calculated.
      */
-    private void unsubscribeFromAssignmentCalculation() {
-        if (affinityCalculateSubscriptionFut == null)
-            return;
-
-        try {
-            Long subscriptionId = affinityCalculateSubscriptionFut.get();
+    public CompletableFuture<Boolean> calculateAssignments(UUID tblId) {
+        return vaultMgr
+            .get(ByteArray.fromString(INTERNAL_PREFIX + tblId))
+            .thenCompose(entry -> {
+                TableConfiguration tblConfig = configurationMgr
+                    .configurationRegistry()
+                    .getConfiguration(TablesConfiguration.KEY)
+                    .tables()
+                    .get(new String(entry.value(), StandardCharsets.UTF_8));
+
+                var key = new Key(INTERNAL_PREFIX + tblId);
+
+                // TODO: https://issues.apache.org/jira/browse/IGNITE-14716 Need to support baseline changes.
+                return metaStorageMgr.invoke(
+                    Conditions.key(key).value().eq(null),
+                    Operations.put(key, ByteUtils.toBytes(
+                        RendezvousAffinityFunction.assignPartitions(
+                            baselineMgr.nodes(),
+                            tblConfig.partitions().value(),
+                            tblConfig.replicas().value(),
+                            false,
+                            null
+                        ))),
+                    Operations.noop());
+        });
+    }
 
-            metaStorageMgr.unregisterWatch(subscriptionId);
+    /**
+     * Removes an assignment for a table which was specified by id.
+     *
+     * @param tblId Table identifier.
+     * @return A future which will complete when assignment is removed.
+     */
+    public CompletableFuture<Boolean> removeAssignment(UUID tblId) {
+        var key = new Key(INTERNAL_PREFIX + tblId);
 
-            affinityCalculateSubscriptionFut = null;
-        }
-        catch (InterruptedException | ExecutionException e) {
-            LOG.error("Couldn't unsubscribe for Metastorage updates", e);
-        }
+        return metaStorageMgr.invoke(
+            Conditions.key(key).value().ne(null),
+            Operations.remove(key),
+            Operations.noop());
     }
 }
diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/event/AffinityEvent.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/event/AffinityEvent.java
new file mode 100644
index 0000000..25d2b6e
--- /dev/null
+++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/event/AffinityEvent.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity.event;
+
+import org.apache.ignite.internal.manager.Event;
+
+/**
+ * Affinity management events.
+ */
+public enum AffinityEvent implements Event {
+    /** This event fires when affinity assignment is calculated. */
+    CALCULATED,
+
+    /** This event fires when affinity assignment is removed. */
+    REMOVED
+}
diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/event/AffinityEventParameters.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/event/AffinityEventParameters.java
new file mode 100644
index 0000000..0bedd1a
--- /dev/null
+++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/event/AffinityEventParameters.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity.event;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.manager.EventParameters;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Affinity event parameters. There are properties which associate with a concrete affinity assignment.
+ */
+public class AffinityEventParameters implements EventParameters {
+    /** Table identifier. */
+    private final UUID tableId;
+
+    /** Table schema view. */
+    private final List<List<ClusterNode>> assignment;
+
+    /**
+     * @param tableId Table identifier.
+     * @param assignment Affinity assignment.
+     */
+    public AffinityEventParameters(
+        UUID tableId,
+        List<List<ClusterNode>> assignment
+    ) {
+        this.tableId = tableId;
+        this.assignment = assignment;
+    }
+
+    /**
+     * Get the table identifier.
+     *
+     * @return Table id.
+     */
+    public UUID tableId() {
+        return tableId;
+    }
+
+    /**
+     * Gets an affinity assignment.
+     *
+     * @return Affinity assignment.
+     */
+    public List<List<ClusterNode>> assignment() {
+        return assignment;
+    }
+}
diff --git a/modules/baseline/src/main/java/org/apache/ignite/internal/baseline/BaselineManager.java b/modules/baseline/src/main/java/org/apache/ignite/internal/baseline/BaselineManager.java
index be4d0fa..34e003d 100644
--- a/modules/baseline/src/main/java/org/apache/ignite/internal/baseline/BaselineManager.java
+++ b/modules/baseline/src/main/java/org/apache/ignite/internal/baseline/BaselineManager.java
@@ -27,6 +27,7 @@ import org.apache.ignite.network.ClusterService;
  * Baseline manager is responsible for handling baseline related logic.
  */
 // TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
+// TODO: https://issues.apache.org/jira/browse/IGNITE-14716 Adapt concept of baseline topology IEP-4.
 @SuppressWarnings({"FieldCanBeLocal", "unused"}) public class BaselineManager {
     /** Configuration manager in order to handle and listen baseline specific configuration.*/
     private final ConfigurationManager configurationMgr;
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 069560c..d6e15e8 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -130,7 +130,7 @@ public class MetaStorageManager {
         Predicate<ClusterNode> metaStorageNodesContainsLocPred =
             clusterNode -> Arrays.asList(metastorageNodes).contains(clusterNode.name());
 
-        if (hasMetastorageLocally(locNodeName, metastorageNodes)) {
+        if (hasMetastorage(locNodeName, metastorageNodes)) {
             this.metaStorageSvcFut = CompletableFuture.completedFuture(new MetaStorageServiceImpl(
                     raftMgr.startRaftGroup(
                         METASTORAGE_RAFT_GROUP_NAME,
@@ -509,24 +509,46 @@ public class MetaStorageManager {
     }
 
     /**
-     * Checks whether the local node hosts meta storage.
+     * Checks whether the given node hosts meta storage.
      *
-     * @param locNodeName Local node uniq name.
+     * @param nodeName Node unique name.
      * @param metastorageMembers Meta storage members names.
-     * @return True if the node has meta storage, false otherwise.
+     * @return {@code true} if the node has meta storage, {@code false} otherwise.
      */
-    public static boolean hasMetastorageLocally(String locNodeName, String[] metastorageMembers) {
-        boolean isLocNodeHasMetasorage = false;
+    public static boolean hasMetastorage(String nodeName, String[] metastorageMembers) {
+        boolean isNodeHasMetasorage = false;
 
         for (String name : metastorageMembers) {
-            if (name.equals(locNodeName)) {
-                isLocNodeHasMetasorage = true;
+            if (name.equals(nodeName)) {
+                isNodeHasMetasorage = true;
 
                 break;
             }
         }
 
-        return isLocNodeHasMetasorage;
+        return isNodeHasMetasorage;
+    }
+
+    /**
+     * Checks whether the local node hosts meta storage.
+     *
+     * @param configurationMgr Configuration manager.
+     * @return {@code true} if the node has meta storage, {@code false} otherwise.
+     */
+    public static boolean hasMetastorageLocally(ConfigurationManager configurationMgr) {
+        String locNodeName = configurationMgr
+            .configurationRegistry()
+            .getConfiguration(NodeConfiguration.KEY)
+            .name()
+            .value();
+
+        String[] metastorageMembers = configurationMgr
+            .configurationRegistry()
+            .getConfiguration(NodeConfiguration.KEY)
+            .metastorageNodes()
+            .value();
+
+        return hasMetastorage(locNodeName, metastorageMembers);
     }
 
     // TODO: IGNITE-14691 Temporally solution that should be removed after implementing reactive watches.
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
index deba6d5..b7e757a 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
@@ -156,7 +156,7 @@ public class IgnitionImpl implements Ignition {
         BaselineManager baselineMgr = new BaselineManager(configurationMgr, metaStorageMgr, clusterNetSvc);
 
         // Affinity manager startup.
-        new AffinityManager(configurationMgr, metaStorageMgr, baselineMgr, vaultMgr);
+        AffinityManager affinityMgr = new AffinityManager(configurationMgr, metaStorageMgr, baselineMgr, vaultMgr);
 
         SchemaManager schemaMgr = new SchemaManager(configurationMgr);
 
@@ -165,6 +165,7 @@ public class IgnitionImpl implements Ignition {
             configurationMgr,
             metaStorageMgr,
             schemaMgr,
+            affinityMgr,
             raftMgr,
             vaultMgr
         );
diff --git a/modules/table/pom.xml b/modules/table/pom.xml
index bf93ea1..9051426 100644
--- a/modules/table/pom.xml
+++ b/modules/table/pom.xml
@@ -55,6 +55,11 @@
 
         <dependency>
             <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-affinity</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
             <artifactId>ignite-baseline</artifactId>
         </dependency>
 
@@ -80,12 +85,6 @@
 
         <!-- Test dependencies -->
         <dependency>
-            <groupId>org.apache.ignite</groupId>
-            <artifactId>ignite-affinity</artifactId>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
             <groupId>org.junit.jupiter</groupId>
             <artifactId>junit-jupiter-engine</artifactId>
             <scope>test</scope>
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 4d47435..a5c1cc0 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -21,21 +21,21 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
 import java.util.function.BiPredicate;
 import java.util.function.Consumer;
 import org.apache.ignite.configuration.internal.ConfigurationManager;
-import org.apache.ignite.configuration.schemas.runner.ClusterConfiguration;
-import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
 import org.apache.ignite.configuration.schemas.table.TableChange;
 import org.apache.ignite.configuration.schemas.table.TableView;
 import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.internal.affinity.AffinityManager;
+import org.apache.ignite.internal.affinity.event.AffinityEvent;
 import org.apache.ignite.internal.manager.Producer;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.raft.Loza;
@@ -46,15 +46,11 @@ import org.apache.ignite.internal.table.distributed.raft.PartitionCommandListene
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.table.event.TableEvent;
 import org.apache.ignite.internal.table.event.TableEventParameters;
-import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.vault.VaultManager;
-import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.metastorage.common.Conditions;
 import org.apache.ignite.metastorage.common.Key;
 import org.apache.ignite.metastorage.common.Operations;
-import org.apache.ignite.metastorage.common.WatchEvent;
-import org.apache.ignite.metastorage.common.WatchListener;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.raft.client.service.RaftGroupService;
 import org.apache.ignite.table.Table;
@@ -77,116 +73,96 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
     /** Configuration manager. */
     private final ConfigurationManager configurationMgr;
 
-    /** Table creation subscription future. */
-    private CompletableFuture<Long> tableCreationSubscriptionFut;
+    /** Raft manmager. */
+    private final Loza raftMgr;
+
+    /** Schema manager. */
+    private final SchemaManager schemaMgr;
+
+    /** Affinity manager. */
+    private final AffinityManager affMgr;
 
     /** Tables. */
     private Map<String, TableImpl> tables = new ConcurrentHashMap<>();
 
-    /*
+    /**
+     * Creates a new table manager.
+     *
      * @param configurationMgr Configuration manager.
      * @param metaStorageMgr Meta storage manager.
-     * @param schemaManager Schema manager.
+     * @param schemaMgr Schema manager.
+     * @param affMgr Affinity manager.
      * @param raftMgr Raft manager.
      * @param vaultManager Vault manager.
      */
     public TableManager(
         ConfigurationManager configurationMgr,
         MetaStorageManager metaStorageMgr,
-        SchemaManager schemaManager,
+        SchemaManager schemaMgr,
+        AffinityManager affMgr,
         Loza raftMgr,
         VaultManager vaultManager
     ) {
         this.configurationMgr = configurationMgr;
         this.metaStorageMgr = metaStorageMgr;
+        this.affMgr = affMgr;
+        this.raftMgr = raftMgr;
+        this.schemaMgr = schemaMgr;
 
-        String localNodeName = configurationMgr.configurationRegistry().getConfiguration(NodeConfiguration.KEY)
-            .name().value();
-
-        configurationMgr.configurationRegistry().getConfiguration(ClusterConfiguration.KEY)
-            .metastorageNodes().listen(ctx -> {
-            if (ctx.newValue() != null) {
-                if (MetaStorageManager.hasMetastorageLocally(localNodeName, ctx.newValue()))
-                    subscribeForTableCreation();
-                else
-                    unsubscribeForTableCreation();
-            }
-            return CompletableFuture.completedFuture(null);
-
-        });
-
-        String[] metastorageMembers = configurationMgr.configurationRegistry().getConfiguration(NodeConfiguration.KEY)
-            .metastorageNodes().value();
-
-        if (MetaStorageManager.hasMetastorageLocally(localNodeName, metastorageMembers))
-            subscribeForTableCreation();
-
-        String tableInternalPrefix = INTERNAL_PREFIX + "assignment.";
-
-        tableCreationSubscriptionFut = metaStorageMgr.registerWatchByPrefix(new Key(tableInternalPrefix), new WatchListener() {
-            @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
-                for (WatchEvent evt : events) {
-                    String placeholderValue = evt.newEntry().key().toString().substring(tableInternalPrefix.length() - 1);
-
-                    String name = new String(vaultManager.get(ByteArray.fromString(INTERNAL_PREFIX + placeholderValue))
-                        .join().value(), StandardCharsets.UTF_8);
-
-                    UUID tblId = UUID.fromString(placeholderValue);
-
-                    if (evt.newEntry().value() == null) {
-                        assert evt.oldEntry().value() != null : "Previous assignment is unknown";
-
-                        List<List<ClusterNode>> assignment = (List<List<ClusterNode>>)ByteUtils.fromBytes(
-                            evt.oldEntry().value());
-
-                        int partitions = assignment.size();
+        listenForTableChange();
+    }
 
-                        for (int p = 0; p < partitions; p++)
-                            raftMgr.stopRaftGroup(raftGroupName(tblId, p), assignment.get(p));
+    /**
+     * Creates local structures for a table.
+     *
+     * @param name Table name.
+     * @param tblId Table id.
+     * @param assignment Affinity assignment.
+     */
+    private void createTableLocally(String name, UUID tblId, List<List<ClusterNode>> assignment) {
+        int partitions = assignment.size();
 
-                        TableImpl table = tables.get(name);
+        HashMap<Integer, RaftGroupService> partitionMap = new HashMap<>(partitions);
 
-                        assert table != null : "There is no table with the name specified [name=" + name + ']';
+        for (int p = 0; p < partitions; p++) {
+            partitionMap.put(p, raftMgr.startRaftGroup(
+                raftGroupName(tblId, p),
+                assignment.get(p),
+                new PartitionCommandListener()
+            ));
+        }
 
-                        onEvent(TableEvent.DROP, new TableEventParameters(
-                            tblId,
-                            name,
-                            table.schemaView(),
-                            table.internalTable()
-                        ), null);
-                    }
-                    else if (evt.newEntry().value().length > 0) {
-                        List<List<ClusterNode>> assignment = (List<List<ClusterNode>>)ByteUtils.fromBytes(
-                            evt.newEntry().value());
+        onEvent(TableEvent.CREATE, new TableEventParameters(
+            tblId,
+            name,
+            new TableSchemaViewImpl(tblId, schemaMgr),
+            new InternalTableImpl(tblId, partitionMap, partitions)
+        ), null);
+    }
 
-                        int partitions = assignment.size();
+    /**
+     * Drops local structures for a table.
+     *
+     * @param name Table name.
+     * @param tblId Table id.
+     * @param assignment Affinity assignment.
+     */
+    private void dropTableLocally(String name, UUID tblId, List<List<ClusterNode>> assignment) {
+        int partitions = assignment.size();
 
-                        HashMap<Integer, RaftGroupService> partitionMap = new HashMap<>(partitions);
+        for (int p = 0; p < partitions; p++)
+            raftMgr.stopRaftGroup(raftGroupName(tblId, p), assignment.get(p));
 
-                        for (int p = 0; p < partitions; p++) {
-                            partitionMap.put(p, raftMgr.startRaftGroup(
-                                raftGroupName(tblId, p),
-                                assignment.get(p),
-                                new PartitionCommandListener()
-                            ));
-                        }
+        TableImpl table = tables.get(name);
 
-                        onEvent(TableEvent.CREATE, new TableEventParameters(
-                            tblId,
-                            name,
-                            new TableSchemaViewImpl(tblId, schemaManager),
-                            new InternalTableImpl(tblId, partitionMap, partitions)
-                        ), null);
-                    }
-                }
+        assert table != null : "There is no table with the name specified [name=" + name + ']';
 
-                return true;
-            }
-
-            @Override public void onError(@NotNull Throwable e) {
-                LOG.error("Metastorage listener issue", e);
-            }
-        });
+        onEvent(TableEvent.DROP, new TableEventParameters(
+            tblId,
+            name,
+            table.schemaView(),
+            table.internalTable()
+        ), null);
     }
 
     /**
@@ -201,14 +177,13 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
     }
 
     /**
-     * Subscribes on table create.
+     * Listens on a drop or create table.
      */
-    private void subscribeForTableCreation() {
+    private void listenForTableChange() {
         //TODO: IGNITE-14652 Change a metastorage update in listeners to multi-invoke
-        configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
-            .tables().listen(ctx -> {
-            Set<String> tablesToStart = ctx.newValue().namedListKeys() == null ?
-                Collections.EMPTY_SET : ctx.newValue().namedListKeys();
+        configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().listen(ctx -> {
+            Set<String> tablesToStart = (ctx.newValue() == null || ctx.newValue().namedListKeys() == null) ?
+                Collections.emptySet() : new HashSet<>(ctx.newValue().namedListKeys());
 
             tablesToStart.removeAll(ctx.oldValue().namedListKeys());
 
@@ -216,24 +191,46 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
             List<CompletableFuture<Boolean>> futs = new ArrayList<>();
 
+            boolean hasMetastorageLocally = MetaStorageManager.hasMetastorageLocally(configurationMgr);
+
             for (String tblName : tablesToStart) {
                 TableView tableView = ctx.newValue().get(tblName);
-                long update = 0;
-
-                UUID tblId = new UUID(revision, update);
-
-                var key = new Key(INTERNAL_PREFIX + tblId.toString());
-                futs.add(metaStorageMgr.invoke(
-                    Conditions.key(key).value().eq(null),
-                    Operations.put(key, tableView.name().getBytes(StandardCharsets.UTF_8)),
-                    Operations.noop()).thenCompose(res ->
-                    res ? metaStorageMgr.put(new Key(INTERNAL_PREFIX + "assignment." + tblId.toString()), new byte[0])
-                        .thenApply(v -> true)
-                        : CompletableFuture.completedFuture(false)));
+
+                UUID tblId = new UUID(revision, 0L);
+
+                if (hasMetastorageLocally) {
+                    var key = new Key(INTERNAL_PREFIX + tblId.toString());
+
+                    futs.add(metaStorageMgr.invoke(
+                        Conditions.key(key).value().eq(null),
+                        Operations.put(key, tableView.name().getBytes(StandardCharsets.UTF_8)),
+                        Operations.noop()).thenCompose(res ->
+                        affMgr.calculateAssignments(tblId)));
+                }
+
+                affMgr.listen(AffinityEvent.CALCULATED, (parameters, e) -> {
+                    if (!tblId.equals(parameters.tableId()))
+                        return false;
+
+                    if (e == null)
+                        createTableLocally(tblName, tblId, parameters.assignment());
+                    else {
+                        LOG.error("Failed to create a new table [name=" + tblName + ", id=" + tblId + ']', e);
+
+                        onEvent(TableEvent.CREATE, new TableEventParameters(
+                            tblId,
+                            tblName,
+                            null,
+                            null
+                        ), e);
+                    }
+
+                    return true;
+                });
             }
 
-            Set<String> tablesToStop = ctx.oldValue().namedListKeys() == null ?
-                Collections.EMPTY_SET : ctx.oldValue().namedListKeys();
+            Set<String> tablesToStop = (ctx.oldValue() == null || ctx.oldValue().namedListKeys() == null) ?
+                Collections.emptySet() : new HashSet<>(ctx.oldValue().namedListKeys());
 
             tablesToStop.removeAll(ctx.newValue().namedListKeys());
 
@@ -242,37 +239,38 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
                 UUID tblId = t.internalTable().tableId();
 
-                var key = new Key(INTERNAL_PREFIX + "assignment." + tblId.toString());
-                futs.add(metaStorageMgr.invoke(
-                    Conditions.key(key).value().ne(null),
-                    Operations.remove(key),
-                    Operations.noop()).thenCompose(res ->
-                    res ? metaStorageMgr.remove(new Key(INTERNAL_PREFIX + tblId.toString()))
-                        .thenApply(v -> true)
-                        : CompletableFuture.completedFuture(false)));
-            }
+                if (hasMetastorageLocally) {
+                    var key = new Key(INTERNAL_PREFIX + tblId.toString());
 
-            return CompletableFuture.allOf(futs.toArray(CompletableFuture[]::new));
-        });
-    }
+                    futs.add(affMgr.removeAssignment(tblId).thenCompose(res ->
+                        metaStorageMgr.invoke(Conditions.key(key).value().ne(null),
+                            Operations.remove(key),
+                            Operations.noop())));
+                }
 
-    /**
-     * Unsubscribe from table creation.
-     */
-    private void unsubscribeForTableCreation() {
-        if (tableCreationSubscriptionFut == null)
-            return;
+                affMgr.listen(AffinityEvent.REMOVED, (parameters, e) -> {
+                    if (!tblId.equals(parameters.tableId()))
+                        return false;
 
-        try {
-            Long subscriptionId = tableCreationSubscriptionFut.get();
+                    if (e == null)
+                        dropTableLocally(tblName, tblId, parameters.assignment());
+                    else {
+                        LOG.error("Failed to drop a table [name=" + tblName + ", id=" + tblId + ']', e);
 
-            metaStorageMgr.unregisterWatch(subscriptionId);
+                        onEvent(TableEvent.DROP, new TableEventParameters(
+                            tblId,
+                            tblName,
+                            null,
+                            null
+                        ), e);
+                    }
 
-            tableCreationSubscriptionFut = null;
-        }
-        catch (InterruptedException | ExecutionException e) {
-            LOG.error("Couldn't unsubscribe from table creation", e);
-        }
+                    return true;
+                });
+            }
+
+            return CompletableFuture.allOf(futs.toArray(CompletableFuture[]::new));
+        });
     }
 
     /** {@inheritDoc} */
@@ -327,10 +325,11 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
             }
         });
 
-        configurationMgr.configurationRegistry()
-            .getConfiguration(TablesConfiguration.KEY).tables().change(change -> {
-            change.delete(name);
-        });
+        configurationMgr
+            .configurationRegistry()
+            .getConfiguration(TablesConfiguration.KEY)
+            .tables()
+            .change(change -> change.delete(name));
 
         dropTblFut.join();
     }