You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/04/28 21:07:44 UTC
[ignite-3] branch main updated: IGNITE-14446 Added support of watch
and put/get/scan operations to MetaStorageManager. Fixes #111
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 38e7683 IGNITE-14446 Added support of watch and put/get/scan operations to MetaStorageManager. Fixes #111
38e7683 is described below
commit 38e7683004712eb9cd233ef556a7f5bb57acf56f
Author: Kirill Gusakov <kg...@gmail.com>
AuthorDate: Thu Apr 29 00:07:12 2021 +0300
IGNITE-14446 Added support of watch and put/get/scan operations to MetaStorageManager. Fixes #111
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../ignite/internal/affinity/AffinityManager.java | 4 +-
modules/metastorage/pom.xml | 23 ++
.../internal/metastorage/MetaStorageManager.java | 336 ++++++++++++++++++---
.../metastorage/watch/AggregatedWatch.java | 69 +++++
.../internal/metastorage/watch/KeyCriterion.java | 162 ++++++++++
.../metastorage/watch/WatchAggregator.java | 244 +++++++++++++++
.../internal/metastorage/WatchAggregatorTest.java | 131 ++++++++
.../apache/ignite/internal/app/IgnitionImpl.java | 112 ++++++-
.../internal/table/distributed/TableManager.java | 4 +-
9 files changed, 1041 insertions(+), 44 deletions(-)
diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
index d09094e..937af33 100644
--- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
+++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
@@ -129,9 +129,9 @@ public class AffinityManager {
private void subscribeToAssignmentCalculation() {
assert affinityCalculateSubscriptionFut == null : "Affinity calculation already subscribed";
- String tableInternalPrefix = INTERNAL_PREFIX + "assignment.#";
+ String tableInternalPrefix = INTERNAL_PREFIX + "assignment.";
- affinityCalculateSubscriptionFut = metaStorageMgr.registerWatch(new Key(tableInternalPrefix), new WatchListener() {
+ affinityCalculateSubscriptionFut = metaStorageMgr.registerWatchByPrefix(new Key(tableInternalPrefix), new WatchListener() {
@Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
for (WatchEvent evt : events) {
if (ArrayUtils.empty(evt.newEntry().value())) {
diff --git a/modules/metastorage/pom.xml b/modules/metastorage/pom.xml
index b31a74c..6c3a3f8 100644
--- a/modules/metastorage/pom.xml
+++ b/modules/metastorage/pom.xml
@@ -57,5 +57,28 @@
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-api</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-vault</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index d56e51a..4e06338 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -17,11 +17,22 @@
package org.apache.ignite.internal.metastorage;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.metastorage.watch.WatchAggregator;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.metastorage.client.MetaStorageService;
import org.apache.ignite.metastorage.common.Condition;
+import org.apache.ignite.metastorage.common.Cursor;
import org.apache.ignite.metastorage.common.Entry;
import org.apache.ignite.metastorage.common.Key;
import org.apache.ignite.metastorage.common.Operation;
@@ -53,6 +64,28 @@ import org.jetbrains.annotations.Nullable;
private MetaStorageService metaStorageSvc;
/**
+ * Aggregator of multiple watches to deploy them as one batch.
+ *
+ * @see WatchAggregator
+ */
+ private final WatchAggregator watchAggregator;
+
+ /**
+ * Future which will be completed with {@link IgniteUuid}
+ * once the aggregated watch has been successfully deployed.
+ * Can be resolved to {@link Optional#empty()} if no watch deployed at the moment.
+ */
+ private CompletableFuture<Optional<IgniteUuid>> deployFut;
+
+ /**
+ * If true - all new watches will be deployed immediately.
+ *
+ * If false - all new watches will be aggregated to one batch
+ * for further deploy by {@link MetaStorageManager#deployWatches()}
+ */
+ private boolean deployed;
+
+ /**
* The constructor.
*
* @param vaultMgr Vault manager.
@@ -62,11 +95,15 @@ import org.jetbrains.annotations.Nullable;
public MetaStorageManager(
VaultManager vaultMgr,
ClusterService clusterNetSvc,
- Loza raftMgr
+ Loza raftMgr,
+ MetaStorageService metaStorageSvc
) {
this.vaultMgr = vaultMgr;
this.clusterNetSvc = clusterNetSvc;
this.raftMgr = raftMgr;
+ this.metaStorageSvc = metaStorageSvc;
+ watchAggregator = new WatchAggregator();
+ deployFut = new CompletableFuture<>();
// TODO: IGNITE-14088: Uncomment and use real serializer factory
// Arrays.stream(MetaStorageMessageTypes.values()).forEach(
@@ -82,75 +119,296 @@ import org.jetbrains.annotations.Nullable;
}
/**
- * Register subscription on meta storage updates for further deployment when DMS is ready.
+ * Deploy all registered watches.
+ */
+ public synchronized void deployWatches() {
+ try {
+ var watch = watchAggregator.watch(
+ vaultMgr.appliedRevision(),
+ this::storeEntries
+ );
+ if (watch.isEmpty())
+ deployFut.complete(Optional.empty());
+ else
+ metaStorageSvc.watch(
+ watch.get().keyCriterion().toRange().getKey(),
+ watch.get().keyCriterion().toRange().getValue(),
+ watch.get().revision(),
+ watch.get().listener()).thenAccept(id -> deployFut.complete(Optional.of(id))).join();
+ }
+ catch (IgniteInternalCheckedException e) {
+ throw new IgniteInternalException("Couldn't receive applied revision during deploy watches", e);
+ }
+
+ deployed = true;
+ }
+
+ /**
+ * Register watch listener by key.
*
- * @param key The target key. Couldn't be {@code null}.
+ * @param key The target key.
* @param lsnr Listener which will be notified for each update.
* @return Subscription identifier. Could be used in {@link #unregisterWatch} method in order to cancel
- * subscription.
+ * subscription
*/
- public synchronized CompletableFuture<Long> registerWatch(@Nullable Key key, @NotNull WatchListener lsnr) {
- // TODO: IGNITE-14446 Implement DMS manager with watch registry.
- return null;
+ public synchronized CompletableFuture<Long> registerWatch(
+ @Nullable Key key,
+ @NotNull WatchListener lsnr
+ ) {
+ return waitForReDeploy(watchAggregator.add(key, lsnr));
}
/**
- * Proxies the invocation to metastorage.
+ * Register watch listener by key prefix.
*
- * @param key The target key.
- * @return Metastorage entry.
+ * @param key Prefix to listen.
+ * @param lsnr Listener which will be notified for each update.
+ * @return Subscription identifier. Could be used in {@link #unregisterWatch} method in order to cancel
+ * subscription
*/
- public synchronized CompletableFuture<Entry> get(@Nullable Key key) {
- // TODO: IGNITE-14446 Implement DMS manager with watch registry.
- return null;
+ public synchronized CompletableFuture<Long> registerWatchByPrefix(
+ @Nullable Key key,
+ @NotNull WatchListener lsnr
+ ) {
+ return waitForReDeploy(watchAggregator.addPrefix(key, lsnr));
}
/**
- * Proxies the invocation to metastorage.
+ * Register watch listener by collection of keys.
*
- * @param key The target key.
- * @param value The value to set.
- * @return
+ * @param keys Collection of keys to listen.
+ * @param lsnr Listener which will be notified for each update.
+ * @return Subscription identifier. Could be used in {@link #unregisterWatch} method in order to cancel
+ * subscription
*/
- public CompletableFuture<Void> put(@NotNull Key key, @NotNull byte[] value) {
- // TODO: IGNITE-14446 Implement DMS manager with watch registry.
- return null;
+ public synchronized CompletableFuture<Long> registerWatch(
+ @NotNull Collection<Key> keys,
+ @NotNull WatchListener lsnr
+ ) {
+ return waitForReDeploy(watchAggregator.add(keys, lsnr));
}
/**
- * Invokes a service operation for metastorage.
+ * Register watch listener by range of keys.
*
- * @param key Key in metastorage.
- * @param condition Condition to process.
- * @param success Success operation.
- * @param failure Failure operation.
- * @return Future which will complete when appropriate final operation would be invoked.
+ * @param from Start key of range.
+ * @param to End key of range (exclusively).
+ * @param lsnr Listener which will be notified for each update.
+ * @return future with id of registered watch.
+ */
+ public synchronized CompletableFuture<Long> registerWatch(
+ @NotNull Key from,
+ @NotNull Key to,
+ @NotNull WatchListener lsnr
+ ) {
+ return waitForReDeploy(watchAggregator.add(from, to, lsnr));
+ }
+
+ /**
+ * Unregister watch listener by id.
+ *
+ * @param id of watch to unregister.
+ * @return future, which will be completed when unregister finished.
+ */
+ public synchronized CompletableFuture<Void> unregisterWatch(long id) {
+ watchAggregator.cancel(id);
+ if (deployed)
+ return updateWatches().thenAccept(v -> {});
+ else
+ return deployFut.thenAccept(uuid -> {});
+ }
+
+ /**
+ * @see MetaStorageService#get(Key)
+ */
+ public @NotNull CompletableFuture<Entry> get(@NotNull Key key) {
+ return metaStorageSvc.get(key);
+ }
+
+ /**
+ * @see MetaStorageService#get(Key, long)
+ */
+ public @NotNull CompletableFuture<Entry> get(@NotNull Key key, long revUpperBound) {
+ return metaStorageSvc.get(key, revUpperBound);
+ }
+
+ /**
+ * @see MetaStorageService#getAll(Collection)
+ */
+ public @NotNull CompletableFuture<Map<Key, Entry>> getAll(Collection<Key> keys) {
+ return metaStorageSvc.getAll(keys);
+ }
+
+ /**
+ * @see MetaStorageService#getAll(Collection, long)
+ */
+ public @NotNull CompletableFuture<Map<Key, Entry>> getAll(Collection<Key> keys, long revUpperBound) {
+ return metaStorageSvc.getAll(keys, revUpperBound);
+ }
+
+ /**
+ * @see MetaStorageService#put(Key, byte[])
+ */
+ public @NotNull CompletableFuture<Void> put(@NotNull Key key, byte[] val) {
+ return metaStorageSvc.put(key, val);
+ }
+
+ /**
+ * @see MetaStorageService#getAndPut(Key, byte[])
*/
- public CompletableFuture<Boolean> invoke(
+ public @NotNull CompletableFuture<Entry> getAndPut(@NotNull Key key, byte[] val) {
+ return metaStorageSvc.getAndPut(key, val);
+ }
+
+ /**
+ * @see MetaStorageService#putAll(Map)
+ */
+ public @NotNull CompletableFuture<Void> putAll(@NotNull Map<Key, byte[]> vals) {
+ return metaStorageSvc.putAll(vals);
+ }
+
+ /**
+ * @see MetaStorageService#getAndPutAll(Map)
+ */
+ public @NotNull CompletableFuture<Map<Key, Entry>> getAndPutAll(@NotNull Map<Key, byte[]> vals) {
+ return metaStorageSvc.getAndPutAll(vals);
+ }
+
+ /**
+ * @see MetaStorageService#remove(Key)
+ */
+ public @NotNull CompletableFuture<Void> remove(@NotNull Key key) {
+ return metaStorageSvc.remove(key);
+ }
+
+ /**
+ * @see MetaStorageService#getAndRemove(Key)
+ */
+ public @NotNull CompletableFuture<Entry> getAndRemove(@NotNull Key key) {
+ return metaStorageSvc.getAndRemove(key);
+ }
+
+ /**
+ * @see MetaStorageService#removeAll(Collection)
+ */
+ public @NotNull CompletableFuture<Void> removeAll(@NotNull Collection<Key> keys) {
+ return metaStorageSvc.removeAll(keys);
+ }
+
+ /**
+ * @see MetaStorageService#getAndRemoveAll(Collection)
+ */
+ public @NotNull CompletableFuture<Map<Key, Entry>> getAndRemoveAll(@NotNull Collection<Key> keys) {
+ return metaStorageSvc.getAndRemoveAll(keys);
+ }
+
+ /**
+ * @see MetaStorageService#invoke(Key, Condition, Operation, Operation)
+ */
+ public @NotNull CompletableFuture<Boolean> invoke(
@NotNull Key key,
- @NotNull Condition condition,
+ @NotNull Condition cond,
@NotNull Operation success,
@NotNull Operation failure
) {
- // TODO: IGNITE-14446 Implement DMS manager with watch registry.
- return null;
+ return metaStorageSvc.invoke(key, cond, success, failure);
}
/**
- * Unregister subscription for the given identifier.
+ * @see MetaStorageService#getAndInvoke(Key, Condition, Operation, Operation)
+ */
+ public @NotNull CompletableFuture<Entry> getAndInvoke(
+ @NotNull Key key,
+ @NotNull Condition cond,
+ @NotNull Operation success,
+ @NotNull Operation failure
+ ) {
+ return metaStorageSvc.getAndInvoke(key, cond, success, failure);
+ }
+
+ /**
+ * @see MetaStorageService#range(Key, Key, long)
+ */
+ public @NotNull Cursor<Entry> range(@NotNull Key keyFrom, @Nullable Key keyTo, long revUpperBound) {
+ return metaStorageSvc.range(keyFrom, keyTo, revUpperBound);
+ }
+
+ /**
+ * @see MetaStorageService#range(Key, Key)
+ */
+ public @NotNull Cursor<Entry> range(@NotNull Key keyFrom, @Nullable Key keyTo) {
+ return metaStorageSvc.range(keyFrom, keyTo);
+ }
+
+ /**
+ * @see MetaStorageService#compact()
+ */
+ public @NotNull CompletableFuture<Void> compact() {
+ return metaStorageSvc.compact();
+ }
+
+ /**
+ * Stop current batch of consolidated watches and register new one from current {@link WatchAggregator}.
*
- * @param id Subscription identifier.
- * @return Completed future in case of operation success. Couldn't be {@code null}.
+ * @return Ignite UUID of new consolidated watch.
*/
- public synchronized CompletableFuture<Void> unregisterWatch(long id) {
- // TODO: IGNITE-14446 Implement DMS manager with watch registry.
- return null;
+ private CompletableFuture<Optional<IgniteUuid>> updateWatches() {
+ Long revision;
+ try {
+ revision = vaultMgr.appliedRevision();
+ }
+ catch (IgniteInternalCheckedException e) {
+ throw new IgniteInternalException("Couldn't receive applied revision during watch redeploy", e);
+ }
+
+ final var finalRevision = revision;
+
+ deployFut = deployFut
+ .thenCompose(idOpt -> idOpt.map(metaStorageSvc::stopWatch).orElse(CompletableFuture.completedFuture(null)))
+ .thenCompose(r -> {
+ var watch = watchAggregator.watch(finalRevision, this::storeEntries);
+
+ if (watch.isEmpty())
+ return CompletableFuture.completedFuture(Optional.empty());
+ else
+ return metaStorageSvc.watch(
+ watch.get().keyCriterion().toRange().get1(),
+ watch.get().keyCriterion().toRange().get2(),
+ watch.get().revision(),
+ watch.get().listener()).thenApply(Optional::of);
+ });
+
+ return deployFut;
}
/**
- * Deploy all registered watches through{@code MetaStorageService.watch()}.
+ * Store entries with appropriate associated revision.
+ *
+ * @param entries to store.
+ * @param revision associated revision.
+ * @return future, which will be completed when store action finished.
*/
- public synchronized void deployWatches() {
- // TODO: IGNITE-14446 Implement DMS manager with watch registry.
+ private CompletableFuture<Void> storeEntries(Collection<IgniteBiTuple<Key, byte[]>> entries, long revision) {
+ try {
+ return vaultMgr.putAll(entries.stream().collect(
+ Collectors.toMap(
+ e -> ByteArray.fromString(e.getKey().toString()),
+ IgniteBiTuple::getValue)),
+ revision);
+ }
+ catch (IgniteInternalCheckedException e) {
+ throw new IgniteInternalException("Couldn't put entries with considered revision.", e);
+ }
+ }
+
+ /**
+ * @param id of watch to redeploy.
+ * @return future, which will be completed after redeploy finished.
+ */
+ private CompletableFuture<Long> waitForReDeploy(long id) {
+ if (deployed)
+ return updateWatches().thenApply(uid -> id);
+ else
+ return deployFut.thenApply(uid -> id);
}
}
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/watch/AggregatedWatch.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/watch/AggregatedWatch.java
new file mode 100644
index 0000000..754bf76
--- /dev/null
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/watch/AggregatedWatch.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.watch;
+
+import org.apache.ignite.metastorage.common.WatchListener;
+
+/**
+ * Watch implementation with associated revision.
+ * Instance of this watch produced by {@link WatchAggregator}.
+ */
+public class AggregatedWatch {
+ /** Watch key criterion. */
+ private final KeyCriterion keyCriterion;
+
+ /** Aggregated watch listener. */
+ private final WatchListener lsnr;
+
+ /** Watch revision. */
+ private final long revision;
+
+ /**
+ * Creates the instance of aggregated watch.
+ *
+ * @param keyCriterion Aggregated key criterion.
+ * @param revision Aggregated revision.
+ * @param lsnr Aggregated listener.
+ */
+ public AggregatedWatch(KeyCriterion keyCriterion, long revision, WatchListener lsnr) {
+ this.keyCriterion = keyCriterion;
+ this.revision = revision;
+ this.lsnr = lsnr;
+ }
+
+ /**
+ * @return Key criterion.
+ */
+ public KeyCriterion keyCriterion() {
+ return keyCriterion;
+ }
+
+ /**
+ * @return Watch listener.
+ */
+ public WatchListener listener() {
+ return lsnr;
+ }
+
+ /**
+ * @return Revision.
+ */
+ public long revision() {
+ return revision;
+ }
+}
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/watch/KeyCriterion.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/watch/KeyCriterion.java
new file mode 100644
index 0000000..654b872
--- /dev/null
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/watch/KeyCriterion.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.watch;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.metastorage.common.Key;
+
+/**
+ * Filter for listening to key changes in the meta storage.
+ */
+public interface KeyCriterion {
+ /**
+ * Translates any type of key criterion to range of keys.
+ *
+ * @return Ignite tuple with first key as start of range and second as the end.
+ */
+ public IgniteBiTuple<Key, Key> toRange();
+
+ /**
+ * Check if this key criterion contains the key.
+ *
+ * @return true if criterion contains the key, false otherwise.
+ */
+ public boolean contains(Key key);
+
+ /**
+ * Simple criterion which contains exactly one key.
+ */
+ static class ExactCriterion implements KeyCriterion {
+ /** The key of criterion. */
+ private final Key key;
+
+ /**
+ * Creates the instance of exact criterion.
+ *
+ * @param key Instance of the reference key.
+ */
+ public ExactCriterion(Key key) {
+ this.key = key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteBiTuple<Key, Key> toRange() {
+ return new IgniteBiTuple<>(key, key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean contains(Key key) {
+ return this.key.equals(key);
+ }
+
+ }
+
+ /**
+ * Criterion which contains the range of keys.
+ */
+ static class RangeCriterion implements KeyCriterion {
+ /** Start of the range. */
+ private final Key from;
+
+ /** End of the range (exclusive). */
+ private final Key to;
+
+ /**
+ * Creates the instance of range criterion.
+ *
+ * @param from Start of the range.
+ * @param to End of the range (exclusive).
+ */
+ public RangeCriterion(Key from, Key to) {
+ this.from = from;
+ this.to = to;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteBiTuple<Key, Key> toRange() {
+ return new IgniteBiTuple<>(from, to);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean contains(Key key) {
+ return key.compareTo(from) >= 0 && key.compareTo(to) < 0;
+ }
+ }
+
+ /**
+ * Criterion which consists of a collection of keys.
+ */
+ static class CollectionCriterion implements KeyCriterion {
+ /** Collection of keys. */
+ private final Collection<Key> keys;
+
+ /**
+ * Creates the instance of collection criterion.
+ *
+ * @param keys Collection of keys.
+ */
+ public CollectionCriterion(Collection<Key> keys) {
+ this.keys = keys;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteBiTuple<Key, Key> toRange() {
+ return new IgniteBiTuple<>(Collections.min(keys), Collections.max(keys));
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean contains(Key key) {
+ return keys.contains(key);
+ }
+ }
+
+ /**
+ * Criterion which consists of all keys with defined prefix.
+ */
+ static class PrefixCriterion implements KeyCriterion {
+ /** Prefix of the key. */
+ private final Key prefixKey;
+
+ /**
+ * Creates the instance of prefix key criterion.
+ *
+ * @param prefixKey Prefix of the key.
+ */
+ public PrefixCriterion(Key prefixKey) {
+ this.prefixKey = prefixKey;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteBiTuple<Key, Key> toRange() {
+ var bytes = Arrays.copyOf(prefixKey.bytes(), prefixKey.bytes().length);
+ if (bytes[bytes.length - 1] != Byte.MAX_VALUE)
+ bytes[bytes.length - 1]++;
+ else
+ bytes = Arrays.copyOf(bytes, bytes.length + 1);
+ return new IgniteBiTuple<>(prefixKey, new Key(bytes));
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean contains(Key key) {
+ return key.compareTo(prefixKey) >= 0 && key.compareTo(toRange().getValue()) < 0;
+ }
+ }
+}
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/watch/WatchAggregator.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/watch/WatchAggregator.java
new file mode 100644
index 0000000..35c8881
--- /dev/null
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/watch/WatchAggregator.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.watch;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Aggregates multiple watches into one aggregated watch.
+ * This approach is needed to provide the following additional guarantees to the watching mechanism:
+ * - watch events will be processed sequentially
+ * - watch events will be resolved in the order of watch registration
+ */
+public class WatchAggregator {
+ /**
+ * Watches' map must be synchronized because of changes from WatchListener in separate thread.
+ */
+ private final Map<Long, Watch> watches = Collections.synchronizedMap(new LinkedHashMap<>());
+
+ /** Simple auto increment id for internal watches. */
+ private AtomicLong idCntr = new AtomicLong(0);
+
+ /**
+ * Adds new watch with simple exact criterion.
+ *
+ * @param key Key for watching.
+ * @param lsnr Listener which will be executed on watch event.
+ * @return id of registered watch. Can be used to remove the watch later.
+ */
+ public long add(Key key, WatchListener lsnr) {
+ var watch = new Watch(new KeyCriterion.ExactCriterion(key), lsnr);
+ var id = idCntr.incrementAndGet();
+ watches.put(id, watch);
+ return id;
+ }
+
+ /**
+ * Adds new watch with filter by key prefix.
+ *
+ * @param key Prefix for key.
+ * @param lsnr Listener which will be executed on watch event.
+ * @return id of registered watch. Can be used to remove the watch later.
+ */
+ public long addPrefix(Key key, WatchListener lsnr) {
+ var watch = new Watch(new KeyCriterion.PrefixCriterion(key), lsnr);
+ var id = idCntr.incrementAndGet();
+ watches.put(id, watch);
+ return id;
+ }
+
+ /**
+ * Adds new watch with filter by collection of keys.
+ *
+ * @param keys Collection of keys to listen.
+ * @param lsnr Listener which will be executed on watch event.
+ * @return id of registered watch. Can be used to remove the watch later.
+ */
+ public long add(Collection<Key> keys, WatchListener lsnr) {
+ var watch = new Watch(new KeyCriterion.CollectionCriterion(keys), lsnr);
+ var id = idCntr.incrementAndGet();
+ watches.put(id, watch);
+ return id;
+ }
+
+ /**
+ * Adds new watch with filter by range of keys.
+ *
+ * @param from Start key of range to listen.
+ * @param to End key of range (exclusive).
+ * @param lsnr Listener which will be executed on watch event.
+ * @return id of registered watch. Can be used to remove the watch later.
+ */
+ public long add(Key from, Key to, WatchListener lsnr) {
+ var watch = new Watch(new KeyCriterion.RangeCriterion(from, to), lsnr);
+ var id = idCntr.incrementAndGet();
+ watches.put(id, watch);
+ return id;
+ }
+
+ /**
+ * Cancel watch by id.
+ *
+ * @param id of watch to cancel.
+ */
+ public void cancel(long id) {
+ watches.remove(id);
+ }
+
+ /**
+ * Cancel multiple watches by ids.
+ *
+ * @param ids of watches to cancel.
+ */
+ public void cancelAll(Collection<Long> ids) {
+ watches.keySet().removeAll(ids);
+ }
+
+ /**
+ * Produce watch with aggregated key criterion and general watch listener dispatcher.
+ *
+ * @param revision start revision to listen event.
+ * @param saveRevisionAct action to commit keys-revision pair to persistent store for processed keys.
+ * @return result aggregated watch.
+ */
+ public Optional<AggregatedWatch> watch(long revision, BiConsumer<Collection<IgniteBiTuple<Key, byte[]>>, Long> saveRevisionAct) {
+ synchronized (watches) {
+ if (watches.isEmpty())
+ return Optional.empty();
+ else
+ return Optional.of(new AggregatedWatch(inferGeneralCriteria(), revision, watchListener(saveRevisionAct)));
+ }
+ }
+
+ /**
+ * Returns general criterion, which overlays all aggregated criteria.
+ *
+ * @return aggregated criterion.
+ */
+ // TODO: IGNITE-14667 We can do it better than infer range always
+ private KeyCriterion inferGeneralCriteria() {
+ return new KeyCriterion.RangeCriterion(
+ watches.values().stream().map(w -> w.keyCriterion().toRange().getKey()).min(Key::compareTo).get(),
+ watches.values().stream().map(w -> w.keyCriterion().toRange().getValue()).max(Key::compareTo).get()
+ );
+ }
+
+ /**
+ * Produces the watch listener, which will dispatch events to appropriate watches.
+ *
+ * @param storeRevision action to commit keys-revision pair to persistent store for processed keys.
+ * @return watch listener, which will dispatch events to appropriate watches.
+ */
+ private WatchListener watchListener(BiConsumer<Collection<IgniteBiTuple<Key, byte[]>>, Long> storeRevision) {
+ // Copy watches to separate collection, because all changes on the WatchAggregator watches
+ // shouldn't be propagated to listener watches immediately.
+ // WatchAggregator will be redeployed with new watches if needed instead.
+ final LinkedHashMap<Long, Watch> cpWatches = new LinkedHashMap<>(watches);
+
+ return new WatchListener() {
+
+ @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> evts) {
+ var watchIt = cpWatches.entrySet().iterator();
+ Collection<Long> toCancel = new ArrayList<>();
+
+ while (watchIt.hasNext()) {
+ Map.Entry<Long, WatchAggregator.Watch> entry = watchIt.next();
+ WatchAggregator.Watch watch = entry.getValue();
+ var filteredEvts = new ArrayList<WatchEvent>();
+
+ for (WatchEvent evt : evts) {
+ if (watch.keyCriterion().contains(evt.oldEntry().key()))
+ filteredEvts.add(evt);
+ }
+
+ if (!filteredEvts.isEmpty()) {
+ if (!watch.listener().onUpdate(filteredEvts)) {
+ watchIt.remove();
+
+ toCancel.add(entry.getKey());
+ }
+ }
+ }
+
+ // Cancel finished watches from the global watch map
+ // to prevent finished watches from redeploy.
+ if (!toCancel.isEmpty())
+ cancelAll(toCancel);
+
+ var revision = 0L;
+ var entries = new ArrayList<IgniteBiTuple<Key, byte[]>>();
+ for (WatchEvent evt: evts) {
+ revision = evt.newEntry().revision();
+
+ entries.add(new IgniteBiTuple<>(evt.newEntry().key(), evt.newEntry().value()));
+ }
+
+ storeRevision.accept(entries, revision);
+
+ return true;
+ }
+
+ @Override public void onError(@NotNull Throwable e) {
+ watches.values().forEach(w -> w.listener().onError(e));
+ }
+ };
+ }
+
+ /**
+ * (key criterion, watch listener) container.
+ */
+ private static class Watch {
+ /** Key criterion. */
+ private final KeyCriterion keyCriterion;
+
+ /** Watch listener. */
+ private final WatchListener lsnr;
+
+ /** Creates the watch. */
+ private Watch(KeyCriterion keyCriterion, WatchListener lsnr) {
+ this.keyCriterion = keyCriterion;
+ this.lsnr = lsnr;
+ }
+
+ /**
+ * @return key criterion.
+ */
+ public KeyCriterion keyCriterion() {
+ return keyCriterion;
+ }
+
+ /**
+ * @return watch listener.
+ */
+ public WatchListener listener() {
+ return lsnr;
+ }
+ }
+}
diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/WatchAggregatorTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/WatchAggregatorTest.java
new file mode 100644
index 0000000..4eb1901
--- /dev/null
+++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/WatchAggregatorTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import org.apache.ignite.internal.metastorage.watch.WatchAggregator;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class WatchAggregatorTest {
+
+ @Test
+ public void testEventsRouting() {
+ var watchAggregator = new WatchAggregator();
+ var lsnr1 = mock(WatchListener.class);
+ var lsnr2 = mock(WatchListener.class);
+ watchAggregator.add(new Key("1"), lsnr1);
+ watchAggregator.add(new Key("2"), lsnr2);
+
+ var watchEvent1 = new WatchEvent(
+ entry("1", "value1", 1),
+ entry("1", "value1n", 1));
+ var watchEvent2 = new WatchEvent(
+ entry("2", "value2", 1),
+ entry("2", "value2n", 1));
+ watchAggregator.watch(1, (v1, v2) -> {}).get().listener().onUpdate(Arrays.asList(watchEvent1, watchEvent2));
+
+ verify(lsnr1).onUpdate(Collections.singletonList(watchEvent1));
+ verify(lsnr2).onUpdate(Collections.singletonList(watchEvent2));
+ }
+
+ @Test
+ public void testCancel() {
+ var watchAggregator = new WatchAggregator();
+ var lsnr1 = mock(WatchListener.class);
+ when(lsnr1.onUpdate(any())).thenReturn(true);
+ var lsnr2 = mock(WatchListener.class);
+ when(lsnr2.onUpdate(any())).thenReturn(true);
+ var id1 = watchAggregator.add(new Key("1"), lsnr1);
+ var id2 = watchAggregator.add(new Key("2"), lsnr2);
+
+ var watchEvent1 = new WatchEvent(
+ entry("1", "value1", 1),
+ entry("1", "value1n", 1));
+ var watchEvent2 = new WatchEvent(
+ entry("2", "value2", 1),
+ entry("2", "value2n", 1));
+ watchAggregator.watch(1, (v1, v2) -> {}).get().listener().onUpdate(Arrays.asList(watchEvent1, watchEvent2));
+
+ verify(lsnr1, times(1)).onUpdate(any());
+ verify(lsnr2, times(1)).onUpdate(any());
+
+ watchAggregator.cancel(id1);
+ watchAggregator.watch(1, (v1, v2) -> {}).get().listener().onUpdate(Arrays.asList(watchEvent1, watchEvent2));
+
+ verify(lsnr1, times(1)).onUpdate(any());
+ verify(lsnr2, times(2)).onUpdate(any());
+ }
+
+ @Test
+ public void testCancelByFalseFromListener() {
+ var watchAggregator = new WatchAggregator();
+ var lsnr1 = mock(WatchListener.class);
+ when(lsnr1.onUpdate(any())).thenReturn(false);
+ var lsnr2 = mock(WatchListener.class);
+ when(lsnr2.onUpdate(any())).thenReturn(true);
+ var id1 = watchAggregator.add(new Key("1"), lsnr1);
+ var id2 = watchAggregator.add(new Key("2"), lsnr2);
+
+ var watchEvent1 = new WatchEvent(
+ entry("1", "value1", 1),
+ entry("1", "value1n", 1));
+ var watchEvent2 = new WatchEvent(
+ entry("2", "value2", 1),
+ entry("2", "value2n", 1));
+ watchAggregator.watch(1, (v1, v2) -> {}).get().listener().onUpdate(Arrays.asList(watchEvent1, watchEvent2));
+
+ verify(lsnr1, times(1)).onUpdate(any());
+ verify(lsnr2, times(1)).onUpdate(any());
+
+ watchAggregator.watch(1, (v1, v2) -> {}).get().listener().onUpdate(Arrays.asList(watchEvent1, watchEvent2));
+
+ verify(lsnr1, times(1)).onUpdate(any());
+ verify(lsnr2, times(2)).onUpdate(any());
+
+ }
+
+ private Entry entry(String key, String value, long revision) {
+ return new Entry() {
+ @Override public @NotNull Key key() {
+ return new Key(key);
+ }
+
+ @Override public @Nullable byte[] value() {
+ return value.getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override public long revision() {
+ return revision;
+ }
+ };
+ }
+}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
index 2a8ef19..6c6b0f2 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
@@ -20,8 +20,12 @@ package org.apache.ignite.internal.app;
import io.netty.util.internal.StringUtil;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.ignite.app.Ignite;
import org.apache.ignite.app.Ignition;
@@ -45,12 +49,22 @@ import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.impl.VaultServiceImpl;
import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.metastorage.client.MetaStorageService;
+import org.apache.ignite.metastorage.common.Condition;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operation;
+import org.apache.ignite.metastorage.common.WatchListener;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.message.MessageSerializationRegistry;
import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
import org.apache.ignite.table.manager.IgniteTables;
import org.apache.ignite.utils.IgniteProperties;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
* Implementation of an entry point for handling grid lifecycle.
@@ -142,7 +156,8 @@ public class IgnitionImpl implements Ignition {
MetaStorageManager metaStorageMgr = new MetaStorageManager(
vaultMgr,
clusterNetSvc,
- raftMgr
+ raftMgr,
+ metaStorageServiceMock()
);
// TODO IGNITE-14578 Bootstrap configuration manager with distributed configuration.
@@ -193,4 +208,99 @@ public class IgnitionImpl implements Ignition {
LOG.info(banner + '\n' + " ".repeat(22) + "Apache Ignite ver. " + ver + '\n');
}
+
+ /**
+ * Creates a temporary stand-in for the meta storage service. The three {@code watch} methods return
+ * an already completed future holding an {@link IgniteUuid} built from a random {@link UUID};
+ * every other method throws {@link UnsupportedOperationException}.
+ * <p>
+ * TODO: remove when the metastorage service is ready.
+ *
+ * @return Stub meta storage service.
+ */
+ private static MetaStorageService metaStorageServiceMock() {
+ // Single copy of the message thrown by every unsupported operation below.
+ final String notImplementedMsg = "Metastorage service is not implemented yet";
+
+ return new MetaStorageService() {
+ @Override public @NotNull CompletableFuture<Entry> get(@NotNull Key key) {
+ throw new UnsupportedOperationException(notImplementedMsg);
+ }
+
+ @Override public @NotNull CompletableFuture<Entry> get(@NotNull Key key, long revUpperBound) {
+ throw new UnsupportedOperationException(notImplementedMsg);
+ }
+
+ @Override public @NotNull CompletableFuture<Map<Key, Entry>> getAll(Collection<Key> keys) {
+ throw new UnsupportedOperationException(notImplementedMsg);
+ }
+
+ @Override public @NotNull CompletableFuture<Map<Key, Entry>> getAll(Collection<Key> keys, long revUpperBound) {
+ throw new UnsupportedOperationException(notImplementedMsg);
+ }
+
+ @Override public @NotNull CompletableFuture<Void> put(@NotNull Key key, @NotNull byte[] value) {
+ throw new UnsupportedOperationException(notImplementedMsg);
+ }
+
+ @Override public @NotNull CompletableFuture<Entry> getAndPut(@NotNull Key key, @NotNull byte[] value) {
+ throw new UnsupportedOperationException(notImplementedMsg);
+ }
+
+ @Override public @NotNull CompletableFuture<Void> putAll(@NotNull Map<Key, byte[]> vals) {
+ throw new UnsupportedOperationException(notImplementedMsg);
+ }
+
+ @Override public @NotNull CompletableFuture<Map<Key, Entry>> getAndPutAll(@NotNull Map<Key, byte[]> vals) {
+ throw new UnsupportedOperationException(notImplementedMsg);
+ }
+
+ @Override public @NotNull CompletableFuture<Void> remove(@NotNull Key key) {
+ throw new UnsupportedOperationException(notImplementedMsg);
+ }
+
+ @Override public @NotNull CompletableFuture<Entry> getAndRemove(@NotNull Key key) {
+ throw new UnsupportedOperationException(notImplementedMsg);
+ }
+
+ @Override public @NotNull CompletableFuture<Void> removeAll(@NotNull Collection<Key> keys) {
+ throw new UnsupportedOperationException(notImplementedMsg);
+ }
+
+ @Override
+ public @NotNull CompletableFuture<Map<Key, Entry>> getAndRemoveAll(@NotNull Collection<Key> keys) {
+ throw new UnsupportedOperationException(notImplementedMsg);
+ }
+
+ @Override public @NotNull CompletableFuture<Boolean> invoke(@NotNull Key key, @NotNull Condition condition,
+ @NotNull Operation success, @NotNull Operation failure) {
+ throw new UnsupportedOperationException(notImplementedMsg);
+ }
+
+ @Override public @NotNull CompletableFuture<Entry> getAndInvoke(@NotNull Key key, @NotNull Condition condition,
+ @NotNull Operation success, @NotNull Operation failure) {
+ throw new UnsupportedOperationException(notImplementedMsg);
+ }
+
+ @Override public @NotNull Cursor<Entry> range(@NotNull Key keyFrom, @Nullable Key keyTo, long revUpperBound) {
+ throw new UnsupportedOperationException(notImplementedMsg);
+ }
+
+ @Override public @NotNull Cursor<Entry> range(@NotNull Key keyFrom, @Nullable Key keyTo) {
+ throw new UnsupportedOperationException(notImplementedMsg);
+ }
+
+ // The watch registrations below do not throw: each returns a fresh watch id.
+ @Override public @NotNull CompletableFuture<IgniteUuid> watch(@Nullable Key keyFrom, @Nullable Key keyTo,
+ long revision, @NotNull WatchListener lsnr) {
+ IgniteUuid watchId = new IgniteUuid(UUID.randomUUID(), 0L);
+
+ return CompletableFuture.completedFuture(watchId);
+ }
+
+ @Override public @NotNull CompletableFuture<IgniteUuid> watch(@NotNull Key key, long revision,
+ @NotNull WatchListener lsnr) {
+ IgniteUuid watchId = new IgniteUuid(UUID.randomUUID(), 0L);
+
+ return CompletableFuture.completedFuture(watchId);
+ }
+
+ @Override public @NotNull CompletableFuture<IgniteUuid> watch(@NotNull Collection<Key> keys, long revision,
+ @NotNull WatchListener lsnr) {
+ IgniteUuid watchId = new IgniteUuid(UUID.randomUUID(), 0L);
+
+ return CompletableFuture.completedFuture(watchId);
+ }
+
+ @Override public @NotNull CompletableFuture<Void> stopWatch(@NotNull IgniteUuid id) {
+ throw new UnsupportedOperationException(notImplementedMsg);
+ }
+
+ @Override public @NotNull CompletableFuture<Void> compact() {
+ throw new UnsupportedOperationException(notImplementedMsg);
+ }
+ };
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 802d33b..86a04e4 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -118,9 +118,9 @@ public class TableManager implements IgniteTables {
if (hasMetastorageLocally(localNodeName, metastorageMembers))
subscribeForTableCreation();
- String tableInternalPrefix = INTERNAL_PREFIX + "assignment.#";
+ String tableInternalPrefix = INTERNAL_PREFIX + "assignment.";
- tableCreationSubscriptionFut = metaStorageMgr.registerWatch(new Key(tableInternalPrefix), new WatchListener() {
+ tableCreationSubscriptionFut = metaStorageMgr.registerWatchByPrefix(new Key(tableInternalPrefix), new WatchListener() {
@Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
for (WatchEvent evt : events) {
if (!ArrayUtils.empty(evt.newEntry().value())) {