You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/05/04 09:47:12 UTC

[ignite-3] branch main updated: IGNITE-14238 Creating and destroying tables. Fixes #112

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 26051c4  IGNITE-14238 Creating and destroying tables. Fixes #112
26051c4 is described below

commit 26051c4bb693b17e959214b403de14e474b5d306
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Tue May 4 12:46:38 2021 +0300

    IGNITE-14238 Creating and destroying tables. Fixes #112
    
    Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
 .../ignite/internal/affinity/AffinityManager.java  |   7 +-
 .../apache/ignite/table/manager/IgniteTables.java  |   7 +
 .../org/apache/ignite/internal/manager/Event.java  |  25 +++
 .../ignite/internal/manager/EventParameters.java   |  26 +++
 .../apache/ignite/internal/manager/Producer.java   |  69 ++++++
 .../apache/ignite/internal/util/ArrayUtils.java    |   5 -
 .../java/org/apache/ignite/internal/raft/Loza.java |  18 +-
 .../ignite/internal/table/InternalTable.java       |   8 +
 .../apache/ignite/internal/table/TableImpl.java    |  18 ++
 .../ignite/internal/table/TableSchemaViewImpl.java |  52 +++++
 .../internal/table/distributed/TableManager.java   | 239 +++++++++++++--------
 .../distributed/storage/InternalTableImpl.java     |  13 ++
 .../ignite/internal/table/event/TableEvent.java    |  31 +++
 .../internal/table/event/TableEventParameters.java |  95 ++++++++
 .../ignite/table/impl/DummyInternalTableImpl.java  |   6 +
 15 files changed, 522 insertions(+), 97 deletions(-)

diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
index 937af33..82df3f8 100644
--- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
+++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
@@ -27,7 +27,6 @@ import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
 import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.internal.baseline.BaselineManager;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.lang.ByteArray;
@@ -134,7 +133,9 @@ public class AffinityManager {
         affinityCalculateSubscriptionFut = metaStorageMgr.registerWatchByPrefix(new Key(tableInternalPrefix), new WatchListener() {
             @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
                 for (WatchEvent evt : events) {
-                    if (ArrayUtils.empty(evt.newEntry().value())) {
+                    byte[] assignmentVal = evt.newEntry().value();
+
+                    if (assignmentVal != null && assignmentVal.length == 0) {
                         String keyTail = evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length());
 
                         String placeholderValue = keyTail.substring(0, keyTail.indexOf('.'));
@@ -150,7 +151,7 @@ public class AffinityManager {
                                 .tables().get(name).replicas().value();
 
                             metaStorageMgr.invoke(evt.newEntry().key(),
-                                Conditions.value().eq(evt.newEntry().value()),
+                                Conditions.value().eq(assignmentVal),
                                 Operations.put(ByteUtils.toBytes(
                                     RendezvousAffinityFunction.assignPartitions(
                                         baselineMgr.nodes(),
diff --git a/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java b/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java
index 24b20f6..780e9f1 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java
@@ -40,6 +40,13 @@ public interface IgniteTables {
     Table createTable(String name, Consumer<TableChange> tableInitChange);
 
     /**
+     * Drops a table with the name specified.
+     *
+     * @param name Table name.
+     */
+    void dropTable(String name);
+
+    /**
      * Gets a list of all started tables.
      *
      * @return List of tables.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java
new file mode 100644
index 0000000..c245b5b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.manager;
+
+/**
+ * The event class which is produced by an event producer component.
+ * @see Producer#onEvent(Event, EventParameters, Exception)
+ */
+public interface Event {
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java
new file mode 100644
index 0000000..140047d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.manager;
+
+/**
+ * Event parameters.
+ * This type is passed to the event listener.
+ * @see Producer#onEvent(Event, EventParameters, Exception)
+ */
+public interface EventParameters {
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java
new file mode 100644
index 0000000..c198bba
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.manager;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.BiPredicate;
+
+/**
+ * Base class for components which can produce their own events.
+ */
+public abstract class Producer<T extends Event, P extends EventParameters> {
+    /** All listeners. */
+    private ConcurrentHashMap<T, ConcurrentLinkedQueue<BiPredicate<P, Exception>>> listeners = new ConcurrentHashMap<>();
+
+    /**
+     * Registers an event listener.
+     * When the listener predicate returns {@code true} it is removed and never invoked again,
+     * otherwise it stays registered and receives subsequent events of this type.
+     *
+     * @param evt Event.
+     * @param closure Closure.
+     */
+    public void listen(T evt, BiPredicate<P, Exception> closure) {
+        listeners.computeIfAbsent(evt, evtKey -> new ConcurrentLinkedQueue<>())
+            .offer(closure);
+    }
+
+    /**
+     * Notifies every listener that subscribed before.
+     *
+     * @param evt Event type.
+     * @param params Event parameters.
+     * @param err Exception if one occurred, or {@code null} otherwise.
+     */
+    protected void onEvent(T evt, P params, Exception err) {
+        ConcurrentLinkedQueue<BiPredicate<P, Exception>> queue = listeners.get(evt);
+
+        if (queue == null)
+            return;
+
+        BiPredicate<P, Exception> closure;
+
+        Iterator<BiPredicate<P, Exception>> iter = queue.iterator();
+
+        while (iter.hasNext()) {
+            closure = iter.next();
+
+            if (closure.test(params, err))
+                iter.remove();
+        }
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
index dcbb3d4..0457241 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
@@ -179,11 +179,6 @@ public final class ArrayUtils {
         }
     };
 
-    /** */
-    public static boolean empty(byte[] arr) {
-        return arr == null || arr.length == 0;
-    }
-
     /**
      * Stub.
      */
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 3d2ce2c..3444091 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -70,9 +70,10 @@ public class Loza {
      * @return A RAFT group client.
      */
     public RaftGroupService startRaftGroup(String groupId, List<ClusterNode> peers, RaftGroupCommandListener lsnr) {
-        assert peers.size() > 1;
+        assert !peers.isEmpty();
 
         //Now we are using only one node in a raft group.
+        //TODO: IGNITE-13885 Investigate jraft implementation for replication framework based on RAFT protocol.
         if (peers.get(0).name().equals(clusterNetSvc.topologyService().localMember().name()))
             raftServer.setListener(groupId, lsnr);
 
@@ -86,4 +87,19 @@ public class Loza {
             DELAY
         );
     }
+
+    /**
+     * Stops a RAFT group.
+     *
+     * @param groupId RAFT group id.
+     * @param peers Group peers.
+     */
+    public void stopRaftGroup(String groupId, List<ClusterNode> peers) {
+        assert !peers.isEmpty();
+
+        //Now we are using only one node in a raft group.
+        //TODO: IGNITE-13885 Investigate jraft implementation for replication framework based on RAFT protocol.
+        if (peers.get(0).name().equals(clusterNetSvc.topologyService().localMember().name()))
+            raftServer.clearListener(groupId);
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index 1bddfed..4134e33 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.table;
 
 import java.util.Collection;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.jetbrains.annotations.NotNull;
@@ -28,6 +29,13 @@ import org.jetbrains.annotations.NotNull;
  */
 public interface InternalTable {
     /**
+     * Gets a table id.
+     *
+     * @return Table id as UUID.
+     */
+    @NotNull UUID tableId();
+
+    /**
      * Asynchronously gets a row with same key columns values as given one from the table.
      *
      * @param keyRow Row with key columns set.
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index cc70462..3fd11de 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -57,6 +57,24 @@ public class TableImpl extends AbstractTableView implements Table {
         marsh = new TupleMarshallerImpl(schemaMgr);
     }
 
+    /**
+     * Gets an internal table associated with the table.
+     *
+     * @return Internal table.
+     */
+    public @NotNull InternalTable internalTable() {
+        return tbl;
+    }
+
+    /**
+     * Gets a schema view for the table.
+     *
+     * @return Schema view.
+     */
+    public TableSchemaView schemaView() {
+        return schemaMgr;
+    }
+
     /** {@inheritDoc} */
     @Override public <R> RecordView<R> recordView(RecordMapper<R> recMapper) {
         return new RecordViewImpl<>(tbl, schemaMgr, recMapper);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaViewImpl.java
new file mode 100644
index 0000000..dd74868
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaViewImpl.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.UUID;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+
+/**
+ * Schema view implementation.
+ */
+public class TableSchemaViewImpl implements TableSchemaView {
+    /** Table identifier. */
+    private final UUID tableId;
+
+    /** Schema manager. */
+    private final SchemaManager schemaManager;
+
+    /**
+     * @param tableId Table identifier.
+     * @param schemaManager Schema manager.
+     */
+    public TableSchemaViewImpl(UUID tableId, SchemaManager schemaManager) {
+        this.tableId = tableId;
+        this.schemaManager = schemaManager;
+    }
+
+    /** {@inheritDoc} */
+    @Override public SchemaDescriptor schema() {
+        return schemaManager.schema(tableId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public SchemaDescriptor schema(int ver) {
+        return schemaManager.schema(tableId, ver);
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 86a04e4..1d65043 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -19,13 +19,16 @@ package org.apache.ignite.internal.table.distributed;
 
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.function.BiPredicate;
 import java.util.function.Consumer;
 import org.apache.ignite.configuration.internal.ConfigurationManager;
 import org.apache.ignite.configuration.schemas.runner.ClusterConfiguration;
@@ -33,15 +36,16 @@ import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
 import org.apache.ignite.configuration.schemas.table.TableChange;
 import org.apache.ignite.configuration.schemas.table.TableView;
 import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.internal.manager.Producer;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.raft.Loza;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaManager;
 import org.apache.ignite.internal.table.TableImpl;
-import org.apache.ignite.internal.table.TableSchemaView;
+import org.apache.ignite.internal.table.TableSchemaViewImpl;
 import org.apache.ignite.internal.table.distributed.raft.PartitionCommandListener;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
-import org.apache.ignite.internal.util.ArrayUtils;
+import org.apache.ignite.internal.table.event.TableEvent;
+import org.apache.ignite.internal.table.event.TableEventParameters;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.lang.ByteArray;
@@ -60,7 +64,7 @@ import org.jetbrains.annotations.NotNull;
 /**
  * Table manager.
  */
-public class TableManager implements IgniteTables {
+public class TableManager extends Producer<TableEvent, TableEventParameters> implements IgniteTables {
     /** The logger. */
     private static final IgniteLogger LOG = IgniteLogger.forClass(TableManager.class);
 
@@ -77,13 +81,14 @@ public class TableManager implements IgniteTables {
     private CompletableFuture<Long> tableCreationSubscriptionFut;
 
     /** Tables. */
-    private Map<String, Table> tables;
+    private Map<String, TableImpl> tables = new ConcurrentHashMap<>();
 
-    /**
+    /*
      * @param configurationMgr Configuration manager.
      * @param metaStorageMgr Meta storage manager.
      * @param schemaManager Schema manager.
      * @param raftMgr Raft manager.
+     * @param vaultManager Vault manager.
      */
     public TableManager(
         ConfigurationManager configurationMgr,
@@ -92,8 +97,6 @@ public class TableManager implements IgniteTables {
         Loza raftMgr,
         VaultManager vaultManager
     ) {
-        tables = new HashMap<>();
-
         this.configurationMgr = configurationMgr;
         this.metaStorageMgr = metaStorageMgr;
 
@@ -123,52 +126,57 @@ public class TableManager implements IgniteTables {
         tableCreationSubscriptionFut = metaStorageMgr.registerWatchByPrefix(new Key(tableInternalPrefix), new WatchListener() {
             @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
                 for (WatchEvent evt : events) {
-                    if (!ArrayUtils.empty(evt.newEntry().value())) {
-                        String keyTail = evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length());
-
-                        String placeholderValue = keyTail.substring(0, keyTail.indexOf('.'));
-
-                        UUID tblId = UUID.fromString(placeholderValue);
-
-                        try {
-                            String name = new String(vaultManager.get(ByteArray.fromString(INTERNAL_PREFIX + tblId.toString())).get().value(), StandardCharsets.UTF_8);
-
-                            int partitions = configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
-                                .tables().get(name).partitions().value();
-
-                            List<List<ClusterNode>> assignment = (List<List<ClusterNode>>)ByteUtils.fromBytes(
-                                evt.newEntry().value());
-
-                            HashMap<Integer, RaftGroupService> partitionMap = new HashMap<>(partitions);
-
-                            for (int p = 0; p < partitions; p++) {
-                                partitionMap.put(p, raftMgr.startRaftGroup(
-                                    name + "_part_" + p,
-                                    assignment.get(p),
-                                    new PartitionCommandListener()
-                                ));
-                            }
-
-                            tables.put(name, new TableImpl(
-                                new InternalTableImpl(
-                                    tblId,
-                                    partitionMap,
-                                    partitions
-                                ),
-                                new TableSchemaView() {
-                                    @Override public SchemaDescriptor schema() {
-                                        return schemaManager.schema(tblId);
-                                    }
-
-                                    @Override public SchemaDescriptor schema(int ver) {
-                                        return schemaManager.schema(tblId, ver);
-                                    }
-                                }));
-                        }
-                        catch (InterruptedException | ExecutionException e) {
-                            LOG.error("Failed to start table [key={}]",
-                                evt.newEntry().key(), e);
+                    String placeholderValue = evt.newEntry().key().toString().substring(tableInternalPrefix.length() - 1);
+
+                    String name = new String(vaultManager.get(ByteArray.fromString(INTERNAL_PREFIX + placeholderValue))
+                        .join().value(), StandardCharsets.UTF_8);
+
+                    UUID tblId = UUID.fromString(placeholderValue);
+
+                    if (evt.newEntry().value() == null) {
+                        assert evt.oldEntry().value() != null : "Previous assignment is unknown";
+
+                        List<List<ClusterNode>> assignment = (List<List<ClusterNode>>)ByteUtils.fromBytes(
+                            evt.oldEntry().value());
+
+                        int partitions = assignment.size();
+
+                        for (int p = 0; p < partitions; p++)
+                            raftMgr.stopRaftGroup(raftGroupName(tblId, p), assignment.get(p));
+
+                        TableImpl table = tables.get(name);
+
+                        assert table != null : "There is no table with the name specified [name=" + name + ']';
+
+                        onEvent(TableEvent.DROP, new TableEventParameters(
+                            tblId,
+                            name,
+                            table.schemaView(),
+                            table.internalTable()
+                        ), null);
+                    }
+                    else if (evt.newEntry().value().length > 0) {
+                        List<List<ClusterNode>> assignment = (List<List<ClusterNode>>)ByteUtils.fromBytes(
+                            evt.newEntry().value());
+
+                        int partitions = assignment.size();
+
+                        HashMap<Integer, RaftGroupService> partitionMap = new HashMap<>(partitions);
+
+                        for (int p = 0; p < partitions; p++) {
+                            partitionMap.put(p, raftMgr.startRaftGroup(
+                                raftGroupName(tblId, p),
+                                assignment.get(p),
+                                new PartitionCommandListener()
+                            ));
                         }
+
+                        onEvent(TableEvent.CREATE, new TableEventParameters(
+                            tblId,
+                            name,
+                            new TableSchemaViewImpl(tblId, schemaManager),
+                            new InternalTableImpl(tblId, partitionMap, partitions)
+                        ), null);
                     }
                 }
 
@@ -182,6 +190,17 @@ public class TableManager implements IgniteTables {
     }
 
     /**
+     * Composes a unique RAFT group name.
+     *
+     * @param tableId Table identifier.
+     * @param partition Number of the table partition.
+     * @return A RAFT group name.
+     */
+    @NotNull private String raftGroupName(UUID tableId, int partition) {
+        return tableId + "_part_" + partition;
+    }
+
+    /**
      * Checks whether the local node hosts Metastorage.
      *
      * @param localNodeName Local node uniq name.
@@ -208,39 +227,52 @@ public class TableManager implements IgniteTables {
         //TODO: IGNITE-14652 Change a metastorage update in listeners to multi-invoke
         configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
             .tables().listen(ctx -> {
-            HashSet<String> tblNamesToStart = new HashSet<>(ctx.newValue().namedListKeys());
+            Set<String> tablesToStart = ctx.newValue().namedListKeys() == null ?
+                Collections.EMPTY_SET : ctx.newValue().namedListKeys();
+
+            tablesToStart.removeAll(ctx.oldValue().namedListKeys());
 
             long revision = ctx.storageRevision();
 
-            if (ctx.oldValue() != null)
-                tblNamesToStart.removeAll(ctx.oldValue().namedListKeys());
+            List<CompletableFuture<Boolean>> futs = new ArrayList<>();
 
-            for (String tblName : tblNamesToStart) {
+            for (String tblName : tablesToStart) {
                 TableView tableView = ctx.newValue().get(tblName);
                 long update = 0;
 
                 UUID tblId = new UUID(revision, update);
 
-                CompletableFuture<Boolean> fut = metaStorageMgr.invoke(
+                futs.add(metaStorageMgr.invoke(
                     new Key(INTERNAL_PREFIX + tblId.toString()),
                     Conditions.value().eq(null),
                     Operations.put(tableView.name().getBytes(StandardCharsets.UTF_8)),
-                    Operations.noop());
+                    Operations.noop()).thenCompose(res ->
+                    res ? metaStorageMgr.put(new Key(INTERNAL_PREFIX + "assignment." + tblId.toString()), new byte[0])
+                        .thenApply(v -> true)
+                        : CompletableFuture.completedFuture(false)));
+            }
 
-                try {
-                    if (fut.get()) {
-                        metaStorageMgr.put(new Key(INTERNAL_PREFIX + "assignment." + tblId.toString()), new byte[0]);
+            Set<String> tablesToStop = ctx.oldValue().namedListKeys() == null ?
+                Collections.EMPTY_SET : ctx.oldValue().namedListKeys();
 
-                        LOG.info("Table manager created a table [name={}, revision={}]",
-                            tableView.name(), revision);
-                    }
-                }
-                catch (InterruptedException | ExecutionException e) {
-                    LOG.error("Table was not fully initialized [name={}, revision={}]",
-                        tableView.name(), revision, e);
-                }
+            tablesToStop.removeAll(ctx.newValue().namedListKeys());
+
+            for (String tblName : tablesToStop) {
+                TableImpl t = tables.get(tblName);
+
+                UUID tblId = t.internalTable().tableId();
+
+                futs.add(metaStorageMgr.invoke(
+                    new Key(INTERNAL_PREFIX + "assignment." + tblId.toString()),
+                    Conditions.value().ne(null),
+                    Operations.remove(),
+                    Operations.noop()).thenCompose(res ->
+                    res ? metaStorageMgr.remove(new Key(INTERNAL_PREFIX + tblId.toString()))
+                        .thenApply(v -> true)
+                        : CompletableFuture.completedFuture(false)));
             }
-            return CompletableFuture.completedFuture(null);
+
+            return CompletableFuture.allOf(futs.toArray(CompletableFuture[]::new));
         });
     }
 
@@ -265,31 +297,62 @@ public class TableManager implements IgniteTables {
 
     /** {@inheritDoc} */
     @Override public Table createTable(String name, Consumer<TableChange> tableInitChange) {
+        CompletableFuture<Table> tblFut = new CompletableFuture<>();
+
+        listen(TableEvent.CREATE, (params, e) -> {
+            String tableName = params.tableName();
+
+            if (!name.equals(tableName))
+                return false;
+
+            if (e == null) {
+                tblFut.complete(tables.compute(tableName, (key, val) ->
+                    new TableImpl(params.internalTable(), params.tableSchemaView())));
+            }
+            else
+                tblFut.completeExceptionally(e);
+
+            return true;
+        });
+
         configurationMgr.configurationRegistry()
             .getConfiguration(TablesConfiguration.KEY).tables().change(change ->
             change.create(name, tableInitChange));
 
-//        this.createTable("tbl1", change -> {
-//            change.initReplicas(2);
-//            change.initName("tbl1");
-//            change.initPartitions(1_000);
-//        });
+        return tblFut.join();
+    }
 
-        //TODO: IGNITE-14646 Support asynchronous table creation
-        Table tbl = null;
+    /** {@inheritDoc} */
+    @Override public void dropTable(String name) {
+        CompletableFuture<Void> dropTblFut = new CompletableFuture<>();
 
-        while (tbl == null) {
-            try {
-                Thread.sleep(50);
+        listen(TableEvent.DROP, new BiPredicate<TableEventParameters, Exception>() {
+            @Override public boolean test(TableEventParameters params, Exception e) {
+                String tableName = params.tableName();
 
-                tbl = table(name);
-            }
-            catch (InterruptedException e) {
-                LOG.error("Waiting of creation of table was interrupted.", e);
+                if (!name.equals(tableName))
+                    return false;
+
+                if (e == null) {
+                    Table droppedTable = tables.remove(tableName);
+
+                    assert droppedTable != null;
+
+                    dropTblFut.complete(null);
+                }
+                else
+                    dropTblFut.completeExceptionally(e);
+
+                return true;
             }
-        }
+        });
+
+        configurationMgr.configurationRegistry()
+            .getConfiguration(TablesConfiguration.KEY).tables().change(change -> {
+            change.delete(name);
+        });
 
-        return tbl;
+        dropTblFut.join();
     }
 
     /** {@inheritDoc} */
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 2ff5e8b..c7dcfa3 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -42,6 +42,9 @@ public class InternalTableImpl implements InternalTable {
     /** Partitions. */
     private int partitions;
 
+    /** Table identifier. */
+    private UUID tableId;
+
     /**
      * @param tableId Table id.
      * @param partMap Map partition id to raft group.
@@ -52,10 +55,20 @@ public class InternalTableImpl implements InternalTable {
         Map<Integer, RaftGroupService> partMap,
         int partitions
     ) {
+        this.tableId = tableId;
         this.partitionMap = partMap;
         this.partitions = partitions;
     }
 
+    /**
+     * Gets the table id.
+     *
+     * @return Table id as UUID.
+     */
+    @Override public @NotNull UUID tableId() {
+        return tableId;
+    }
+
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<BinaryRow> get(BinaryRow keyRow) {
         return partitionMap.get(keyRow.hash() % partitions).<KVGetResponse>run(new GetCommand(keyRow))
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEvent.java b/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEvent.java
new file mode 100644
index 0000000..f2a737e
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEvent.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.event;
+
+import org.apache.ignite.internal.manager.Event;
+
+/**
+ * Table management events.
+ */
+public enum TableEvent implements Event {
+    /** This event is fired after a table has been created. */
+    CREATE,
+
+    /** This event is fired after a table has been dropped. */
+    DROP
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEventParameters.java b/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEventParameters.java
new file mode 100644
index 0000000..853617d
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEventParameters.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.event;
+
+import java.util.UUID;
+import org.apache.ignite.internal.manager.EventParameters;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.TableSchemaView;
+
+/**
+ * Table event parameters.
+ * Holds the properties associated with a concrete table.
+ */
+public class TableEventParameters implements EventParameters {
+    /** Table identifier. */
+    private final UUID tableId;
+
+    /** Table name. */
+    private final String tableName;
+
+    /** Table schema view. */
+    private final TableSchemaView tableSchemaView;
+
+    /** Internal table. */
+    private final InternalTable internalTable;
+
+    /**
+     * @param tableId Table identifier.
+     * @param tableName Table name.
+     * @param tableSchemaView Table schema view.
+     * @param internalTable Internal table.
+     */
+    public TableEventParameters(
+        UUID tableId,
+        String tableName,
+        TableSchemaView tableSchemaView,
+        InternalTable internalTable
+    ) {
+        this.tableId = tableId;
+        this.tableName = tableName;
+        this.tableSchemaView = tableSchemaView;
+        this.internalTable = internalTable;
+    }
+
+    /**
+     * Gets the table identifier.
+     *
+     * @return Table id.
+     */
+    public UUID tableId() {
+        return tableId;
+    }
+
+    /**
+     * Gets the table name.
+     *
+     * @return Table name.
+     */
+    public String tableName() {
+        return tableName;
+    }
+
+    /**
+     * Gets a schema view for the table.
+     *
+     * @return Table schema view.
+     */
+    public TableSchemaView tableSchemaView() {
+        return tableSchemaView;
+    }
+
+    /**
+     * Gets an internal table associated with the table.
+     *
+     * @return Internal table.
+     */
+    public InternalTable internalTable() {
+        return internalTable;
+    }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/table/impl/DummyInternalTableImpl.java b/modules/table/src/test/java/org/apache/ignite/table/impl/DummyInternalTableImpl.java
index 5e9a352..23d46c4 100644
--- a/modules/table/src/test/java/org/apache/ignite/table/impl/DummyInternalTableImpl.java
+++ b/modules/table/src/test/java/org/apache/ignite/table/impl/DummyInternalTableImpl.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
@@ -77,6 +78,11 @@ public class DummyInternalTableImpl implements InternalTable {
     }
 
     /** {@inheritDoc} */
+    @Override public @NotNull UUID tableId() {
+        return UUID.randomUUID();
+    }
+
+    /** {@inheritDoc} */
     @Override public CompletableFuture<BinaryRow> get(@NotNull BinaryRow row) {
         assert row != null;