Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2023/01/20 09:45:27 UTC

[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1490: IGNITE-18397 Rework Watches based on Raft Learners

ibessonov commented on code in PR #1490:
URL: https://github.com/apache/ignite-3/pull/1490#discussion_r1080883366


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -741,39 +742,27 @@ private void initDataNodesFromVaultManager() {
         }
 
         try {
-            // TODO: Remove this call as part of https://issues.apache.org/jira/browse/IGNITE-18397
-            vaultMgr.get(MetaStorageManagerImpl.APPLIED_REV)
-                    .thenApply(appliedRevision -> appliedRevision == null ? 0L : bytesToLong(appliedRevision.value()))
-                    .thenAccept(vaultAppliedRevision -> {
+            long appliedRevision = metaStorageManager.appliedRevision();
+
+            vaultMgr.get(zonesLogicalTopologyKey())

Review Comment:
   Any idea why Vault is asynchronous?
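   One possible reason, offered as a guess: an asynchronous contract lets a persistent implementation move blocking disk reads off the caller's thread, while an in-memory implementation can simply return an already completed future. The sketch below is hypothetical (`AsyncVault`, `InMemoryVault` and `OffloadedVault` are not Ignite classes) and only illustrates that trade-off.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class AsyncVaultSketch {
    /** Hypothetical async key-value contract, in the spirit of a vault whose get() returns a future. */
    public interface AsyncVault {
        CompletableFuture<byte[]> get(String key);
    }

    /** In-memory store: nothing blocks, so an already completed future is returned. */
    public static final class InMemoryVault implements AsyncVault {
        private final Map<String, byte[]> data = new ConcurrentHashMap<>();

        public void put(String key, byte[] value) {
            data.put(key, value);
        }

        @Override
        public CompletableFuture<byte[]> get(String key) {
            return CompletableFuture.completedFuture(data.get(key));
        }
    }

    /** Store with (simulated) blocking reads: the lookup runs on a dedicated I/O executor. */
    public static final class OffloadedVault implements AsyncVault {
        private final Map<String, byte[]> data = new ConcurrentHashMap<>();

        // Daemon thread so this example never keeps the JVM alive.
        private final Executor ioExecutor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "vault-io");
            t.setDaemon(true);
            return t;
        });

        public void put(String key, byte[] value) {
            data.put(key, value);
        }

        @Override
        public CompletableFuture<byte[]> get(String key) {
            // Stands in for a blocking disk read, such as a RocksDB lookup.
            return CompletableFuture.supplyAsync(() -> data.get(key), ioExecutor);
        }
    }
}
```

   The cost of this design is that callers such as `initDataNodesFromVaultManager` have to compose futures (or `join()`) even when the backing store lives in memory.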



##########
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java:
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.dsl.Conditions;
+import org.apache.ignite.internal.metastorage.dsl.Operations;
+import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.RaftManager;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.utils.ClusterServiceTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for Meta Storage Watches.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+@ExtendWith(ConfigurationExtension.class)
+public class ItMetaStorageWatchTest {
+    private static class Node {
+        private final ClusterService clusterService;
+
+        private final RaftManager raftManager;
+
+        private final MetaStorageManager metaStorageManager;
+
+        private final CompletableFuture<Set<String>> metaStorageNodesFuture = new CompletableFuture<>();
+
+        Node(ClusterService clusterService, RaftConfiguration raftConfiguration, Path dataPath) {
+            this.clusterService = clusterService;
+
+            Path basePath = dataPath.resolve(name());
+
+            this.raftManager = new Loza(
+                    clusterService,
+                    raftConfiguration,
+                    basePath.resolve("raft"),
+                    new HybridClockImpl()
+            );
+
+            var vaultManager = mock(VaultManager.class);

Review Comment:
   Can we simply use a VaultManager with an InMemoryVaultService inside? Or is that less convenient?
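   Compared with `mock(VaultManager.class)`, a real in-memory store keeps state across calls, so a test does not have to stub every read. A minimal sketch of the idea, using a hypothetical `InMemoryVaultFake` rather than the actual `InMemoryVaultService` API:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a VaultManager backed by an in-memory service (not Ignite's API). */
public class InMemoryVaultFake {
    private final Map<String, byte[]> data = new ConcurrentHashMap<>();

    public CompletableFuture<Void> put(String key, byte[] value) {
        data.put(key, value);
        return CompletableFuture.completedFuture(null);
    }

    public CompletableFuture<byte[]> get(String key) {
        // Unknown keys resolve to null, like an absent entry.
        return CompletableFuture.completedFuture(data.get(key));
    }

    public CompletableFuture<Void> remove(String key) {
        data.remove(key);
        return CompletableFuture.completedFuture(null);
    }
}
```

   With a mock, the same round trip would need `when(vaultManager.get(any())).thenReturn(...)` for every key the code under test reads, and nothing written through `put` would be visible to later reads.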



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -159,54 +135,164 @@ public MetaStorageManagerImpl(
         this.storage = storage;
     }
 
-    private CompletableFuture<MetaStorageService> initializeMetaStorage(Set<String> metaStorageNodes) {
+    private CompletableFuture<MetaStorageServiceImpl> initializeMetaStorage(Set<String> metaStorageNodes) {
         ClusterNode thisNode = clusterService.topologyService().localMember();
 
-        PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes);
-
-        Peer localPeer = configuration.peer(thisNode.name());
+        String thisNodeName = thisNode.name();
 
         CompletableFuture<RaftGroupService> raftServiceFuture;
 
         try {
-            if (localPeer == null) {
-                raftServiceFuture = raftMgr.startRaftGroupService(MetastorageGroupId.INSTANCE, configuration);
-            } else {
-                clusterService.topologyService().addEventHandler(new TopologyEventHandler() {
-                    @Override
-                    public void onDisappeared(ClusterNode member) {
-                        metaStorageSvcFut.thenAccept(svc -> svc.closeCursors(member.id()));
-                    }
-                });
+            // We need to configure the replication protocol differently whether this node is a synchronous or asynchronous replica.
+            if (metaStorageNodes.contains(thisNodeName)) {
+                PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes);
+
+                Peer localPeer = configuration.peer(thisNodeName);
 
-                storage.start();
+                assert localPeer != null;
 
                 raftServiceFuture = raftMgr.startRaftGroupNode(
                         new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
                         configuration,
                         new MetaStorageListener(storage),
+                        new RaftGroupEventsListener() {
+                            @Override
+                            public void onLeaderElected(long term) {
+                                // TODO: add listener to remove learners when they leave Logical Topology
+                                //  see https://issues.apache.org/jira/browse/IGNITE-18554
+
+                                registerTopologyEventListener();
+
+                                // Update the configuration immediately in case we missed some updates.
+                                addLearners(clusterService.topologyService().allMembers());
+                            }
+
+                            @Override
+                            public void onNewPeersConfigurationApplied(PeersAndLearners configuration) {
+                            }
+
+                            @Override
+                            public void onReconfigurationError(Status status, PeersAndLearners configuration, long term) {
+                            }
+                        }
+                );
+            } else {
+                PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes, Set.of(thisNodeName));
+
+                Peer localPeer = configuration.learner(thisNodeName);
+
+                assert localPeer != null;
+
+                raftServiceFuture = raftMgr.startRaftGroupNode(
+                        new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
+                        configuration,
+                        new MetaStorageLearnerListener(storage),
                         RaftGroupEventsListener.noopLsnr
                 );
             }
         } catch (NodeStoppingException e) {
             return CompletableFuture.failedFuture(e);
         }
 
-        return raftServiceFuture.thenApply(service -> new MetaStorageServiceImpl(service, thisNode.id(), thisNode.name()));
+        return raftServiceFuture.thenApply(raftService -> new MetaStorageServiceImpl(raftService, thisNode));
+    }
+
+    private void registerTopologyEventListener() {
+        clusterService.topologyService().addEventHandler(new TopologyEventHandler() {
+            @Override
+            public void onAppeared(ClusterNode member) {
+                addLearners(List.of(member));
+            }
+
+            @Override
+            public void onDisappeared(ClusterNode member) {
+                metaStorageSvcFut.thenAccept(service -> isCurrentNodeLeader(service.raftGroupService())
+                        .thenAccept(isLeader -> {
+                            if (isLeader) {
+                                service.closeCursors(member.id());
+                            }
+                        }));
+            }
+        });
+    }
+
+    private void addLearners(Collection<ClusterNode> nodes) {
+        if (!busyLock.enterBusy()) {
+            LOG.info("Skipping Meta Storage configuration update because the node is stopping");
+
+            return;
+        }
+
+        try {
+            metaStorageSvcFut
+                    .thenApply(MetaStorageServiceImpl::raftGroupService)
+                    .thenCompose(raftService -> isCurrentNodeLeader(raftService)
+                            .thenCompose(isLeader -> {
+                                if (!isLeader) {
+                                    return CompletableFuture.completedFuture(null);
+                                }
+
+                                if (!busyLock.enterBusy()) {
+                                    LOG.info("Skipping Meta Storage configuration update because the node is stopping");
+
+                                    return CompletableFuture.completedFuture(null);
+                                }
+
+                                try {
+                                    Set<String> peers = raftService.peers().stream()
+                                            .map(Peer::consistentId)
+                                            .collect(toUnmodifiableSet());
+
+                                    Set<String> learners = nodes.stream()
+                                            .map(ClusterNode::name)
+                                            .filter(name -> !peers.contains(name))
+                                            .collect(toUnmodifiableSet());
+
+                                    LOG.info("New Meta Storage learners detected: " + learners);
+
+                                    if (learners.isEmpty()) {
+                                        return CompletableFuture.completedFuture(null);
+                                    }
+
+                                    PeersAndLearners newConfiguration = PeersAndLearners.fromConsistentIds(peers, learners);
+
+                                    return raftService.addLearners(newConfiguration.learners());
+                                } finally {
+                                    busyLock.leaveBusy();
+                                }
+                            })
+                    )
+                    .whenComplete((v, e) -> {
+                        if (e != null) {
+                            LOG.error("Unable to change peers on topology update", e);
+                        }
+                    });
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    private CompletableFuture<Boolean> isCurrentNodeLeader(RaftGroupService raftService) {
+        String name = clusterService.topologyService().localMember().name();
+
+        return raftService.refreshLeader()

Review Comment:
   What if this method returned a future with the leader peer? :thinking: 
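   One reading of the suggestion: let the helper resolve to the leader `Peer` and derive the boolean from it, so callers that also need the leader (for logging or redirection) do not refresh it twice. The types below (`Peer`, `RaftClient`, `refreshAndGetLeader`) are simplified hypothetical stand-ins, not the real `RaftGroupService` API.

```java
import java.util.concurrent.CompletableFuture;

public class LeaderLookupSketch {
    /** Hypothetical peer, identified by its consistent ID. */
    public static final class Peer {
        private final String consistentId;

        public Peer(String consistentId) {
            this.consistentId = consistentId;
        }

        public String consistentId() {
            return consistentId;
        }
    }

    /** Hypothetical minimal Raft client that refreshes and reports the current leader. */
    public interface RaftClient {
        CompletableFuture<Peer> refreshAndGetLeader();
    }

    /** Resolves to the leader itself (null if none is known), leaving the decision to the caller. */
    public static CompletableFuture<Peer> leader(RaftClient client) {
        return client.refreshAndGetLeader();
    }

    /** The boolean check becomes a thin wrapper over leader(). */
    public static CompletableFuture<Boolean> isLeader(RaftClient client, String localName) {
        return leader(client).thenApply(leader -> leader != null && leader.consistentId().equals(localName));
    }
}
```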



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -159,54 +135,164 @@ public MetaStorageManagerImpl(
         this.storage = storage;
     }
 
-    private CompletableFuture<MetaStorageService> initializeMetaStorage(Set<String> metaStorageNodes) {
+    private CompletableFuture<MetaStorageServiceImpl> initializeMetaStorage(Set<String> metaStorageNodes) {
         ClusterNode thisNode = clusterService.topologyService().localMember();
 
-        PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes);
-
-        Peer localPeer = configuration.peer(thisNode.name());
+        String thisNodeName = thisNode.name();
 
         CompletableFuture<RaftGroupService> raftServiceFuture;
 
         try {
-            if (localPeer == null) {
-                raftServiceFuture = raftMgr.startRaftGroupService(MetastorageGroupId.INSTANCE, configuration);
-            } else {
-                clusterService.topologyService().addEventHandler(new TopologyEventHandler() {
-                    @Override
-                    public void onDisappeared(ClusterNode member) {
-                        metaStorageSvcFut.thenAccept(svc -> svc.closeCursors(member.id()));
-                    }
-                });
+            // We need to configure the replication protocol differently whether this node is a synchronous or asynchronous replica.
+            if (metaStorageNodes.contains(thisNodeName)) {
+                PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes);
+
+                Peer localPeer = configuration.peer(thisNodeName);
 
-                storage.start();
+                assert localPeer != null;
 
                 raftServiceFuture = raftMgr.startRaftGroupNode(
                         new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
                         configuration,
                         new MetaStorageListener(storage),
+                        new RaftGroupEventsListener() {
+                            @Override
+                            public void onLeaderElected(long term) {
+                                // TODO: add listener to remove learners when they leave Logical Topology
+                                //  see https://issues.apache.org/jira/browse/IGNITE-18554
+
+                                registerTopologyEventListener();
+
+                                // Update the configuration immediately in case we missed some updates.
+                                addLearners(clusterService.topologyService().allMembers());
+                            }
+
+                            @Override
+                            public void onNewPeersConfigurationApplied(PeersAndLearners configuration) {
+                            }
+
+                            @Override
+                            public void onReconfigurationError(Status status, PeersAndLearners configuration, long term) {
+                            }
+                        }
+                );
+            } else {
+                PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes, Set.of(thisNodeName));
+
+                Peer localPeer = configuration.learner(thisNodeName);
+
+                assert localPeer != null;
+
+                raftServiceFuture = raftMgr.startRaftGroupNode(
+                        new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
+                        configuration,
+                        new MetaStorageLearnerListener(storage),
                         RaftGroupEventsListener.noopLsnr
                 );
             }
         } catch (NodeStoppingException e) {
             return CompletableFuture.failedFuture(e);
         }
 
-        return raftServiceFuture.thenApply(service -> new MetaStorageServiceImpl(service, thisNode.id(), thisNode.name()));
+        return raftServiceFuture.thenApply(raftService -> new MetaStorageServiceImpl(raftService, thisNode));
+    }
+
+    private void registerTopologyEventListener() {
+        clusterService.topologyService().addEventHandler(new TopologyEventHandler() {
+            @Override
+            public void onAppeared(ClusterNode member) {
+                addLearners(List.of(member));
+            }
+
+            @Override
+            public void onDisappeared(ClusterNode member) {
+                metaStorageSvcFut.thenAccept(service -> isCurrentNodeLeader(service.raftGroupService())

Review Comment:
   I wonder if there's a possible race here if the disappeared node reappears too quickly.
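   Whether the delayed `closeCursors(member.id())` call can hurt a node that has already rejoined depends on whether the rejoined node gets a fresh ID. A sketch under the assumption that IDs are per-incarnation (names are not); `CursorRegistrySketch` and its methods are hypothetical:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class CursorRegistrySketch {
    // Cursor IDs grouped by the ID of the node incarnation that opened them.
    private final Map<String, Set<String>> cursorsByNodeId = new ConcurrentHashMap<>();

    public void openCursor(String nodeId, String cursorId) {
        cursorsByNodeId.computeIfAbsent(nodeId, k -> ConcurrentHashMap.newKeySet()).add(cursorId);
    }

    /**
     * Closes only the cursors opened by the given incarnation and returns how many were closed.
     * A late call for an old ID cannot touch cursors opened under a newer ID.
     */
    public int closeCursors(String nodeId) {
        Set<String> removed = cursorsByNodeId.remove(nodeId);
        return removed == null ? 0 : removed.size();
    }

    public int openCount(String nodeId) {
        Set<String> cursors = cursorsByNodeId.get(nodeId);
        return cursors == null ? 0 : cursors.size();
    }
}
```

   If IDs were reused across restarts, the same delayed call would close the new incarnation's cursors, which is exactly the race in question.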



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -842,11 +753,9 @@ public boolean hasNext() {
             }
 
             try {
-                try {
-                    return innerIterFut.thenApply(Iterator::hasNext).get();
-                } catch (InterruptedException | ExecutionException e) {
-                    throw new MetaStorageException(CURSOR_EXECUTION_ERR, e);
-                }
+                return innerCursorFut.thenApply(Iterator::hasNext).get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new MetaStorageException(CURSOR_EXECUTION_ERR, e);

Review Comment:
   Some catch blocks handle InterruptedException, others don't. Any idea why?
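   Because `Future.get()` throws both checked exceptions, a catch that omits `InterruptedException` must either wrap a call that does not throw it (such as `join()`) or sit in a method that declares it. A common convention when catching it is to restore the interrupt flag before wrapping. The `getBlocking` helper and `CursorException` below are hypothetical stand-ins for the pattern around `MetaStorageException`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class BlockingGetSketch {
    /** Hypothetical unchecked wrapper standing in for MetaStorageException. */
    public static class CursorException extends RuntimeException {
        public CursorException(Throwable cause) {
            super(cause);
        }
    }

    public static <T> T getBlocking(CompletableFuture<T> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new CursorException(e);
        } catch (ExecutionException e) {
            // ExecutionException only carries the real failure, so unwrap it.
            throw new CursorException(e.getCause());
        }
    }
}
```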



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java:
##########
@@ -220,32 +222,54 @@ public interface KeyValueStorage extends ManuallyCloseable {
      * @param keyFrom Start key of range (inclusive).
      * @param keyTo   Last key of range (exclusive).
      * @param rev     Start revision number.
-     * @return Cursor by update events.
      */
-    Cursor<WatchEvent> watch(byte[] keyFrom, byte @Nullable [] keyTo, long rev);
+    void watchRange(byte[] keyFrom, byte @Nullable [] keyTo, long rev, WatchListener listener);
 
     /**
-     * Creates subscription on updates of entries corresponding to the given keys range (where upper bound is unlimited) and starting from
-     * the given revision number.
+     * Registers a watch listener by a key prefix.
      *
-     * @param key Start key of range (inclusive).
-     * @param rev Start revision number.
-     * @return Cursor by update events.
+     * @param prefix Prefix to listen to.
+     * @param rev Starting Meta Storage revision.
+     * @param listener Listener which will be notified for each update.
      */
-    Cursor<WatchEvent> watch(byte[] key, long rev);
+    void watchPrefix(byte[] prefix, long rev, WatchListener listener);
 
     /**
-     * Creates subscription on updates of entries corresponding to the given keys collection and starting from the given revision number.
+     * Registers a watch listener for the provided key.
      *
-     * @param keys Collection of keys
-     * @param rev  Start revision number.
-     * @return Cursor by update events.
+     * @param key Meta Storage key.
+     * @param rev Starting Meta Storage revision.
+     * @param listener Listener which will be notified for each update.
      */
-    Cursor<WatchEvent> watch(Collection<byte[]> keys, long rev);
+    void watchExact(byte[] key, long rev, WatchListener listener);
+
+    /**
+     * Registers a watch listener for the provided keys.
+     *
+     * @param keys Meta Storage keys.
+     * @param rev Starting Meta Storage revision.
+     * @param listener Listener which will be notified for each update.
+     */
+    void watchExact(Collection<byte[]> keys, long rev, WatchListener listener);
+
+    /**
+     * Starts all registered watches.
+     *
+     * <p>Before calling this method, watches will not receive any updates.
+     *
+     * @param revisionCallback Callback that will be invoked after all watches of a particular revision are processed, with the revision
+     *      as its argument.
+     */
+    void startWatches(LongConsumer revisionCallback);
+
+    /**
+     * Unregisters a watch listener.
+     */
+    void removeWatch(WatchListener listener);
 
     /**
      * Compacts storage (removes tombstones).
-     * TODO: IGNITE-16444 Сorrect compaction for Metastorage.
+     * TODO: IGNITE-16444 Correct compaction for Metastorage.

Review Comment:
   What exactly has changed? Did it have a Cyrillic letter?
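   The removed line appears to start `Correct` with the Cyrillic capital letter Es (U+0421), which renders like the Latin `C`. A small check such as the hypothetical `firstNonAscii` below makes such look-alike characters visible; the test literal uses a `\u0421` escape so the example does not depend on how the character is displayed.

```java
public class NonAsciiCheck {
    /** Returns the index of the first non-ASCII char, or -1 if the string is pure ASCII. */
    public static int firstNonAscii(String s) {
        for (int i = 0; i < s.length(); i++) {
            if (s.charAt(i) > 0x7F) {
                return i;
            }
        }
        return -1;
    }

    /** Formats a char as a Unicode code point label, for example U+0041 for 'A'. */
    public static String label(char c) {
        return String.format("U+%04X", (int) c);
    }
}
```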



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageLearnerListener.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.raft;
+
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.raft.ReadCommand;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.raft.service.RaftGroupListener;
+
+/**
+ * Meta Storage Raft group listener for learner nodes. These nodes ignore read and cursor-related commands.
+ */
+public class MetaStorageLearnerListener implements RaftGroupListener {
+    private final KeyValueStorage storage;
+
+    private final MetaStorageWriteHandler writeHandler;
+
+    public MetaStorageLearnerListener(KeyValueStorage storage) {
+        this.storage = storage;
+        this.writeHandler = new MetaStorageWriteHandler(storage);
+    }
+
+    @Override
+    public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
+        throw new UnsupportedOperationException("Reads should not be sent to learners");
+    }
+
+    @Override
+    public void onWrite(Iterator<CommandClosure<WriteCommand>> iter) {
+        while (iter.hasNext()) {
+            CommandClosure<WriteCommand> clo = iter.next();
+
+            if (!writeHandler.handleWriteCommand(clo)) {
+                // Ignore all commands that are not handled by the writeHandler.
+                clo.result(null);

Review Comment:
   Is this safe? What if we need to handle the next command in the collection, but you have already completed the closure?
   Who would complete the closure when you were able to handle every command in the collection?
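   As far as the snippet shows, each closure belongs to a single command, so completing one does not stop the loop from reaching the next; the open question is the contract that `handleWriteCommand` completes every closure it returns `true` for. A hypothetical sketch that makes this contract explicit and fails fast on a double completion (`Closure` and `onWrite` are simplified stand-ins, not Ignite's `CommandClosure`):

```java
import java.util.Iterator;
import java.util.function.Predicate;

public class LearnerWriteLoopSketch {
    /** Hypothetical one-shot closure: one command plus a slot for its result. */
    public static final class Closure {
        private final String command;
        private Object result;
        private boolean completed;

        public Closure(String command) {
            this.command = command;
        }

        public String command() {
            return command;
        }

        public void result(Object value) {
            // Fail fast instead of silently overwriting a result that was already delivered.
            if (completed) {
                throw new IllegalStateException("Closure completed twice: " + command);
            }
            completed = true;
            result = value;
        }

        public boolean isCompleted() {
            return completed;
        }

        public Object resultValue() {
            return result;
        }
    }

    /**
     * Visits every closure in the batch. Contract: if the handler returns true, it has already
     * completed the closure itself; otherwise the loop completes it with null.
     */
    public static void onWrite(Iterator<Closure> iter, Predicate<Closure> handler) {
        while (iter.hasNext()) {
            Closure clo = iter.next();

            if (!handler.test(clo)) {
                clo.result(null);
            }
        }
    }
}
```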



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.raft;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.command.GetAndPutAllCommand;
+import org.apache.ignite.internal.metastorage.command.GetAndPutCommand;
+import org.apache.ignite.internal.metastorage.command.GetAndRemoveAllCommand;
+import org.apache.ignite.internal.metastorage.command.GetAndRemoveCommand;
+import org.apache.ignite.internal.metastorage.command.InvokeCommand;
+import org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory;
+import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.command.MultipleEntryResponse;
+import org.apache.ignite.internal.metastorage.command.PutAllCommand;
+import org.apache.ignite.internal.metastorage.command.PutCommand;
+import org.apache.ignite.internal.metastorage.command.RemoveAllCommand;
+import org.apache.ignite.internal.metastorage.command.RemoveCommand;
+import org.apache.ignite.internal.metastorage.command.SingleEntryResponse;
+import org.apache.ignite.internal.metastorage.command.info.CompoundConditionInfo;
+import org.apache.ignite.internal.metastorage.command.info.ConditionInfo;
+import org.apache.ignite.internal.metastorage.command.info.IfInfo;
+import org.apache.ignite.internal.metastorage.command.info.OperationInfo;
+import org.apache.ignite.internal.metastorage.command.info.SimpleConditionInfo;
+import org.apache.ignite.internal.metastorage.command.info.StatementInfo;
+import org.apache.ignite.internal.metastorage.command.info.UpdateInfo;
+import org.apache.ignite.internal.metastorage.dsl.CompoundConditionType;
+import org.apache.ignite.internal.metastorage.dsl.ConditionType;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metastorage.dsl.StatementResult;
+import org.apache.ignite.internal.metastorage.dsl.Update;
+import org.apache.ignite.internal.metastorage.server.AndCondition;
+import org.apache.ignite.internal.metastorage.server.Condition;
+import org.apache.ignite.internal.metastorage.server.ExistenceCondition;
+import org.apache.ignite.internal.metastorage.server.If;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.OrCondition;
+import org.apache.ignite.internal.metastorage.server.RevisionCondition;
+import org.apache.ignite.internal.metastorage.server.Statement;
+import org.apache.ignite.internal.metastorage.server.TombstoneCondition;
+import org.apache.ignite.internal.metastorage.server.ValueCondition;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+
+/**
+ * Class containing some common logic for Meta Storage Raft group listeners.
+ */
+class MetaStorageWriteHandler {
+    private final MetaStorageCommandsFactory commandsFactory = new MetaStorageCommandsFactory();
+
+    private final KeyValueStorage storage;
+
+    MetaStorageWriteHandler(KeyValueStorage storage) {
+        this.storage = storage;
+    }
+
+    /**
+     * Tries to process a {@link WriteCommand}, returning {@code true} if the command has been successfully processed or {@code false}
+     * if the command requires external processing.
+     */
+    boolean handleWriteCommand(CommandClosure<WriteCommand> clo) {
+        WriteCommand command = clo.command();
+
+        if (command instanceof PutCommand) {

Review Comment:
   Another unrelated thought: why do we need spaghetti code in every Raft listener? There should be an individual handler class for every command. I realize you simply copied it, so don't take it personally. It feels like Ignite developers don't give a damn about SOLID.
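   The suggested alternative to the `instanceof` chain can be sketched as a registry that maps each command class to its own handler, so adding a command means adding a class rather than another branch. Everything below (`CommandHandler`, `register`, `dispatch` and the two command types) is hypothetical and not part of Ignite:

```java
import java.util.HashMap;
import java.util.Map;

public class CommandDispatchSketch {
    /** Hypothetical marker for Raft write commands. */
    public interface Command {
    }

    public static final class PutCommand implements Command {
        public final String key;

        public PutCommand(String key) {
            this.key = key;
        }
    }

    public static final class RemoveCommand implements Command {
        public final String key;

        public RemoveCommand(String key) {
            this.key = key;
        }
    }

    /** One handler per command type replaces one branch of the instanceof chain. */
    public interface CommandHandler<C extends Command> {
        Object handle(C command);
    }

    private final Map<Class<?>, CommandHandler<?>> handlers = new HashMap<>();

    public <C extends Command> void register(Class<C> type, CommandHandler<C> handler) {
        handlers.put(type, handler);
    }

    @SuppressWarnings("unchecked")
    public Object dispatch(Command command) {
        // Exact class lookup; safe because register() pairs each class with a matching handler.
        CommandHandler<Command> handler = (CommandHandler<Command>) handlers.get(command.getClass());

        if (handler == null) {
            throw new IllegalArgumentException("Unknown command: " + command.getClass().getName());
        }

        return handler.handle(command);
    }
}
```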



##########
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java:
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.dsl.Conditions;
+import org.apache.ignite.internal.metastorage.dsl.Operations;
+import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.RaftManager;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.utils.ClusterServiceTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for Meta Storage Watches.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+@ExtendWith(ConfigurationExtension.class)
+public class ItMetaStorageWatchTest {
+    private static class Node {
+        private final ClusterService clusterService;
+
+        private final RaftManager raftManager;
+
+        private final MetaStorageManager metaStorageManager;
+
+        private final CompletableFuture<Set<String>> metaStorageNodesFuture = new CompletableFuture<>();
+
+        Node(ClusterService clusterService, RaftConfiguration raftConfiguration, Path dataPath) {
+            this.clusterService = clusterService;
+
+            Path basePath = dataPath.resolve(name());
+
+            this.raftManager = new Loza(
+                    clusterService,
+                    raftConfiguration,
+                    basePath.resolve("raft"),
+                    new HybridClockImpl()
+            );
+
+            var vaultManager = mock(VaultManager.class);
+
+            when(vaultManager.get(any())).thenReturn(CompletableFuture.completedFuture(null));
+            when(vaultManager.put(any(), any())).thenReturn(CompletableFuture.completedFuture(null));
+
+            var cmgManager = mock(ClusterManagementGroupManager.class);
+
+            when(cmgManager.metaStorageNodes()).thenReturn(metaStorageNodesFuture);
+
+            this.metaStorageManager = new MetaStorageManagerImpl(
+                    vaultManager,
+                    clusterService,
+                    cmgManager,
+                    raftManager,
+                    new RocksDbKeyValueStorage(name(), basePath.resolve("storage"))
+            );
+        }
+
+        void start(Set<String> metaStorageNodes) {
+            clusterService.start();
+            raftManager.start();
+            metaStorageManager.start();
+
+            metaStorageNodesFuture.complete(metaStorageNodes);
+        }
+
+        String name() {
+            return clusterService.localConfiguration().getName();
+        }
+
+        void stop() throws Exception {
+            Stream<AutoCloseable> beforeNodeStop = Stream.of(metaStorageManager, raftManager, clusterService).map(c -> c::beforeNodeStop);
+
+            Stream<AutoCloseable> nodeStop = Stream.of(metaStorageManager, raftManager, clusterService).map(c -> c::stop);
+
+            IgniteUtils.closeAll(Stream.concat(beforeNodeStop, nodeStop));
+        }
+    }
+
+    private TestInfo testInfo;
+
+    @WorkDirectory
+    private Path workDir;
+
+    @InjectConfiguration
+    private RaftConfiguration raftConfiguration;
+
+    private final List<Node> nodes = new ArrayList<>();
+
+    /**
+     * Run {@code NODES} cluster nodes.

Review Comment:
   Same here, please move it to "startNodes" or simply remove it



##########
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java:
##########
@@ -213,29 +165,118 @@ public class ItMetaStorageServiceTest {
         EXPECTED_SRV_RESULT_COLL = List.of(entry1, entry2);
     }
 
+    private static class Node {
+        private final ClusterService clusterService;
+
+        private final RaftManager raftManager;
+
+        private final KeyValueStorage mockStorage;
+
+        private RaftGroupService metaStorageRaftService;
+
+        private MetaStorageService metaStorageService;
+
+        Node(ClusterService clusterService, RaftConfiguration raftConfiguration, Path dataPath) {
+            this.clusterService = clusterService;
+
+            this.raftManager = new Loza(
+                    clusterService,
+                    raftConfiguration,
+                    dataPath.resolve(name()),
+                    new HybridClockImpl()
+            );
+
+            this.mockStorage = mock(KeyValueStorage.class);
+        }
+
+        void start(PeersAndLearners configuration) {
+            clusterService.start();
+            raftManager.start();
+
+            CompletableFuture<RaftGroupService> raftService = startRaftService(configuration);
+
+            assertThat(raftService, willCompleteSuccessfully());
+
+            metaStorageRaftService = raftService.join();
+
+            ClusterNode node = clusterService.topologyService().localMember();
+
+            metaStorageService = new MetaStorageServiceImpl(metaStorageRaftService, node);
+        }
+
+        String name() {
+            return clusterService.localConfiguration().getName();
+        }
+
+        private CompletableFuture<RaftGroupService> startRaftService(PeersAndLearners configuration) {
+            String name = name();
+
+            boolean isLearner = configuration.peer(name) == null;
+
+            Peer peer = isLearner ? configuration.learner(name) : configuration.peer(name);
+
+            assert peer != null;
+
+            RaftGroupListener listener = isLearner ? new MetaStorageLearnerListener(mockStorage) : new MetaStorageListener(mockStorage);
+
+            var raftNodeId = new RaftNodeId(MetastorageGroupId.INSTANCE, peer);
+
+            try {
+                return raftManager.startRaftGroupNode(raftNodeId, configuration, listener, RaftGroupEventsListener.noopLsnr);
+            } catch (NodeStoppingException e) {
+                throw new IllegalStateException(e);
+            }
+        }
+
+        void stop() throws Exception {
+            Stream<AutoCloseable> raftStop = Stream.of(
+                    metaStorageRaftService == null ? null : (AutoCloseable) metaStorageRaftService::shutdown,
+                    () -> raftManager.stopRaftNodes(MetastorageGroupId.INSTANCE)
+            );
+
+            Stream<AutoCloseable> beforeNodeStop = Stream.of(raftManager, clusterService).map(c -> c::beforeNodeStop);
+
+            Stream<AutoCloseable> nodeStop = Stream.of(raftManager, clusterService).map(c -> c::stop);
+
+            IgniteUtils.closeAll(Stream.of(raftStop, beforeNodeStop, nodeStop).flatMap(Function.identity()));
+        }
+    }
+
+    private TestInfo testInfo;
+
+    @WorkDirectory
+    private Path workDir;
+
+    @InjectConfiguration
+    private RaftConfiguration raftConfiguration;
+
+    private final List<Node> nodes = new ArrayList<>();
+
     /**

Review Comment:
   Comment is misplaced



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -159,54 +135,164 @@ public MetaStorageManagerImpl(
         this.storage = storage;
     }
 
-    private CompletableFuture<MetaStorageService> initializeMetaStorage(Set<String> metaStorageNodes) {
+    private CompletableFuture<MetaStorageServiceImpl> initializeMetaStorage(Set<String> metaStorageNodes) {
         ClusterNode thisNode = clusterService.topologyService().localMember();
 
-        PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes);
-
-        Peer localPeer = configuration.peer(thisNode.name());
+        String thisNodeName = thisNode.name();
 
         CompletableFuture<RaftGroupService> raftServiceFuture;
 
         try {
-            if (localPeer == null) {
-                raftServiceFuture = raftMgr.startRaftGroupService(MetastorageGroupId.INSTANCE, configuration);
-            } else {
-                clusterService.topologyService().addEventHandler(new TopologyEventHandler() {
-                    @Override
-                    public void onDisappeared(ClusterNode member) {
-                        metaStorageSvcFut.thenAccept(svc -> svc.closeCursors(member.id()));
-                    }
-                });
+            // We need to configure the replication protocol differently whether this node is a synchronous or asynchronous replica.
+            if (metaStorageNodes.contains(thisNodeName)) {
+                PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes);
+
+                Peer localPeer = configuration.peer(thisNodeName);
 
-                storage.start();
+                assert localPeer != null;
 
                 raftServiceFuture = raftMgr.startRaftGroupNode(
                         new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
                         configuration,
                         new MetaStorageListener(storage),
+                        new RaftGroupEventsListener() {
+                            @Override
+                            public void onLeaderElected(long term) {
+                                // TODO: add listener to remove learners when they leave Logical Topology
+                                //  see https://issues.apache.org/jira/browse/IGNITE-18554
+
+                                registerTopologyEventListener();
+
+                                // Update the configuration immediately in case we missed some updates.
+                                addLearners(clusterService.topologyService().allMembers());
+                            }
+
+                            @Override
+                            public void onNewPeersConfigurationApplied(PeersAndLearners configuration) {
+                            }
+
+                            @Override
+                            public void onReconfigurationError(Status status, PeersAndLearners configuration, long term) {
+                            }
+                        }
+                );
+            } else {
+                PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes, Set.of(thisNodeName));
+
+                Peer localPeer = configuration.learner(thisNodeName);
+
+                assert localPeer != null;
+
+                raftServiceFuture = raftMgr.startRaftGroupNode(
+                        new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
+                        configuration,
+                        new MetaStorageLearnerListener(storage),
                         RaftGroupEventsListener.noopLsnr
                 );
             }
         } catch (NodeStoppingException e) {
             return CompletableFuture.failedFuture(e);
         }
 
-        return raftServiceFuture.thenApply(service -> new MetaStorageServiceImpl(service, thisNode.id(), thisNode.name()));
+        return raftServiceFuture.thenApply(raftService -> new MetaStorageServiceImpl(raftService, thisNode));
+    }
+
+    private void registerTopologyEventListener() {
+        clusterService.topologyService().addEventHandler(new TopologyEventHandler() {
+            @Override
+            public void onAppeared(ClusterNode member) {
+                addLearners(List.of(member));
+            }
+
+            @Override
+            public void onDisappeared(ClusterNode member) {
+                metaStorageSvcFut.thenAccept(service -> isCurrentNodeLeader(service.raftGroupService())
+                        .thenAccept(isLeader -> {
+                            if (isLeader) {
+                                service.closeCursors(member.id());
+                            }
+                        }));
+            }
+        });
+    }
+
+    private void addLearners(Collection<ClusterNode> nodes) {
+        if (!busyLock.enterBusy()) {
+            LOG.info("Skipping Meta Storage configuration update because the node is stopping");
+
+            return;
+        }
+
+        try {
+            metaStorageSvcFut
+                    .thenApply(MetaStorageServiceImpl::raftGroupService)
+                    .thenCompose(raftService -> isCurrentNodeLeader(raftService)

Review Comment:
   I think that code would be better if you extracted this lambda into a method, but it's up to you
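A minimal sketch of what the suggested extraction could look like. `LearnerRaftService`, `isLeader()`, `addLearners(...)` and `updateLearners(...)` are hypothetical stand-ins, not the real `RaftGroupService` API. They only illustrate moving the inline `isLeader -> { ... }` body into a named method, so the future chain reads as a sequence of steps:

```java
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical stand-in for the Raft group client used in the diff; method names are illustrative only.
interface LearnerRaftService {
    CompletableFuture<Boolean> isLeader();

    Set<String> peers();

    CompletableFuture<Void> addLearners(Set<String> learners);
}

class LearnerUpdater {
    // The chain only sequences the asynchronous steps; the leader-side logic lives in a named method.
    static CompletableFuture<Void> addLearners(CompletableFuture<LearnerRaftService> serviceFut, Collection<String> nodeNames) {
        return serviceFut.thenCompose(service -> service.isLeader()
                .thenCompose(isLeader -> isLeader
                        ? updateLearners(service, nodeNames)
                        : CompletableFuture.<Void>completedFuture(null)));
    }

    // Former inline lambda body: registers as learners all nodes that are not already peers.
    static CompletableFuture<Void> updateLearners(LearnerRaftService service, Collection<String> nodeNames) {
        Set<String> peers = service.peers();

        Set<String> learners = nodeNames.stream()
                .filter(name -> !peers.contains(name))
                .collect(Collectors.toUnmodifiableSet());

        if (learners.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }

        return service.addLearners(learners);
    }
}
```

A side benefit is that the named method can be unit-tested without going through the whole future chain.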



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -159,54 +135,164 @@ public MetaStorageManagerImpl(
         this.storage = storage;
     }
 
-    private CompletableFuture<MetaStorageService> initializeMetaStorage(Set<String> metaStorageNodes) {
+    private CompletableFuture<MetaStorageServiceImpl> initializeMetaStorage(Set<String> metaStorageNodes) {
         ClusterNode thisNode = clusterService.topologyService().localMember();
 
-        PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes);
-
-        Peer localPeer = configuration.peer(thisNode.name());
+        String thisNodeName = thisNode.name();
 
         CompletableFuture<RaftGroupService> raftServiceFuture;
 
         try {
-            if (localPeer == null) {
-                raftServiceFuture = raftMgr.startRaftGroupService(MetastorageGroupId.INSTANCE, configuration);
-            } else {
-                clusterService.topologyService().addEventHandler(new TopologyEventHandler() {
-                    @Override
-                    public void onDisappeared(ClusterNode member) {
-                        metaStorageSvcFut.thenAccept(svc -> svc.closeCursors(member.id()));
-                    }
-                });
+            // We need to configure the replication protocol differently whether this node is a synchronous or asynchronous replica.
+            if (metaStorageNodes.contains(thisNodeName)) {
+                PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes);
+
+                Peer localPeer = configuration.peer(thisNodeName);
 
-                storage.start();
+                assert localPeer != null;
 
                 raftServiceFuture = raftMgr.startRaftGroupNode(
                         new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
                         configuration,
                         new MetaStorageListener(storage),
+                        new RaftGroupEventsListener() {
+                            @Override
+                            public void onLeaderElected(long term) {
+                                // TODO: add listener to remove learners when they leave Logical Topology
+                                //  see https://issues.apache.org/jira/browse/IGNITE-18554
+
+                                registerTopologyEventListener();
+
+                                // Update the configuration immediately in case we missed some updates.
+                                addLearners(clusterService.topologyService().allMembers());
+                            }
+
+                            @Override
+                            public void onNewPeersConfigurationApplied(PeersAndLearners configuration) {

Review Comment:
   Would be nice to have these as default implementations in the interface
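A rough sketch of the suggestion, using a hypothetical `GroupEventsListener` modeled on the callbacks in this hunk. `Status` and `PeersAndLearners` are replaced with simple stand-ins to keep it self-contained, so the real `RaftGroupEventsListener` signatures may differ. With the two callbacks defaulted to no-ops, only `onLeaderElected` stays abstract, and a listener can then be a lambda:

```java
import java.util.Set;

// Hypothetical listener interface; parameter types are simplified stand-ins for Status/PeersAndLearners.
interface GroupEventsListener {
    // Hypothetical no-op instance, analogous to a "noop listener" constant.
    GroupEventsListener NOOP = term -> { };

    void onLeaderElected(long term);

    // No-op by default, so implementations override only what they need.
    default void onNewPeersConfigurationApplied(Set<String> configuration) {
    }

    // No-op by default as well.
    default void onReconfigurationError(String status, Set<String> configuration, long term) {
    }
}
```

Keeping exactly one abstract method is a design choice that makes the interface usable as a functional interface. Making `onLeaderElected` a default too would also work, at the cost of losing the lambda form.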



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -159,54 +135,164 @@ public MetaStorageManagerImpl(
         this.storage = storage;
     }
 
-    private CompletableFuture<MetaStorageService> initializeMetaStorage(Set<String> metaStorageNodes) {
+    private CompletableFuture<MetaStorageServiceImpl> initializeMetaStorage(Set<String> metaStorageNodes) {
         ClusterNode thisNode = clusterService.topologyService().localMember();
 
-        PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes);
-
-        Peer localPeer = configuration.peer(thisNode.name());
+        String thisNodeName = thisNode.name();
 
         CompletableFuture<RaftGroupService> raftServiceFuture;
 
         try {
-            if (localPeer == null) {
-                raftServiceFuture = raftMgr.startRaftGroupService(MetastorageGroupId.INSTANCE, configuration);
-            } else {
-                clusterService.topologyService().addEventHandler(new TopologyEventHandler() {
-                    @Override
-                    public void onDisappeared(ClusterNode member) {
-                        metaStorageSvcFut.thenAccept(svc -> svc.closeCursors(member.id()));
-                    }
-                });
+            // We need to configure the replication protocol differently whether this node is a synchronous or asynchronous replica.
+            if (metaStorageNodes.contains(thisNodeName)) {
+                PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes);
+
+                Peer localPeer = configuration.peer(thisNodeName);
 
-                storage.start();
+                assert localPeer != null;
 
                 raftServiceFuture = raftMgr.startRaftGroupNode(
                         new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
                         configuration,
                         new MetaStorageListener(storage),
+                        new RaftGroupEventsListener() {
+                            @Override
+                            public void onLeaderElected(long term) {
+                                // TODO: add listener to remove learners when they leave Logical Topology
+                                //  see https://issues.apache.org/jira/browse/IGNITE-18554
+
+                                registerTopologyEventListener();
+
+                                // Update the configuration immediately in case we missed some updates.
+                                addLearners(clusterService.topologyService().allMembers());
+                            }
+
+                            @Override
+                            public void onNewPeersConfigurationApplied(PeersAndLearners configuration) {
+                            }
+
+                            @Override
+                            public void onReconfigurationError(Status status, PeersAndLearners configuration, long term) {
+                            }
+                        }
+                );
+            } else {
+                PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes, Set.of(thisNodeName));
+
+                Peer localPeer = configuration.learner(thisNodeName);
+
+                assert localPeer != null;
+
+                raftServiceFuture = raftMgr.startRaftGroupNode(
+                        new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
+                        configuration,
+                        new MetaStorageLearnerListener(storage),
                         RaftGroupEventsListener.noopLsnr
                 );
             }
         } catch (NodeStoppingException e) {
             return CompletableFuture.failedFuture(e);
         }
 
-        return raftServiceFuture.thenApply(service -> new MetaStorageServiceImpl(service, thisNode.id(), thisNode.name()));
+        return raftServiceFuture.thenApply(raftService -> new MetaStorageServiceImpl(raftService, thisNode));
+    }
+
+    private void registerTopologyEventListener() {
+        clusterService.topologyService().addEventHandler(new TopologyEventHandler() {
+            @Override
+            public void onAppeared(ClusterNode member) {
+                addLearners(List.of(member));
+            }
+
+            @Override
+            public void onDisappeared(ClusterNode member) {
+                metaStorageSvcFut.thenAccept(service -> isCurrentNodeLeader(service.raftGroupService())
+                        .thenAccept(isLeader -> {
+                            if (isLeader) {
+                                service.closeCursors(member.id());
+                            }
+                        }));
+            }
+        });
+    }
+
+    private void addLearners(Collection<ClusterNode> nodes) {
+        if (!busyLock.enterBusy()) {
+            LOG.info("Skipping Meta Storage configuration update because the node is stopping");
+
+            return;
+        }
+
+        try {
+            metaStorageSvcFut
+                    .thenApply(MetaStorageServiceImpl::raftGroupService)
+                    .thenCompose(raftService -> isCurrentNodeLeader(raftService)
+                            .thenCompose(isLeader -> {
+                                if (!isLeader) {
+                                    return CompletableFuture.completedFuture(null);
+                                }
+
+                                if (!busyLock.enterBusy()) {
+                                    LOG.info("Skipping Meta Storage configuration update because the node is stopping");
+
+                                    return CompletableFuture.completedFuture(null);
+                                }
+
+                                try {
+                                    Set<String> peers = raftService.peers().stream()
+                                            .map(Peer::consistentId)
+                                            .collect(toUnmodifiableSet());
+
+                                    Set<String> learners = nodes.stream()
+                                            .map(ClusterNode::name)
+                                            .filter(name -> !peers.contains(name))
+                                            .collect(toUnmodifiableSet());
+
+                                    LOG.info("New Meta Storage learners detected: " + learners);
+
+                                    if (learners.isEmpty()) {
+                                        return CompletableFuture.completedFuture(null);
+                                    }

Review Comment:
   Should we swap these two statements?
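Assuming the intent is to log only when learners were actually found, a minimal sketch of the swapped order might look like this. `LearnerDiff.newLearners` and the `Consumer<String>` logger are hypothetical, used here in place of the real `LOG` and `raftService.peers()`:

```java
import java.util.Collection;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;

class LearnerDiff {
    // Returns the nodes that are not yet peers; logs only when there is something to add.
    static Set<String> newLearners(Set<String> peers, Collection<String> nodeNames, Consumer<String> log) {
        Set<String> learners = nodeNames.stream()
                .filter(name -> !peers.contains(name))
                .collect(Collectors.toUnmodifiableSet());

        // Check for the empty case first, so an empty set does not produce a "new learners detected" message.
        if (learners.isEmpty()) {
            return learners;
        }

        log.accept("New Meta Storage learners detected: " + learners);

        return learners;
    }
}
```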



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.raft;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.command.GetAndPutAllCommand;
+import org.apache.ignite.internal.metastorage.command.GetAndPutCommand;
+import org.apache.ignite.internal.metastorage.command.GetAndRemoveAllCommand;
+import org.apache.ignite.internal.metastorage.command.GetAndRemoveCommand;
+import org.apache.ignite.internal.metastorage.command.InvokeCommand;
+import org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory;
+import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.command.MultipleEntryResponse;
+import org.apache.ignite.internal.metastorage.command.PutAllCommand;
+import org.apache.ignite.internal.metastorage.command.PutCommand;
+import org.apache.ignite.internal.metastorage.command.RemoveAllCommand;
+import org.apache.ignite.internal.metastorage.command.RemoveCommand;
+import org.apache.ignite.internal.metastorage.command.SingleEntryResponse;
+import org.apache.ignite.internal.metastorage.command.info.CompoundConditionInfo;
+import org.apache.ignite.internal.metastorage.command.info.ConditionInfo;
+import org.apache.ignite.internal.metastorage.command.info.IfInfo;
+import org.apache.ignite.internal.metastorage.command.info.OperationInfo;
+import org.apache.ignite.internal.metastorage.command.info.SimpleConditionInfo;
+import org.apache.ignite.internal.metastorage.command.info.StatementInfo;
+import org.apache.ignite.internal.metastorage.command.info.UpdateInfo;
+import org.apache.ignite.internal.metastorage.dsl.CompoundConditionType;
+import org.apache.ignite.internal.metastorage.dsl.ConditionType;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metastorage.dsl.StatementResult;
+import org.apache.ignite.internal.metastorage.dsl.Update;
+import org.apache.ignite.internal.metastorage.server.AndCondition;
+import org.apache.ignite.internal.metastorage.server.Condition;
+import org.apache.ignite.internal.metastorage.server.ExistenceCondition;
+import org.apache.ignite.internal.metastorage.server.If;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.OrCondition;
+import org.apache.ignite.internal.metastorage.server.RevisionCondition;
+import org.apache.ignite.internal.metastorage.server.Statement;
+import org.apache.ignite.internal.metastorage.server.TombstoneCondition;
+import org.apache.ignite.internal.metastorage.server.ValueCondition;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+
+/**
+ * Class containing some common logic for Meta Storage Raft group listeners.
+ */
+class MetaStorageWriteHandler {
+    private final MetaStorageCommandsFactory commandsFactory = new MetaStorageCommandsFactory();
+
+    private final KeyValueStorage storage;
+
+    MetaStorageWriteHandler(KeyValueStorage storage) {
+        this.storage = storage;
+    }
+
+    /**
+     * Tries to process a {@link WriteCommand}, returning {@code true} if the command has been successfully processed or {@code false}
+     * if the command requires external processing.
+     */
+    boolean handleWriteCommand(CommandClosure<WriteCommand> clo) {
+        WriteCommand command = clo.command();
+
+        if (command instanceof PutCommand) {
+            PutCommand putCmd = (PutCommand) command;
+
+            storage.put(putCmd.key(), putCmd.value());
+
+            clo.result(null);
+        } else if (command instanceof GetAndPutCommand) {
+            GetAndPutCommand getAndPutCmd = (GetAndPutCommand) command;
+
+            Entry e = storage.getAndPut(getAndPutCmd.key(), getAndPutCmd.value());
+
+            clo.result(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
+        } else if (command instanceof PutAllCommand) {
+            PutAllCommand putAllCmd = (PutAllCommand) command;
+
+            storage.putAll(putAllCmd.keys(), putAllCmd.values());
+
+            clo.result(null);
+        } else if (command instanceof GetAndPutAllCommand) {
+            GetAndPutAllCommand getAndPutAllCmd = (GetAndPutAllCommand) command;
+
+            Collection<Entry> entries = storage.getAndPutAll(getAndPutAllCmd.keys(), getAndPutAllCmd.values());
+
+            List<SingleEntryResponse> resp = new ArrayList<>(entries.size());
+
+            for (Entry e : entries) {
+                resp.add(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
+            }
+
+            clo.result(new MultipleEntryResponse(resp));
+        } else if (command instanceof RemoveCommand) {
+            RemoveCommand rmvCmd = (RemoveCommand) command;
+
+            storage.remove(rmvCmd.key());
+
+            clo.result(null);
+        } else if (command instanceof GetAndRemoveCommand) {
+            GetAndRemoveCommand getAndRmvCmd = (GetAndRemoveCommand) command;
+
+            Entry e = storage.getAndRemove(getAndRmvCmd.key());
+
+            clo.result(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
+        } else if (command instanceof RemoveAllCommand) {
+            RemoveAllCommand rmvAllCmd = (RemoveAllCommand) command;
+
+            storage.removeAll(rmvAllCmd.keys());
+
+            clo.result(null);
+        } else if (command instanceof GetAndRemoveAllCommand) {
+            GetAndRemoveAllCommand getAndRmvAllCmd = (GetAndRemoveAllCommand) command;
+
+            Collection<Entry> entries = storage.getAndRemoveAll(getAndRmvAllCmd.keys());
+
+            List<SingleEntryResponse> resp = new ArrayList<>(entries.size());
+
+            for (Entry e : entries) {
+                resp.add(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
+            }
+
+            clo.result(new MultipleEntryResponse(resp));
+        } else if (command instanceof InvokeCommand) {
+            InvokeCommand cmd = (InvokeCommand) command;
+
+            boolean res = storage.invoke(
+                    toCondition(cmd.condition()),
+                    toOperations(cmd.success()),
+                    toOperations(cmd.failure())
+            );
+
+            clo.result(res);
+        } else if (command instanceof MultiInvokeCommand) {
+            MultiInvokeCommand cmd = (MultiInvokeCommand) command;
+
+            StatementResult res = storage.invoke(toIf(cmd.iif()));
+
+            clo.result(commandsFactory.statementResultInfo().result(res.bytes()).build());
+        } else {
+            return false;
+        }
+
+        return true;
+    }
+
+    private static If toIf(IfInfo iif) {
+        return new If(toCondition(iif.cond()), toConditionBranch(iif.andThen()), toConditionBranch(iif.orElse()));
+    }
+
+    private static Update toUpdate(UpdateInfo updateInfo) {
+        return new Update(toOperations(new ArrayList<>(updateInfo.operations())), new StatementResult(updateInfo.result().result()));
+    }
+
+    private static Statement toConditionBranch(StatementInfo statementInfo) {
+        if (statementInfo.isTerminal()) {
+            return new Statement(toUpdate(statementInfo.update()));
+        } else {
+            return new Statement(toIf(statementInfo.iif()));
+        }
+    }
+
+    private static Condition toCondition(ConditionInfo info) {
+        if (info instanceof SimpleConditionInfo) {
+            SimpleConditionInfo inf = (SimpleConditionInfo) info;
+            byte[] key = inf.key();
+
+            ConditionType type = inf.type();
+
+            if (type == ConditionType.KEY_EXISTS) {

Review Comment:
   This should have been a switch :(
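For illustration, a sketch of the `if`/`else if` chain rewritten as a `switch`. `CondType` is a hypothetical subset of the condition types, since only `KEY_EXISTS` is visible in this hunk. The branches return descriptive strings instead of building `ExistenceCondition`/`TombstoneCondition`/`ValueCondition` instances, so the example stays self-contained:

```java
// Hypothetical subset of condition types; the real ConditionType enum may have more constants.
enum CondType { KEY_EXISTS, KEY_NOT_EXISTS, TOMBSTONE, VALUE_EQUAL }

class ConditionDescriber {
    // Each constant is handled once; an unexpected value fails loudly in the default branch.
    static String describe(CondType type) {
        switch (type) {
            case KEY_EXISTS:
                return "existence(EXISTS)";
            case KEY_NOT_EXISTS:
                return "existence(NOT_EXISTS)";
            case TOMBSTONE:
                return "tombstone";
            case VALUE_EQUAL:
                return "value(EQUAL)";
            default:
                throw new IllegalArgumentException("Unknown condition type: " + type);
        }
    }
}
```

If the project's Java language level permits it, a switch expression (Java 14+) would make this even more compact.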



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.raft;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.command.GetAndPutAllCommand;
+import org.apache.ignite.internal.metastorage.command.GetAndPutCommand;
+import org.apache.ignite.internal.metastorage.command.GetAndRemoveAllCommand;
+import org.apache.ignite.internal.metastorage.command.GetAndRemoveCommand;
+import org.apache.ignite.internal.metastorage.command.InvokeCommand;
+import org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory;
+import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.command.MultipleEntryResponse;
+import org.apache.ignite.internal.metastorage.command.PutAllCommand;
+import org.apache.ignite.internal.metastorage.command.PutCommand;
+import org.apache.ignite.internal.metastorage.command.RemoveAllCommand;
+import org.apache.ignite.internal.metastorage.command.RemoveCommand;
+import org.apache.ignite.internal.metastorage.command.SingleEntryResponse;
+import org.apache.ignite.internal.metastorage.command.info.CompoundConditionInfo;
+import org.apache.ignite.internal.metastorage.command.info.ConditionInfo;
+import org.apache.ignite.internal.metastorage.command.info.IfInfo;
+import org.apache.ignite.internal.metastorage.command.info.OperationInfo;
+import org.apache.ignite.internal.metastorage.command.info.SimpleConditionInfo;
+import org.apache.ignite.internal.metastorage.command.info.StatementInfo;
+import org.apache.ignite.internal.metastorage.command.info.UpdateInfo;
+import org.apache.ignite.internal.metastorage.dsl.CompoundConditionType;
+import org.apache.ignite.internal.metastorage.dsl.ConditionType;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metastorage.dsl.StatementResult;
+import org.apache.ignite.internal.metastorage.dsl.Update;
+import org.apache.ignite.internal.metastorage.server.AndCondition;
+import org.apache.ignite.internal.metastorage.server.Condition;
+import org.apache.ignite.internal.metastorage.server.ExistenceCondition;
+import org.apache.ignite.internal.metastorage.server.If;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.OrCondition;
+import org.apache.ignite.internal.metastorage.server.RevisionCondition;
+import org.apache.ignite.internal.metastorage.server.Statement;
+import org.apache.ignite.internal.metastorage.server.TombstoneCondition;
+import org.apache.ignite.internal.metastorage.server.ValueCondition;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+
+/**
+ * Class containing some common logic for Meta Storage Raft group listeners.
+ */
+class MetaStorageWriteHandler {
+    private final MetaStorageCommandsFactory commandsFactory = new MetaStorageCommandsFactory();
+
+    private final KeyValueStorage storage;
+
+    MetaStorageWriteHandler(KeyValueStorage storage) {
+        this.storage = storage;
+    }
+
+    /**
+     * Tries to process a {@link WriteCommand}, returning {@code true} if the command has been successfully processed or {@code false}
+     * if the command requires external processing.
+     */
+    boolean handleWriteCommand(CommandClosure<WriteCommand> clo) {
+        WriteCommand command = clo.command();
+
+        if (command instanceof PutCommand) {
+            PutCommand putCmd = (PutCommand) command;
+
+            storage.put(putCmd.key(), putCmd.value());
+
+            clo.result(null);
+        } else if (command instanceof GetAndPutCommand) {
+            GetAndPutCommand getAndPutCmd = (GetAndPutCommand) command;
+
+            Entry e = storage.getAndPut(getAndPutCmd.key(), getAndPutCmd.value());
+
+            clo.result(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
+        } else if (command instanceof PutAllCommand) {
+            PutAllCommand putAllCmd = (PutAllCommand) command;
+
+            storage.putAll(putAllCmd.keys(), putAllCmd.values());
+
+            clo.result(null);
+        } else if (command instanceof GetAndPutAllCommand) {
+            GetAndPutAllCommand getAndPutAllCmd = (GetAndPutAllCommand) command;
+
+            Collection<Entry> entries = storage.getAndPutAll(getAndPutAllCmd.keys(), getAndPutAllCmd.values());
+
+            List<SingleEntryResponse> resp = new ArrayList<>(entries.size());
+
+            for (Entry e : entries) {
+                resp.add(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
+            }
+
+            clo.result(new MultipleEntryResponse(resp));
+        } else if (command instanceof RemoveCommand) {
+            RemoveCommand rmvCmd = (RemoveCommand) command;
+
+            storage.remove(rmvCmd.key());
+
+            clo.result(null);
+        } else if (command instanceof GetAndRemoveCommand) {
+            GetAndRemoveCommand getAndRmvCmd = (GetAndRemoveCommand) command;
+
+            Entry e = storage.getAndRemove(getAndRmvCmd.key());
+
+            clo.result(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
+        } else if (command instanceof RemoveAllCommand) {
+            RemoveAllCommand rmvAllCmd = (RemoveAllCommand) command;
+
+            storage.removeAll(rmvAllCmd.keys());
+
+            clo.result(null);
+        } else if (command instanceof GetAndRemoveAllCommand) {
+            GetAndRemoveAllCommand getAndRmvAllCmd = (GetAndRemoveAllCommand) command;
+
+            Collection<Entry> entries = storage.getAndRemoveAll(getAndRmvAllCmd.keys());
+
+            List<SingleEntryResponse> resp = new ArrayList<>(entries.size());
+
+            for (Entry e : entries) {
+                resp.add(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
+            }
+
+            clo.result(new MultipleEntryResponse(resp));

Review Comment:
   I think we should only pass the value here if it's the leader. Any thoughts?
   I'm not asking you to fix it.
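One possible reading of this remark, sketched below under loud assumptions: every replica (leader, followers and learners) must apply the storage mutation, but only the leader has a client waiting for the reply, so only the leader needs to build the per-entry response. The `isLeader` supplier, the simplified `Entry` class and the `completeGetAndRemoveAll` method are all hypothetical and are not part of the handler shown in the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

class LeaderOnlyResultSketch {
    // Hypothetical stand-in for the Meta Storage Entry type.
    static final class Entry {
        final byte[] key;
        final byte[] value;
        final long revision;

        Entry(byte[] key, byte[] value, long revision) {
            this.key = key;
            this.value = value;
            this.revision = revision;
        }
    }

    // Hypothetical completion step: the entries have already been removed on this replica;
    // only the leader copies them into a response for the waiting client.
    static void completeGetAndRemoveAll(List<Entry> removed, BooleanSupplier isLeader, Consumer<Object> result) {
        if (!isLeader.getAsBoolean()) {
            // Followers and learners: nobody consumes the result, so skip copying values.
            result.accept(null);
            return;
        }

        List<Entry> resp = new ArrayList<>(removed.size());

        for (Entry e : removed) {
            resp.add(new Entry(e.key, e.value, e.revision));
        }

        result.accept(resp);
    }

    public static void main(String[] args) {
        List<Entry> removed = Arrays.asList(new Entry(new byte[]{1}, new byte[]{42}, 5L));
        Object[] captured = new Object[1];

        completeGetAndRemoveAll(removed, () -> false, r -> captured[0] = r);
        System.out.println(captured[0]); // null

        completeGetAndRemoveAll(removed, () -> true, r -> captured[0] = r);
        System.out.println(((List<?>) captured[0]).size()); // 1
    }
}
```

Whether the leader flag should be checked inside the handler or by the Raft listener that calls it is left open here, as in the review comment itself.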



-- 
This is an automated message from the Apache Git Service.