You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/04/28 21:07:44 UTC

[ignite-3] branch main updated: IGNITE-14446 Added support of watch and put/get/scan operations to MetaStorageManager. Fixes #111

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 38e7683  IGNITE-14446 Added support of watch and put/get/scan operations to MetaStorageManager. Fixes #111
38e7683 is described below

commit 38e7683004712eb9cd233ef556a7f5bb57acf56f
Author: Kirill Gusakov <kg...@gmail.com>
AuthorDate: Thu Apr 29 00:07:12 2021 +0300

    IGNITE-14446 Added support of watch and put/get/scan operations to MetaStorageManager. Fixes #111
    
    Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
 .../ignite/internal/affinity/AffinityManager.java  |   4 +-
 modules/metastorage/pom.xml                        |  23 ++
 .../internal/metastorage/MetaStorageManager.java   | 336 ++++++++++++++++++---
 .../metastorage/watch/AggregatedWatch.java         |  69 +++++
 .../internal/metastorage/watch/KeyCriterion.java   | 162 ++++++++++
 .../metastorage/watch/WatchAggregator.java         | 244 +++++++++++++++
 .../internal/metastorage/WatchAggregatorTest.java  | 131 ++++++++
 .../apache/ignite/internal/app/IgnitionImpl.java   | 112 ++++++-
 .../internal/table/distributed/TableManager.java   |   4 +-
 9 files changed, 1041 insertions(+), 44 deletions(-)

diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
index d09094e..937af33 100644
--- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
+++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
@@ -129,9 +129,9 @@ public class AffinityManager {
     private void subscribeToAssignmentCalculation() {
         assert affinityCalculateSubscriptionFut == null : "Affinity calculation already subscribed";
 
-        String tableInternalPrefix = INTERNAL_PREFIX + "assignment.#";
+        String tableInternalPrefix = INTERNAL_PREFIX + "assignment.";
 
-        affinityCalculateSubscriptionFut = metaStorageMgr.registerWatch(new Key(tableInternalPrefix), new WatchListener() {
+        affinityCalculateSubscriptionFut = metaStorageMgr.registerWatchByPrefix(new Key(tableInternalPrefix), new WatchListener() {
             @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
                 for (WatchEvent evt : events) {
                     if (ArrayUtils.empty(evt.newEntry().value())) {
diff --git a/modules/metastorage/pom.xml b/modules/metastorage/pom.xml
index b31a74c..6c3a3f8 100644
--- a/modules/metastorage/pom.xml
+++ b/modules/metastorage/pom.xml
@@ -57,5 +57,28 @@
             <groupId>org.apache.ignite</groupId>
             <artifactId>ignite-api</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-vault</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index d56e51a..4e06338 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -17,11 +17,22 @@
 
 package org.apache.ignite.internal.metastorage;
 
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.metastorage.watch.WatchAggregator;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.metastorage.client.MetaStorageService;
 import org.apache.ignite.metastorage.common.Condition;
+import org.apache.ignite.metastorage.common.Cursor;
 import org.apache.ignite.metastorage.common.Entry;
 import org.apache.ignite.metastorage.common.Key;
 import org.apache.ignite.metastorage.common.Operation;
@@ -53,6 +64,28 @@ import org.jetbrains.annotations.Nullable;
     private MetaStorageService metaStorageSvc;
 
     /**
+     * Aggregator of multiple watches to deploy them as one batch.
+     *
+     * @see WatchAggregator
+     */
+    private final WatchAggregator watchAggregator;
+
+    /**
+     * Future which will be completed with the {@link IgniteUuid} of the aggregated watch
+     * once that watch has been successfully deployed.
+     * Can be resolved to {@link Optional#empty()} if no watch is deployed at the moment.
+     */
+    private CompletableFuture<Optional<IgniteUuid>> deployFut;
+
+    /**
+     * If true - all new watches will be deployed immediately.
+     *
+     * If false - all new watches will be aggregated into one batch
+     * for further deployment by {@link MetaStorageManager#deployWatches()}.
+     */
+    private boolean deployed;
+
+    /**
      * The constructor.
      *
      * @param vaultMgr Vault manager.
@@ -62,11 +95,15 @@ import org.jetbrains.annotations.Nullable;
     public MetaStorageManager(
         VaultManager vaultMgr,
         ClusterService clusterNetSvc,
-        Loza raftMgr
+        Loza raftMgr,
+        MetaStorageService metaStorageSvc
     ) {
         this.vaultMgr = vaultMgr;
         this.clusterNetSvc = clusterNetSvc;
         this.raftMgr = raftMgr;
+        this.metaStorageSvc = metaStorageSvc;
+        watchAggregator = new WatchAggregator();
+        deployFut = new CompletableFuture<>();
 
         // TODO: IGNITE-14088: Uncomment and use real serializer factory
 //        Arrays.stream(MetaStorageMessageTypes.values()).forEach(
@@ -82,75 +119,296 @@ import org.jetbrains.annotations.Nullable;
     }
 
     /**
-     * Register subscription on meta storage updates for further deployment when DMS is ready.
+     * Deploy all registered watches.
+     */
+    public synchronized void deployWatches() {
+        try {
+            var watch = watchAggregator.watch(
+                vaultMgr.appliedRevision(),
+                this::storeEntries
+            );
+        if (watch.isEmpty())
+            deployFut.complete(Optional.empty());
+        else
+            metaStorageSvc.watch(
+                watch.get().keyCriterion().toRange().getKey(),
+                watch.get().keyCriterion().toRange().getValue(),
+                watch.get().revision(),
+                watch.get().listener()).thenAccept(id -> deployFut.complete(Optional.of(id))).join();
+        }
+        catch (IgniteInternalCheckedException e) {
+            throw new IgniteInternalException("Couldn't receive applied revision during deploy watches", e);
+        }
+
+        deployed = true;
+    }
+
+    /**
+     * Register watch listener by key.
      *
-     * @param key The target key. Couldn't be {@code null}.
+     * @param key The target key.
      * @param lsnr Listener which will be notified for each update.
      * @return Subscription identifier. Could be used in {@link #unregisterWatch} method in order to cancel
-     * subscription.
+     * subscription
      */
-    public synchronized CompletableFuture<Long> registerWatch(@Nullable Key key, @NotNull WatchListener lsnr) {
-        // TODO: IGNITE-14446 Implement DMS manager with watch registry.
-        return null;
+    public synchronized CompletableFuture<Long> registerWatch(
+        @Nullable Key key,
+        @NotNull WatchListener lsnr
+    ) {
+        return waitForReDeploy(watchAggregator.add(key, lsnr));
     }
 
     /**
-     * Proxies the invocation to metastorage.
+     * Register watch listener by key prefix.
      *
-     * @param key The target key.
-     * @return Metastorage entry.
+     * @param key Key prefix to listen to.
+     * @param lsnr Listener which will be notified for each update.
+     * @return Subscription identifier. Could be used in {@link #unregisterWatch} method in order to cancel
+     * subscription
      */
-    public synchronized CompletableFuture<Entry> get(@Nullable Key key) {
-        // TODO: IGNITE-14446 Implement DMS manager with watch registry.
-        return null;
+    public synchronized CompletableFuture<Long> registerWatchByPrefix(
+        @Nullable Key key,
+        @NotNull WatchListener lsnr
+    ) {
+        return waitForReDeploy(watchAggregator.addPrefix(key, lsnr));
     }
 
     /**
-     * Proxies the invocation to metastorage.
+     * Register watch listener by collection of keys.
      *
-     * @param key The target key.
-     * @param value The value to set.
-     * @return
+     * @param keys Collection of keys to listen to.
+     * @param lsnr Listener which will be notified for each update.
+     * @return Subscription identifier. Could be used in {@link #unregisterWatch} method in order to cancel
+     * subscription
      */
-    public CompletableFuture<Void> put(@NotNull Key key, @NotNull byte[] value) {
-        // TODO: IGNITE-14446 Implement DMS manager with watch registry.
-        return null;
+    public synchronized CompletableFuture<Long> registerWatch(
+        @NotNull Collection<Key> keys,
+        @NotNull WatchListener lsnr
+    ) {
+        return waitForReDeploy(watchAggregator.add(keys, lsnr));
     }
 
     /**
-     * Invokes a service operation for metastorage.
+     * Register watch listener by range of keys.
      *
-     * @param key Key in metastorage.
-     * @param condition Condition to process.
-     * @param success Success operation.
-     * @param failure Failure operation.
-     * @return Future which will complete when appropriate final operation would be invoked.
+     * @param from Start key of range.
+     * @param to End key of range (exclusively).
+     * @param lsnr Listener which will be notified for each update.
+     * @return future with id of registered watch.
+     */
+    public synchronized CompletableFuture<Long> registerWatch(
+        @NotNull Key from,
+        @NotNull Key to,
+        @NotNull WatchListener lsnr
+    ) {
+        return waitForReDeploy(watchAggregator.add(from, to, lsnr));
+    }
+
+    /**
+     * Unregister watch listener by id.
+     *
+     * @param id Id of the watch to unregister.
+     * @return future, which will be completed when unregister finished.
+     */
+    public synchronized CompletableFuture<Void> unregisterWatch(long id) {
+        watchAggregator.cancel(id);
+        if (deployed)
+            return updateWatches().thenAccept(v -> {});
+        else
+            return deployFut.thenAccept(uuid -> {});
+    }
+
+    /**
+     * @see MetaStorageService#get(Key)
+     */
+    public @NotNull CompletableFuture<Entry> get(@NotNull Key key) {
+        return metaStorageSvc.get(key);
+    }
+
+    /**
+     * @see MetaStorageService#get(Key, long)
+     */
+    public @NotNull CompletableFuture<Entry> get(@NotNull Key key, long revUpperBound) {
+        return metaStorageSvc.get(key, revUpperBound);
+    }
+
+    /**
+     * @see MetaStorageService#getAll(Collection)
+     */
+    public @NotNull CompletableFuture<Map<Key, Entry>> getAll(Collection<Key> keys) {
+        return metaStorageSvc.getAll(keys);
+    }
+
+    /**
+     * @see MetaStorageService#getAll(Collection, long)
+     */
+    public @NotNull CompletableFuture<Map<Key, Entry>> getAll(Collection<Key> keys, long revUpperBound) {
+        return metaStorageSvc.getAll(keys, revUpperBound);
+    }
+
+    /**
+     * @see MetaStorageService#put(Key, byte[])
+     */
+    public @NotNull CompletableFuture<Void> put(@NotNull Key key, byte[] val) {
+        return metaStorageSvc.put(key, val);
+    }
+
+    /**
+     * @see MetaStorageService#getAndPut(Key, byte[])
      */
-    public CompletableFuture<Boolean> invoke(
+    public @NotNull CompletableFuture<Entry> getAndPut(@NotNull Key key, byte[] val) {
+        return metaStorageSvc.getAndPut(key, val);
+    }
+
+    /**
+     * @see MetaStorageService#putAll(Map)
+     */
+    public @NotNull CompletableFuture<Void> putAll(@NotNull Map<Key, byte[]> vals) {
+        return metaStorageSvc.putAll(vals);
+    }
+
+    /**
+     * @see MetaStorageService#getAndPutAll(Map)
+     */
+    public @NotNull CompletableFuture<Map<Key, Entry>> getAndPutAll(@NotNull Map<Key, byte[]> vals) {
+        return metaStorageSvc.getAndPutAll(vals);
+    }
+
+    /**
+     * @see MetaStorageService#remove(Key)
+     */
+    public @NotNull CompletableFuture<Void> remove(@NotNull Key key) {
+        return metaStorageSvc.remove(key);
+    }
+
+    /**
+     * @see MetaStorageService#getAndRemove(Key)
+     */
+    public @NotNull CompletableFuture<Entry> getAndRemove(@NotNull Key key) {
+        return metaStorageSvc.getAndRemove(key);
+    }
+
+    /**
+     * @see MetaStorageService#removeAll(Collection)
+     */
+    public @NotNull CompletableFuture<Void> removeAll(@NotNull Collection<Key> keys) {
+        return metaStorageSvc.removeAll(keys);
+    }
+
+    /**
+     * @see MetaStorageService#getAndRemoveAll(Collection)
+     */
+    public @NotNull CompletableFuture<Map<Key, Entry>> getAndRemoveAll(@NotNull Collection<Key> keys) {
+        return metaStorageSvc.getAndRemoveAll(keys);
+    }
+
+    /**
+     * @see MetaStorageService#invoke(Key, Condition, Operation, Operation)
+     */
+    public @NotNull CompletableFuture<Boolean> invoke(
         @NotNull Key key,
-        @NotNull Condition condition,
+        @NotNull Condition cond,
         @NotNull Operation success,
         @NotNull Operation failure
     ) {
-        // TODO: IGNITE-14446 Implement DMS manager with watch registry.
-        return null;
+        return metaStorageSvc.invoke(key, cond, success, failure);
     }
 
     /**
-     * Unregister subscription for the given identifier.
+     * @see MetaStorageService#getAndInvoke(Key, Condition, Operation, Operation)
+     */
+    public @NotNull CompletableFuture<Entry> getAndInvoke(
+        @NotNull Key key,
+        @NotNull Condition cond,
+        @NotNull Operation success,
+        @NotNull Operation failure
+    ) {
+        return metaStorageSvc.getAndInvoke(key, cond, success, failure);
+    }
+
+    /**
+     * @see MetaStorageService#range(Key, Key, long)
+     */
+    public @NotNull Cursor<Entry> range(@NotNull Key keyFrom, @Nullable Key keyTo, long revUpperBound) {
+        return metaStorageSvc.range(keyFrom, keyTo, revUpperBound);
+    }
+
+    /**
+     * @see MetaStorageService#range(Key, Key)
+     */
+    public @NotNull Cursor<Entry> range(@NotNull Key keyFrom, @Nullable Key keyTo) {
+        return metaStorageSvc.range(keyFrom, keyTo);
+    }
+
+    /**
+     * @see MetaStorageService#compact()
+     */
+    public @NotNull CompletableFuture<Void> compact() {
+        return metaStorageSvc.compact();
+    }
+
+    /**
+     * Stop current batch of consolidated watches and register new one from current {@link WatchAggregator}.
      *
-     * @param id Subscription identifier.
-     * @return Completed future in case of operation success. Couldn't be {@code null}.
+     * @return Ignite UUID of new consolidated watch.
      */
-    public synchronized CompletableFuture<Void> unregisterWatch(long id) {
-        // TODO: IGNITE-14446 Implement DMS manager with watch registry.
-        return null;
+    private CompletableFuture<Optional<IgniteUuid>> updateWatches() {
+        Long revision;
+        try {
+            revision = vaultMgr.appliedRevision();
+        }
+        catch (IgniteInternalCheckedException e) {
+            throw new IgniteInternalException("Couldn't receive applied revision during watch redeploy", e);
+        }
+
+        final var finalRevision = revision;
+
+        deployFut = deployFut
+            .thenCompose(idOpt -> idOpt.map(metaStorageSvc::stopWatch).orElse(CompletableFuture.completedFuture(null)))
+            .thenCompose(r -> {
+                var watch = watchAggregator.watch(finalRevision, this::storeEntries);
+
+                if (watch.isEmpty())
+                    return CompletableFuture.completedFuture(Optional.empty());
+                else
+                    return metaStorageSvc.watch(
+                        watch.get().keyCriterion().toRange().get1(),
+                        watch.get().keyCriterion().toRange().get2(),
+                        watch.get().revision(),
+                        watch.get().listener()).thenApply(Optional::of);
+            });
+
+        return deployFut;
     }
 
     /**
-     * Deploy all registered watches through{@code MetaStorageService.watch()}.
+     * Store entries with appropriate associated revision.
+     *
+     * @param entries Entries to store.
+     * @param revision Revision associated with the entries.
+     * @return future, which will be completed when store action finished.
      */
-    public synchronized void deployWatches() {
-        // TODO: IGNITE-14446 Implement DMS manager with watch registry.
+    private CompletableFuture<Void> storeEntries(Collection<IgniteBiTuple<Key, byte[]>> entries, long revision) {
+        try {
+            return vaultMgr.putAll(entries.stream().collect(
+                Collectors.toMap(
+                    e -> ByteArray.fromString(e.getKey().toString()),
+                    IgniteBiTuple::getValue)),
+                revision);
+        }
+        catch (IgniteInternalCheckedException e) {
+            throw new IgniteInternalException("Couldn't put entries with considered revision.", e);
+        }
+    }
+
+    /**
+     * @param id Id of the watch to redeploy.
+     * @return future, which will be completed after redeploy finished.
+     */
+    private CompletableFuture<Long> waitForReDeploy(long id) {
+        if (deployed)
+            return updateWatches().thenApply(uid -> id);
+        else
+            return deployFut.thenApply(uid -> id);
     }
 }
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/watch/AggregatedWatch.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/watch/AggregatedWatch.java
new file mode 100644
index 0000000..754bf76
--- /dev/null
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/watch/AggregatedWatch.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.watch;
+
+import org.apache.ignite.metastorage.common.WatchListener;
+
+/**
+ * Watch implementation with associated revision.
+ * Instance of this watch produced by {@link WatchAggregator}.
+ */
+public class AggregatedWatch {
+    /** Watch key criterion. */
+    private final KeyCriterion keyCriterion;
+
+    /** Aggregated watch listener. */
+    private final WatchListener lsnr;
+
+    /** Watch revision. */
+    private final long revision;
+
+    /**
+     * Creates the instance of aggregated watch.
+     *
+     * @param keyCriterion Aggregated key criterion.
+     * @param revision Aggregated revision.
+     * @param lsnr Aggregated listener.
+     */
+    public AggregatedWatch(KeyCriterion keyCriterion, long revision, WatchListener lsnr) {
+        this.keyCriterion = keyCriterion;
+        this.revision = revision;
+        this.lsnr = lsnr;
+    }
+
+    /**
+     * @return Key criterion.
+     */
+    public KeyCriterion keyCriterion() {
+        return keyCriterion;
+    }
+
+    /**
+     * @return Watch listener.
+     */
+    public WatchListener listener() {
+        return lsnr;
+    }
+
+    /**
+     * @return Revision.
+     */
+    public long revision() {
+        return revision;
+    }
+}
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/watch/KeyCriterion.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/watch/KeyCriterion.java
new file mode 100644
index 0000000..654b872
--- /dev/null
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/watch/KeyCriterion.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.watch;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.metastorage.common.Key;
+
+/**
+ * Filter that selects the keys whose changes in the meta storage are listened to.
+ */
+public interface KeyCriterion {
+    /**
+     * Translates any type of key criterion to range of keys.
+     *
+     * @return Ignite tuple with first key as start of range and second as the end.
+     */
+    public IgniteBiTuple<Key, Key> toRange();
+
+    /**
+     * Check if this key criterion contains the key.
+     *
+     * @return true if criterion contains the key, false otherwise.
+     */
+    public boolean contains(Key key);
+
+    /**
+     * Simple criterion which contains exactly one key.
+     */
+    static class ExactCriterion implements KeyCriterion {
+        /** The key of criterion. */
+        private final Key key;
+
+        /**
+         * Creates the instance of exact criterion.
+         *
+         * @param key Instance of the reference key.
+         */
+        public ExactCriterion(Key key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteBiTuple<Key, Key> toRange() {
+            return new IgniteBiTuple<>(key, key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean contains(Key key) {
+            return this.key.equals(key);
+        }
+
+    }
+
+    /**
+     * Criterion which contains the range of keys.
+     */
+    static class RangeCriterion implements KeyCriterion {
+        /** Start of the range. */
+        private final Key from;
+
+        /** End of the range (exclusive). */
+        private final Key to;
+
+        /**
+         * Creates the instance of range criterion.
+         *
+         * @param from Start of the range.
+         * @param to End of the range (exclusive).
+         */
+        public RangeCriterion(Key from, Key to) {
+            this.from = from;
+            this.to = to;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteBiTuple<Key, Key> toRange() {
+            return new IgniteBiTuple<>(from, to);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean contains(Key key) {
+            return key.compareTo(from) >= 0 && key.compareTo(to) < 0;
+        }
+    }
+
+    /**
+     * Criterion which consists of a collection of keys.
+     */
+    static class CollectionCriterion implements KeyCriterion {
+        /** Collection of keys. */
+        private final Collection<Key> keys;
+
+        /**
+         * Creates the instance of collection criterion.
+         *
+         * @param keys Collection of keys.
+         */
+        public CollectionCriterion(Collection<Key> keys) {
+            this.keys = keys;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteBiTuple<Key, Key> toRange() {
+            return new IgniteBiTuple<>(Collections.min(keys), Collections.max(keys));
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean contains(Key key) {
+            return keys.contains(key);
+        }
+    }
+
+    /**
+     * Criterion which consists of all keys with defined prefix.
+     */
+    static class PrefixCriterion implements KeyCriterion {
+        /** Prefix of the key. */
+        private final Key prefixKey;
+
+        /**
+         * Creates the instance of prefix key criterion.
+         *
+         * @param prefixKey Prefix of the key.
+         */
+        public PrefixCriterion(Key prefixKey) {
+            this.prefixKey = prefixKey;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteBiTuple<Key, Key> toRange() {
+            var bytes = Arrays.copyOf(prefixKey.bytes(), prefixKey.bytes().length);
+            if (bytes[bytes.length - 1] != Byte.MAX_VALUE)
+                bytes[bytes.length - 1]++;
+            else
+                bytes = Arrays.copyOf(bytes, bytes.length + 1);
+            return new IgniteBiTuple<>(prefixKey, new Key(bytes));
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean contains(Key key) {
+            return key.compareTo(prefixKey) >= 0 && key.compareTo(toRange().getValue()) < 0;
+        }
+    }
+}
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/watch/WatchAggregator.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/watch/WatchAggregator.java
new file mode 100644
index 0000000..35c8881
--- /dev/null
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/watch/WatchAggregator.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.watch;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Needed to aggregate multiple watches to one aggregated watch.
+ * This approach needed to provide the following additional guarantees to watching mechanism:
+ * - watch events will be processed sequentially
+ * - watch events will be resolved in the order of watch registration
+ */
+public class WatchAggregator {
+    /**
+     * Watches' map must be synchronized because of changes from WatchListener in separate thread.
+     */
+    private final Map<Long, Watch> watches = Collections.synchronizedMap(new LinkedHashMap<>());
+
+    /** Simple auto increment id for internal watches. */
+    private AtomicLong idCntr = new AtomicLong(0);
+
+    /**
+     * Adds new watch with simple exact criterion.
+     *
+     * @param key Key for watching.
+     * @param lsnr Listener which will be executed on watch event.
+     * @return id of registered watch. Can be used for remove watch from later.
+     */
+    public long add(Key key, WatchListener lsnr) {
+        var watch = new Watch(new KeyCriterion.ExactCriterion(key), lsnr);
+        var id = idCntr.incrementAndGet();
+        watches.put(id, watch);
+        return id;
+    }
+
+    /**
+     * Adds new watch with filter by key prefix.
+     *
+     * @param key Prefix for key.
+     * @param lsnr Listener which will be executed on watch event.
+     * @return id of registered watch. Can be used for remove watch from later.
+     */
+    public long addPrefix(Key key, WatchListener lsnr) {
+        var watch = new Watch(new KeyCriterion.PrefixCriterion(key), lsnr);
+        var id = idCntr.incrementAndGet();
+        watches.put(id, watch);
+        return id;
+    }
+
+    /**
+     * Adds new watch with filter by collection of keys.
+     *
+     * @param keys Collection of keys to listen.
+     * @param lsnr Listener which will be executed on watch event.
+     * @return id of registered watch. Can be used for remove watch from later.
+     */
+    public long add(Collection<Key> keys, WatchListener lsnr) {
+        var watch = new Watch(new KeyCriterion.CollectionCriterion(keys), lsnr);
+        var id = idCntr.incrementAndGet();
+        watches.put(id, watch);
+        return id;
+    }
+
+    /**
+     * Adds new watch with filter by collection of keys.
+     *
+     * @param from Start key of range to listen.
+     * @param to End key of range (exclusively)..
+     * @param lsnr Listener which will be executed on watch event.
+     * @return id of registered watch. Can be used for remove watch from later.
+     */
+    public long add(Key from, Key to, WatchListener lsnr) {
+        var watch = new Watch(new KeyCriterion.RangeCriterion(from, to), lsnr);
+        var id = idCntr.incrementAndGet();
+        watches.put(id, watch);
+        return id;
+    }
+
+    /**
+     * Cancel watch by id.
+     *
+     * @param id of watch to cancel.
+     */
+    public void cancel(long id) {
+        watches.remove(id);
+    }
+
+    /**
+     * Cancel multiple watches by ids.
+     *
+     * @param ids of watches to cancel.
+     */
+    public void cancelAll(Collection<Long> ids) {
+        watches.keySet().removeAll(ids);
+    }
+
+    /**
+     * Produce watch with aggregated key criterion and general watch listener dispatcher.
+     *
+     * @param revision start revision to listen event.
+     * @param saveRevisionAct action to commit keys-revision pair to persistent store for processed keys.
+     * @return result aggregated watch.
+     */
+    public Optional<AggregatedWatch> watch(long revision, BiConsumer<Collection<IgniteBiTuple<Key, byte[]>>, Long> saveRevisionAct) {
+        synchronized (watches) {
+            if (watches.isEmpty())
+                return Optional.empty();
+            else
+                return Optional.of(new AggregatedWatch(inferGeneralCriteria(), revision, watchListener(saveRevisionAct)));
+        }
+    }
+
+    /**
+     * Returns general criterion, which overlays all aggregated criteria.
+     *
+     * @return aggregated criterion.
+     */
+    // TODO: IGNITE-14667 We can do it better than infer range always
+    private KeyCriterion inferGeneralCriteria() {
+        return new KeyCriterion.RangeCriterion(
+            watches.values().stream().map(w -> w.keyCriterion().toRange().getKey()).min(Key::compareTo).get(),
+            watches.values().stream().map(w -> w.keyCriterion().toRange().getValue()).max(Key::compareTo).get()
+        );
+    }
+
+    /**
+     * Produces the watch listener, which will dispatch events to appropriate watches.
+     *
+     * @param storeRevision action to commit keys-revision pair to persistent store for processed keys.
+     * @return watch listener, which will dispatch events to appropriate watches.
+     */
+    private WatchListener watchListener(BiConsumer<Collection<IgniteBiTuple<Key, byte[]>>, Long> storeRevision) {
+        // Copy watches to separate collection, because all changes on the WatchAggregator watches
+        // shouldn't be propagated to listener watches immediately.
+        // WatchAggregator will be redeployed with new watches if needed instead.
+        final LinkedHashMap<Long, Watch> cpWatches = new LinkedHashMap<>(watches);
+
+        return new WatchListener() {
+
+            @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> evts) {
+                var watchIt = cpWatches.entrySet().iterator();
+                Collection<Long> toCancel = new ArrayList<>();
+
+                while (watchIt.hasNext()) {
+                    Map.Entry<Long, WatchAggregator.Watch> entry = watchIt.next();
+                    WatchAggregator.Watch watch = entry.getValue();
+                    var filteredEvts = new ArrayList<WatchEvent>();
+
+                    for (WatchEvent evt : evts) {
+                        if (watch.keyCriterion().contains(evt.oldEntry().key()))
+                            filteredEvts.add(evt);
+                    }
+
+                    if (!filteredEvts.isEmpty()) {
+                        if (!watch.listener().onUpdate(filteredEvts)) {
+                            watchIt.remove();
+
+                            toCancel.add(entry.getKey());
+                        }
+                    }
+                }
+
+                // Cancel finished watches from the global watch map
+                // to prevent finished watches from redeploy.
+                if (!toCancel.isEmpty())
+                    cancelAll(toCancel);
+
+                var revision = 0L;
+                var entries = new ArrayList<IgniteBiTuple<Key, byte[]>>();
+                for (WatchEvent evt: evts) {
+                    revision = evt.newEntry().revision();
+
+                    entries.add(new IgniteBiTuple<>(evt.newEntry().key(), evt.newEntry().value()));
+                }
+
+                storeRevision.accept(entries, revision);
+
+                return true;
+            }
+
+            @Override public void onError(@NotNull Throwable e) {
+                watches.values().forEach(w -> w.listener().onError(e));
+            }
+        };
+    }
+
+    /**
+     * (key criterion, watch listener) container.
+     */
+    private static class Watch {
+        /** Key criterion. */
+        private final KeyCriterion keyCriterion;
+
+        /** Watch listener. */
+        private final WatchListener lsnr;
+
+        /** Creates the watch. */
+        private Watch(KeyCriterion keyCriterion, WatchListener lsnr) {
+            this.keyCriterion = keyCriterion;
+            this.lsnr = lsnr;
+        }
+
+        /**
+         * @return key criterion.
+         */
+        public KeyCriterion keyCriterion() {
+            return keyCriterion;
+        }
+
+        /**
+         * @return watch listener.
+         */
+        public WatchListener listener() {
+            return lsnr;
+        }
+    }
+}
diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/WatchAggregatorTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/WatchAggregatorTest.java
new file mode 100644
index 0000000..4eb1901
--- /dev/null
+++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/WatchAggregatorTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import org.apache.ignite.internal.metastorage.watch.WatchAggregator;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests of events routing and watch cancellation of {@link WatchAggregator}.
+ */
+public class WatchAggregatorTest {
+    /**
+     * Checks that every listener gets only the events of the key it was registered for.
+     */
+    @Test
+    public void testEventsRouting() {
+        var aggregator = new WatchAggregator();
+        var firstLsnr = mock(WatchListener.class);
+        var secondLsnr = mock(WatchListener.class);
+
+        aggregator.add(new Key("1"), firstLsnr);
+        aggregator.add(new Key("2"), secondLsnr);
+
+        var firstEvt = update("1", "value1", "value1n");
+        var secondEvt = update("2", "value2", "value2n");
+
+        dispatch(aggregator, firstEvt, secondEvt);
+
+        verify(firstLsnr).onUpdate(Collections.singletonList(firstEvt));
+        verify(secondLsnr).onUpdate(Collections.singletonList(secondEvt));
+    }
+
+    /**
+     * Checks that a watch cancelled by id doesn't get events from the next aggregated watch.
+     */
+    @Test
+    public void testCancel() {
+        var aggregator = new WatchAggregator();
+        var firstLsnr = listenerReturning(true);
+        var secondLsnr = listenerReturning(true);
+
+        var firstId = aggregator.add(new Key("1"), firstLsnr);
+        aggregator.add(new Key("2"), secondLsnr);
+
+        var firstEvt = update("1", "value1", "value1n");
+        var secondEvt = update("2", "value2", "value2n");
+
+        dispatch(aggregator, firstEvt, secondEvt);
+
+        verify(firstLsnr, times(1)).onUpdate(any());
+        verify(secondLsnr, times(1)).onUpdate(any());
+
+        aggregator.cancel(firstId);
+
+        dispatch(aggregator, firstEvt, secondEvt);
+
+        verify(firstLsnr, times(1)).onUpdate(any());
+        verify(secondLsnr, times(2)).onUpdate(any());
+    }
+
+    /**
+     * Checks that a watch whose listener returned {@code false} doesn't get events
+     * from the next aggregated watch.
+     */
+    @Test
+    public void testCancelByFalseFromListener() {
+        var aggregator = new WatchAggregator();
+        var firstLsnr = listenerReturning(false);
+        var secondLsnr = listenerReturning(true);
+
+        aggregator.add(new Key("1"), firstLsnr);
+        aggregator.add(new Key("2"), secondLsnr);
+
+        var firstEvt = update("1", "value1", "value1n");
+        var secondEvt = update("2", "value2", "value2n");
+
+        dispatch(aggregator, firstEvt, secondEvt);
+
+        verify(firstLsnr, times(1)).onUpdate(any());
+        verify(secondLsnr, times(1)).onUpdate(any());
+
+        dispatch(aggregator, firstEvt, secondEvt);
+
+        verify(firstLsnr, times(1)).onUpdate(any());
+        verify(secondLsnr, times(2)).onUpdate(any());
+    }
+
+    /**
+     * Creates a listener mock with a stubbed result of {@code onUpdate}.
+     *
+     * @param res Value which {@code onUpdate} of the mock returns.
+     * @return Listener mock.
+     */
+    private static WatchListener listenerReturning(boolean res) {
+        var lsnr = mock(WatchListener.class);
+
+        when(lsnr.onUpdate(any())).thenReturn(res);
+
+        return lsnr;
+    }
+
+    /**
+     * Requests a new aggregated watch with start revision 1 and a no-op revision saver
+     * and passes the events to its listener.
+     *
+     * @param aggregator Aggregator under test.
+     * @param evts Events to pass.
+     */
+    private static void dispatch(WatchAggregator aggregator, WatchEvent... evts) {
+        aggregator.watch(1, (entries, rev) -> {}).get().listener().onUpdate(Arrays.asList(evts));
+    }
+
+    /**
+     * Creates an event of the key update, both entries of the event have revision 1.
+     *
+     * @param key Key.
+     * @param oldVal Value of the old entry.
+     * @param newVal Value of the new entry.
+     * @return Watch event.
+     */
+    private WatchEvent update(String key, String oldVal, String newVal) {
+        return new WatchEvent(entry(key, oldVal, 1), entry(key, newVal, 1));
+    }
+
+    /**
+     * Creates an entry backed by the passed values.
+     *
+     * @param key Key.
+     * @param value Value, it is exposed as UTF-8 bytes.
+     * @param revision Revision.
+     * @return Entry.
+     */
+    private Entry entry(String key, String value, long revision) {
+        return new Entry() {
+            /** {@inheritDoc} */
+            @Override public @NotNull Key key() {
+                return new Key(key);
+            }
+
+            /** {@inheritDoc} */
+            @Override public @Nullable byte[] value() {
+                return value.getBytes(StandardCharsets.UTF_8);
+            }
+
+            /** {@inheritDoc} */
+            @Override public long revision() {
+                return revision;
+            }
+        };
+    }
+}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
index 2a8ef19..6c6b0f2 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
@@ -20,8 +20,12 @@ package org.apache.ignite.internal.app;
 import io.netty.util.internal.StringUtil;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import org.apache.ignite.app.Ignite;
 import org.apache.ignite.app.Ignition;
@@ -45,12 +49,22 @@ import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.internal.vault.impl.VaultServiceImpl;
 import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.metastorage.client.MetaStorageService;
+import org.apache.ignite.metastorage.common.Condition;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operation;
+import org.apache.ignite.metastorage.common.WatchListener;
 import org.apache.ignite.network.ClusterLocalConfiguration;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.message.MessageSerializationRegistry;
 import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
 import org.apache.ignite.table.manager.IgniteTables;
 import org.apache.ignite.utils.IgniteProperties;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Implementation of an entry point for handling grid lifecycle.
@@ -142,7 +156,8 @@ public class IgnitionImpl implements Ignition {
         MetaStorageManager metaStorageMgr = new MetaStorageManager(
             vaultMgr,
             clusterNetSvc,
-            raftMgr
+            raftMgr,
+            metaStorageServiceMock()
         );
 
         // TODO IGNITE-14578 Bootstrap configuration manager with distributed configuration.
@@ -193,4 +208,99 @@ public class IgnitionImpl implements Ignition {
 
         LOG.info(banner + '\n' + " ".repeat(22) + "Apache Ignite ver. " + ver + '\n');
     }
+
+    // TODO: remove when metastorage service will be ready.
+    /**
+     * Creates a temporary stand-in of the meta storage service.
+     * The {@code watch} methods return an already completed future with a random id,
+     * all other methods throw {@link UnsupportedOperationException}.
+     *
+     * @return Stub of the meta storage service.
+     */
+    private static MetaStorageService metaStorageServiceMock() {
+        return new MetaStorageService() {
+            /**
+             * @return Exception to throw from operations which are not supported by the stub.
+             */
+            private UnsupportedOperationException notImplemented() {
+                return new UnsupportedOperationException("Metastorage service is not implemented yet");
+            }
+
+            /**
+             * @return Completed future with a randomly generated watch id.
+             */
+            private CompletableFuture<IgniteUuid> randomWatchId() {
+                return CompletableFuture.completedFuture(new IgniteUuid(UUID.randomUUID(), 0L));
+            }
+
+            @Override public @NotNull CompletableFuture<Entry> get(@NotNull Key key) {
+                throw notImplemented();
+            }
+
+            @Override public @NotNull CompletableFuture<Entry> get(@NotNull Key key, long revUpperBound) {
+                throw notImplemented();
+            }
+
+            @Override public @NotNull CompletableFuture<Map<Key, Entry>> getAll(Collection<Key> keys) {
+                throw notImplemented();
+            }
+
+            @Override public @NotNull CompletableFuture<Map<Key, Entry>> getAll(Collection<Key> keys, long revUpperBound) {
+                throw notImplemented();
+            }
+
+            @Override public @NotNull CompletableFuture<Void> put(@NotNull Key key, @NotNull byte[] value) {
+                throw notImplemented();
+            }
+
+            @Override public @NotNull CompletableFuture<Entry> getAndPut(@NotNull Key key, @NotNull byte[] value) {
+                throw notImplemented();
+            }
+
+            @Override public @NotNull CompletableFuture<Void> putAll(@NotNull Map<Key, byte[]> vals) {
+                throw notImplemented();
+            }
+
+            @Override public @NotNull CompletableFuture<Map<Key, Entry>> getAndPutAll(@NotNull Map<Key, byte[]> vals) {
+                throw notImplemented();
+            }
+
+            @Override public @NotNull CompletableFuture<Void> remove(@NotNull Key key) {
+                throw notImplemented();
+            }
+
+            @Override public @NotNull CompletableFuture<Entry> getAndRemove(@NotNull Key key) {
+                throw notImplemented();
+            }
+
+            @Override public @NotNull CompletableFuture<Void> removeAll(@NotNull Collection<Key> keys) {
+                throw notImplemented();
+            }
+
+            @Override
+            public @NotNull CompletableFuture<Map<Key, Entry>> getAndRemoveAll(@NotNull Collection<Key> keys) {
+                throw notImplemented();
+            }
+
+            @Override public @NotNull CompletableFuture<Boolean> invoke(@NotNull Key key, @NotNull Condition condition,
+                @NotNull Operation success, @NotNull Operation failure) {
+                throw notImplemented();
+            }
+
+            @Override public @NotNull CompletableFuture<Entry> getAndInvoke(@NotNull Key key, @NotNull Condition condition,
+                @NotNull Operation success, @NotNull Operation failure) {
+                throw notImplemented();
+            }
+
+            @Override public @NotNull Cursor<Entry> range(@NotNull Key keyFrom, @Nullable Key keyTo, long revUpperBound) {
+                throw notImplemented();
+            }
+
+            @Override public @NotNull Cursor<Entry> range(@NotNull Key keyFrom, @Nullable Key keyTo) {
+                throw notImplemented();
+            }
+
+            @Override public @NotNull CompletableFuture<IgniteUuid> watch(@Nullable Key keyFrom, @Nullable Key keyTo,
+                long revision, @NotNull WatchListener lsnr) {
+                return randomWatchId();
+            }
+
+            @Override public @NotNull CompletableFuture<IgniteUuid> watch(@NotNull Key key, long revision,
+                @NotNull WatchListener lsnr) {
+                return randomWatchId();
+            }
+
+            @Override public @NotNull CompletableFuture<IgniteUuid> watch(@NotNull Collection<Key> keys, long revision,
+                @NotNull WatchListener lsnr) {
+                return randomWatchId();
+            }
+
+            @Override public @NotNull CompletableFuture<Void> stopWatch(@NotNull IgniteUuid id) {
+                throw notImplemented();
+            }
+
+            @Override public @NotNull CompletableFuture<Void> compact() {
+                throw notImplemented();
+            }
+        };
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 802d33b..86a04e4 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -118,9 +118,9 @@ public class TableManager implements IgniteTables {
         if (hasMetastorageLocally(localNodeName, metastorageMembers))
             subscribeForTableCreation();
 
-        String tableInternalPrefix = INTERNAL_PREFIX + "assignment.#";
+        String tableInternalPrefix = INTERNAL_PREFIX + "assignment.";
 
-        tableCreationSubscriptionFut = metaStorageMgr.registerWatch(new Key(tableInternalPrefix), new WatchListener() {
+        tableCreationSubscriptionFut = metaStorageMgr.registerWatchByPrefix(new Key(tableInternalPrefix), new WatchListener() {
             @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
                 for (WatchEvent evt : events) {
                     if (!ArrayUtils.empty(evt.newEntry().value())) {