You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by "sashapolo (via GitHub)" <gi...@apache.org> on 2023/06/22 15:08:51 UTC

[GitHub] [ignite-3] sashapolo opened a new pull request, #2239: IGNITE-19199 Propagate safe time when Meta Storage is idle

sashapolo opened a new pull request, #2239:
URL: https://github.com/apache/ignite-3/pull/2239

   https://issues.apache.org/jira/browse/IGNITE-19199


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2239: IGNITE-19199 Propagate safe time when Meta Storage is idle

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2239:
URL: https://github.com/apache/ignite-3/pull/2239#discussion_r1244860586


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -137,66 +144,123 @@ public MetaStorageManagerImpl(
             LogicalTopologyService logicalTopologyService,
             RaftManager raftMgr,
             KeyValueStorage storage,
-            HybridClock clock
+            HybridClock clock,
+            TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory
     ) {
         this.vaultMgr = vaultMgr;
         this.clusterService = clusterService;
         this.raftMgr = raftMgr;
         this.cmgMgr = cmgMgr;
         this.logicalTopologyService = logicalTopologyService;
         this.storage = storage;
-        this.clusterTime = new ClusterTimeImpl(busyLock, clock);
+        this.clusterTime = new ClusterTimeImpl(clusterService.nodeName(), busyLock, clock);
+        this.topologyAwareRaftGroupServiceFactory = topologyAwareRaftGroupServiceFactory;
     }
 
-    private CompletableFuture<MetaStorageServiceImpl> initializeMetaStorage(Set<String> metaStorageNodes) {
-        String thisNodeName = clusterService.nodeName();
+    /**
+     * Constructor for tests, that allows to pass Meta Storage configuration.
+     */
+    @TestOnly
+    public MetaStorageManagerImpl(
+            VaultManager vaultMgr,
+            ClusterService clusterService,
+            ClusterManagementGroupManager cmgMgr,
+            LogicalTopologyService logicalTopologyService,
+            RaftManager raftMgr,
+            KeyValueStorage storage,
+            HybridClock clock,
+            TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory,
+            MetaStorageConfiguration configuration
+    ) {
+        this(vaultMgr, clusterService, cmgMgr, logicalTopologyService, raftMgr, storage, clock, topologyAwareRaftGroupServiceFactory);
+
+        configure(configuration);
+    }
 
-        CompletableFuture<RaftGroupService> raftServiceFuture;
+    private CompletableFuture<MetaStorageServiceImpl> initializeMetaStorage(
+            Set<String> metaStorageNodes, MetaStorageConfiguration metaStorageConfig
+    ) {
+        assert metaStorageConfig != null : "Meta Storage configuration has not been set";
 
         try {
-            var ownFsmCallerExecutorDisruptorConfig = new RaftNodeDisruptorConfiguration("metastorage", 1);
+            String thisNodeName = clusterService.nodeName();
 
-            // We need to configure the replication protocol differently whether this node is a synchronous or asynchronous replica.
-            if (metaStorageNodes.contains(thisNodeName)) {
-                PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes);
+            var disruptorConfig = new RaftNodeDisruptorConfiguration("metastorage", 1);
 
-                Peer localPeer = configuration.peer(thisNodeName);
+            CompletableFuture<? extends RaftGroupService> raftServiceFuture = metaStorageNodes.contains(thisNodeName)
+                    ? startFollowerNode(metaStorageNodes, disruptorConfig, metaStorageConfig)
+                    : startLearnerNode(metaStorageNodes, disruptorConfig);
 
-                assert localPeer != null;
+            return raftServiceFuture.thenApply(raftService -> new MetaStorageServiceImpl(thisNodeName, raftService, busyLock, clusterTime));
+        } catch (NodeStoppingException e) {
+            return CompletableFuture.failedFuture(e);
+        }
+    }
 
-                raftServiceFuture = raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
-                        new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
-                        configuration,
-                        new MetaStorageListener(storage, clusterTime),
-                        new MetaStorageRaftGroupEventsListener(
-                                busyLock,
-                                clusterService,
-                                logicalTopologyService,
-                                metaStorageSvcFut,
-                                clusterTime
-                        ),
-                        ownFsmCallerExecutorDisruptorConfig
-                );
-            } else {
-                PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes, Set.of(thisNodeName));
+    private CompletableFuture<? extends RaftGroupService> startFollowerNode(
+            Set<String> metaStorageNodes, RaftNodeDisruptorConfiguration disruptorConfig, MetaStorageConfiguration metaStorageConfig
+    ) throws NodeStoppingException {
+        String thisNodeName = clusterService.nodeName();
 
-                Peer localPeer = configuration.learner(thisNodeName);
+        PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes);
 
-                assert localPeer != null;
+        Peer localPeer = configuration.peer(thisNodeName);
 
-                raftServiceFuture = raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
+        assert localPeer != null;
+
+        CompletableFuture<TopologyAwareRaftGroupService> raftServiceFuture =
+                raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
                         new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
                         configuration,
                         new MetaStorageListener(storage, clusterTime),
                         RaftGroupEventsListener.noopLsnr,
-                        ownFsmCallerExecutorDisruptorConfig
+                        disruptorConfig,
+                        topologyAwareRaftGroupServiceFactory
                 );
-            }
-        } catch (NodeStoppingException e) {
-            return CompletableFuture.failedFuture(e);
-        }
 
-        return raftServiceFuture.thenApply(raftService -> new MetaStorageServiceImpl(thisNodeName, raftService, busyLock, clusterTime));
+        raftServiceFuture
+                .thenAccept(service -> service.subscribeLeader(new MetaStorageLeaderElectionListener(
+                        busyLock,
+                        clusterService,
+                        logicalTopologyService,
+                        metaStorageSvcFut,
+                        clusterTime,
+                        metaStorageConfig
+                )));
+
+        return raftServiceFuture;
+    }
+
+    private CompletableFuture<? extends RaftGroupService> startLearnerNode(
+            Set<String> metaStorageNodes, RaftNodeDisruptorConfiguration disruptorConfig
+    ) throws NodeStoppingException {
+        String thisNodeName = clusterService.nodeName();
+
+        PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes, Set.of(thisNodeName));

Review Comment:
   What do you mean? Can you clarify please?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] ibessonov merged pull request #2239: IGNITE-19199 Propagate safe time when Meta Storage is idle

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov merged PR #2239:
URL: https://github.com/apache/ignite-3/pull/2239


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2239: IGNITE-19199 Propagate safe time when Meta Storage is idle

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2239:
URL: https://github.com/apache/ignite-3/pull/2239#discussion_r1244861676


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java:
##########
@@ -119,48 +146,81 @@ public void updateSafeTime(HybridTimestamp newValue) {
     }
 
     /**
-     * Updates hybrid logical clock using {@code ts}. Selects the maximum between current system time,
-     * hybrid clock's latest time and {@code ts} adding 1 logical tick to the result.
+     * Updates hybrid logical clock using {@code ts}. Selects the maximum between current system time, hybrid clock's latest time and
+     * {@code ts} adding 1 logical tick to the result.

Review Comment:
   yes



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] ademakov commented on a diff in pull request #2239: IGNITE-19199 Propagate safe time when Meta Storage is idle

Posted by "ademakov (via GitHub)" <gi...@apache.org>.
ademakov commented on code in PR #2239:
URL: https://github.com/apache/ignite-3/pull/2239#discussion_r1242466447


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java:
##########
@@ -62,87 +66,108 @@ public class MetaStorageRaftGroupEventsListener implements RaftGroupEventsListen
      *
      * <p>Multi-threaded access is guarded by {@code serializationFutureMux}.
      */
+    @Nullable
     private CompletableFuture<Void> serializationFuture = null;
 
     private final Object serializationFutureMux = new Object();
 
     private final ClusterTimeImpl clusterTime;
 
-    MetaStorageRaftGroupEventsListener(
+    private final LogicalTopologyEventListener logicalTopologyEventListener = new MetaStorageLogicalTopologyEventListener();
+
+    private final MetaStorageConfiguration metaStorageConfiguration;
+
+    /**
+     * Leader term if this node is a leader, {@code null} otherwise.
+     *
+     * <p>Multi-threaded access is guarded by {@code serializationFutureMux}.
+     */
+    @Nullable
+    private Long thisNodeTerm = null;
+
+    MetaStorageLeaderElectionListener(
             IgniteSpinBusyLock busyLock,
             ClusterService clusterService,
             LogicalTopologyService logicalTopologyService,
             CompletableFuture<MetaStorageServiceImpl> metaStorageSvcFut,
-            ClusterTimeImpl clusterTime
+            ClusterTimeImpl clusterTime,
+            MetaStorageConfiguration metaStorageConfiguration
     ) {
         this.busyLock = busyLock;
         this.nodeName = clusterService.nodeName();
         this.logicalTopologyService = logicalTopologyService;
         this.metaStorageSvcFut = metaStorageSvcFut;
         this.clusterTime = clusterTime;
+        this.metaStorageConfiguration = metaStorageConfiguration;
     }
 
     @Override
-    public void onLeaderElected(long term) {
+    public void onLeaderElected(ClusterNode node, long term) {
+        System.err.println("FUCK: " + node + " " + term);

Review Comment:
   Huh?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] ibessonov commented on a diff in pull request #2239: IGNITE-19199 Propagate safe time when Meta Storage is idle

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #2239:
URL: https://github.com/apache/ignite-3/pull/2239#discussion_r1243253962


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java:
##########
@@ -119,48 +146,81 @@ public void updateSafeTime(HybridTimestamp newValue) {
     }
 
     /**
-     * Updates hybrid logical clock using {@code ts}. Selects the maximum between current system time,
-     * hybrid clock's latest time and {@code ts} adding 1 logical tick to the result.
+     * Updates hybrid logical clock using {@code ts}. Selects the maximum between current system time, hybrid clock's latest time and
+     * {@code ts} adding 1 logical tick to the result.

Review Comment:
   Auto-formatted comment?



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeTest.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.time;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.after;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl.SyncTimeAction;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.TrackerClosedException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for {@link ClusterTimeImpl}.
+ */
+@ExtendWith(ConfigurationExtension.class)
+public class ClusterTimeTest {
+    private final ClusterTimeImpl clusterTime = new ClusterTimeImpl("foo", new IgniteSpinBusyLock(), new HybridClockImpl());
+
+    @AfterEach
+    void tearDown() throws Exception {
+        clusterTime.close();

Review Comment:
   Maybe that's silly, but just in case, please add an assertion that `close` takes less than 5 seconds.



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -137,66 +144,123 @@ public MetaStorageManagerImpl(
             LogicalTopologyService logicalTopologyService,
             RaftManager raftMgr,
             KeyValueStorage storage,
-            HybridClock clock
+            HybridClock clock,
+            TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory
     ) {
         this.vaultMgr = vaultMgr;
         this.clusterService = clusterService;
         this.raftMgr = raftMgr;
         this.cmgMgr = cmgMgr;
         this.logicalTopologyService = logicalTopologyService;
         this.storage = storage;
-        this.clusterTime = new ClusterTimeImpl(busyLock, clock);
+        this.clusterTime = new ClusterTimeImpl(clusterService.nodeName(), busyLock, clock);
+        this.topologyAwareRaftGroupServiceFactory = topologyAwareRaftGroupServiceFactory;
     }
 
-    private CompletableFuture<MetaStorageServiceImpl> initializeMetaStorage(Set<String> metaStorageNodes) {
-        String thisNodeName = clusterService.nodeName();
+    /**
+     * Constructor for tests, that allows to pass Meta Storage configuration.
+     */
+    @TestOnly
+    public MetaStorageManagerImpl(
+            VaultManager vaultMgr,
+            ClusterService clusterService,
+            ClusterManagementGroupManager cmgMgr,
+            LogicalTopologyService logicalTopologyService,
+            RaftManager raftMgr,
+            KeyValueStorage storage,
+            HybridClock clock,
+            TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory,
+            MetaStorageConfiguration configuration
+    ) {
+        this(vaultMgr, clusterService, cmgMgr, logicalTopologyService, raftMgr, storage, clock, topologyAwareRaftGroupServiceFactory);
+
+        configure(configuration);
+    }
 
-        CompletableFuture<RaftGroupService> raftServiceFuture;
+    private CompletableFuture<MetaStorageServiceImpl> initializeMetaStorage(
+            Set<String> metaStorageNodes, MetaStorageConfiguration metaStorageConfig

Review Comment:
   ```suggestion
               Set<String> metaStorageNodes,
               MetaStorageConfiguration metaStorageConfig
   ```



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -137,66 +144,123 @@ public MetaStorageManagerImpl(
             LogicalTopologyService logicalTopologyService,
             RaftManager raftMgr,
             KeyValueStorage storage,
-            HybridClock clock
+            HybridClock clock,
+            TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory
     ) {
         this.vaultMgr = vaultMgr;
         this.clusterService = clusterService;
         this.raftMgr = raftMgr;
         this.cmgMgr = cmgMgr;
         this.logicalTopologyService = logicalTopologyService;
         this.storage = storage;
-        this.clusterTime = new ClusterTimeImpl(busyLock, clock);
+        this.clusterTime = new ClusterTimeImpl(clusterService.nodeName(), busyLock, clock);
+        this.topologyAwareRaftGroupServiceFactory = topologyAwareRaftGroupServiceFactory;
     }
 
-    private CompletableFuture<MetaStorageServiceImpl> initializeMetaStorage(Set<String> metaStorageNodes) {
-        String thisNodeName = clusterService.nodeName();
+    /**
+     * Constructor for tests, that allows to pass Meta Storage configuration.
+     */
+    @TestOnly
+    public MetaStorageManagerImpl(
+            VaultManager vaultMgr,
+            ClusterService clusterService,
+            ClusterManagementGroupManager cmgMgr,
+            LogicalTopologyService logicalTopologyService,
+            RaftManager raftMgr,
+            KeyValueStorage storage,
+            HybridClock clock,
+            TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory,
+            MetaStorageConfiguration configuration
+    ) {
+        this(vaultMgr, clusterService, cmgMgr, logicalTopologyService, raftMgr, storage, clock, topologyAwareRaftGroupServiceFactory);
+
+        configure(configuration);
+    }
 
-        CompletableFuture<RaftGroupService> raftServiceFuture;
+    private CompletableFuture<MetaStorageServiceImpl> initializeMetaStorage(
+            Set<String> metaStorageNodes, MetaStorageConfiguration metaStorageConfig
+    ) {
+        assert metaStorageConfig != null : "Meta Storage configuration has not been set";
 
         try {
-            var ownFsmCallerExecutorDisruptorConfig = new RaftNodeDisruptorConfiguration("metastorage", 1);
+            String thisNodeName = clusterService.nodeName();
 
-            // We need to configure the replication protocol differently whether this node is a synchronous or asynchronous replica.
-            if (metaStorageNodes.contains(thisNodeName)) {
-                PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes);
+            var disruptorConfig = new RaftNodeDisruptorConfiguration("metastorage", 1);
 
-                Peer localPeer = configuration.peer(thisNodeName);
+            CompletableFuture<? extends RaftGroupService> raftServiceFuture = metaStorageNodes.contains(thisNodeName)
+                    ? startFollowerNode(metaStorageNodes, disruptorConfig, metaStorageConfig)
+                    : startLearnerNode(metaStorageNodes, disruptorConfig);
 
-                assert localPeer != null;
+            return raftServiceFuture.thenApply(raftService -> new MetaStorageServiceImpl(thisNodeName, raftService, busyLock, clusterTime));
+        } catch (NodeStoppingException e) {
+            return CompletableFuture.failedFuture(e);
+        }
+    }
 
-                raftServiceFuture = raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
-                        new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
-                        configuration,
-                        new MetaStorageListener(storage, clusterTime),
-                        new MetaStorageRaftGroupEventsListener(
-                                busyLock,
-                                clusterService,
-                                logicalTopologyService,
-                                metaStorageSvcFut,
-                                clusterTime
-                        ),
-                        ownFsmCallerExecutorDisruptorConfig
-                );
-            } else {
-                PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes, Set.of(thisNodeName));
+    private CompletableFuture<? extends RaftGroupService> startFollowerNode(
+            Set<String> metaStorageNodes, RaftNodeDisruptorConfiguration disruptorConfig, MetaStorageConfiguration metaStorageConfig
+    ) throws NodeStoppingException {
+        String thisNodeName = clusterService.nodeName();
 
-                Peer localPeer = configuration.learner(thisNodeName);
+        PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes);
 
-                assert localPeer != null;
+        Peer localPeer = configuration.peer(thisNodeName);
 
-                raftServiceFuture = raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
+        assert localPeer != null;
+
+        CompletableFuture<TopologyAwareRaftGroupService> raftServiceFuture =
+                raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
                         new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
                         configuration,
                         new MetaStorageListener(storage, clusterTime),
                         RaftGroupEventsListener.noopLsnr,
-                        ownFsmCallerExecutorDisruptorConfig
+                        disruptorConfig,
+                        topologyAwareRaftGroupServiceFactory
                 );
-            }
-        } catch (NodeStoppingException e) {
-            return CompletableFuture.failedFuture(e);
-        }
 
-        return raftServiceFuture.thenApply(raftService -> new MetaStorageServiceImpl(thisNodeName, raftService, busyLock, clusterTime));
+        raftServiceFuture
+                .thenAccept(service -> service.subscribeLeader(new MetaStorageLeaderElectionListener(
+                        busyLock,
+                        clusterService,
+                        logicalTopologyService,
+                        metaStorageSvcFut,
+                        clusterTime,
+                        metaStorageConfig
+                )));
+
+        return raftServiceFuture;
+    }
+
+    private CompletableFuture<? extends RaftGroupService> startLearnerNode(
+            Set<String> metaStorageNodes, RaftNodeDisruptorConfiguration disruptorConfig
+    ) throws NodeStoppingException {
+        String thisNodeName = clusterService.nodeName();
+
+        PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes, Set.of(thisNodeName));

Review Comment:
   Is this some kind of hack? It's not obvious why it works.



##########
modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/configuration/MetaStorageConfigurationSchema.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.configuration;
+
+import org.apache.ignite.configuration.annotation.ConfigurationRoot;
+import org.apache.ignite.configuration.annotation.ConfigurationType;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.configuration.validation.Range;
+
+/**
+ * Configuration schema for the Meta Storage module.
+ */
+@ConfigurationRoot(rootName = "metaStorage", type = ConfigurationType.DISTRIBUTED)
+public class MetaStorageConfigurationSchema {
+    /**
+     * Duration (in milliseconds) used to determine how often to issue time sync commands when the Meta Storage is idle
+     * (no writes are being issued).
+     *
+     * <p>Making this value too small increases the network load, while making this value too large can lead to increased latency of
+     * Meta Storage reads.
+     */
+    @Value(hasDefault = true)
+    @Range(min = 1)
+    public long idleSyncTimeInterval = 1000;

Review Comment:
   This is the part that you will not like. There must be a correlation between the idle sync interval and the delay duration (DD): the idle sync interval should never be bigger than DD. I would even go as far as forcing it to be at most half of DD, otherwise the likelihood of stalls would be high.



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java:
##########
@@ -119,48 +146,81 @@ public void updateSafeTime(HybridTimestamp newValue) {
     }
 
     /**
-     * Updates hybrid logical clock using {@code ts}. Selects the maximum between current system time,
-     * hybrid clock's latest time and {@code ts} adding 1 logical tick to the result.
+     * Updates hybrid logical clock using {@code ts}. Selects the maximum between current system time, hybrid clock's latest time and
+     * {@code ts} adding 1 logical tick to the result.
      *
      * @param ts Timestamp.
      */
-    public void adjust(HybridTimestamp ts) {
+    public synchronized void adjust(HybridTimestamp ts) {
         this.clock.update(ts);
+
+        // Since this method is called when a write command is being processed and safe time is also updated by write commands,
+        // we need to re-schedule the idle time scheduler.
+        if (safeTimeScheduler != null) {
+            safeTimeScheduler.schedule();
+        }
     }
 
-    private class LeaderTimer {
+    private class SafeTimeScheduler {
+        private final SyncTimeAction syncTimeAction;
 
-        private final MetaStorageServiceImpl service;
+        private final MetaStorageConfiguration configuration;
 
-        private LeaderTimer(MetaStorageServiceImpl service) {
-            this.service = service;
+        private final ScheduledExecutorService executorService =
+                Executors.newSingleThreadScheduledExecutor(NamedThreadFactory.create(nodeName, "meta-storage-safe-time", LOG));
+
+        /**
+         * Current scheduled task.
+         *
+         * <p>Concurrent access is guarded by {@code this}.
+         */
+        @Nullable
+        private ScheduledFuture<?> currentTask;
+
+        SafeTimeScheduler(SyncTimeAction syncTimeAction, MetaStorageConfiguration configuration) {
+            this.syncTimeAction = syncTimeAction;
+            this.configuration = configuration;
         }
 
         void start() {
             schedule();
         }
 
-        private void schedule() {
-            // TODO: https://issues.apache.org/jira/browse/IGNITE-19199 Only propagate safe time when ms is idle
-        }
-
-        void disseminateTime() {
-            if (!busyLock.enterBusy()) {
-                // Shutting down.
-                return;
+        synchronized void schedule() {
+            // Cancel the previous task if we were re-scheduled because Meta Storage was not actually idle.
+            if (currentTask != null) {
+                currentTask.cancel(false);
             }
 
-            try {
-                HybridTimestamp now = clock.now();
-
-                service.syncTime(now);
-            } finally {
-                busyLock.leaveBusy();
-            }
+            currentTask = executorService.schedule(() -> {
+                if (!busyLock.enterBusy()) {
+                    return;
+                }
+
+                try {
+                    syncTimeAction.syncTime(clock.now())
+                            .whenComplete((v, e) -> {
+                                if (e != null && !(unwrapCause(e) instanceof CancellationException)) {
+                                    LOG.error("Unable to perform idle time sync", e);
+                                }
+                            });
+
+                    // re-schedule the task again.

Review Comment:
   ```suggestion
                       // Re-schedule the task again.
   ```



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -137,66 +144,123 @@ public MetaStorageManagerImpl(
             LogicalTopologyService logicalTopologyService,
             RaftManager raftMgr,
             KeyValueStorage storage,
-            HybridClock clock
+            HybridClock clock,
+            TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory
     ) {
         this.vaultMgr = vaultMgr;
         this.clusterService = clusterService;
         this.raftMgr = raftMgr;
         this.cmgMgr = cmgMgr;
         this.logicalTopologyService = logicalTopologyService;
         this.storage = storage;
-        this.clusterTime = new ClusterTimeImpl(busyLock, clock);
+        this.clusterTime = new ClusterTimeImpl(clusterService.nodeName(), busyLock, clock);
+        this.topologyAwareRaftGroupServiceFactory = topologyAwareRaftGroupServiceFactory;
     }
 
-    private CompletableFuture<MetaStorageServiceImpl> initializeMetaStorage(Set<String> metaStorageNodes) {
-        String thisNodeName = clusterService.nodeName();
+    /**
+     * Constructor for tests, that allows to pass Meta Storage configuration.
+     */
+    @TestOnly
+    public MetaStorageManagerImpl(
+            VaultManager vaultMgr,
+            ClusterService clusterService,
+            ClusterManagementGroupManager cmgMgr,
+            LogicalTopologyService logicalTopologyService,
+            RaftManager raftMgr,
+            KeyValueStorage storage,
+            HybridClock clock,
+            TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory,
+            MetaStorageConfiguration configuration
+    ) {
+        this(vaultMgr, clusterService, cmgMgr, logicalTopologyService, raftMgr, storage, clock, topologyAwareRaftGroupServiceFactory);
+
+        configure(configuration);
+    }
 
-        CompletableFuture<RaftGroupService> raftServiceFuture;
+    private CompletableFuture<MetaStorageServiceImpl> initializeMetaStorage(
+            Set<String> metaStorageNodes, MetaStorageConfiguration metaStorageConfig
+    ) {
+        assert metaStorageConfig != null : "Meta Storage configuration has not been set";
 
         try {
-            var ownFsmCallerExecutorDisruptorConfig = new RaftNodeDisruptorConfiguration("metastorage", 1);
+            String thisNodeName = clusterService.nodeName();
 
-            // We need to configure the replication protocol differently whether this node is a synchronous or asynchronous replica.
-            if (metaStorageNodes.contains(thisNodeName)) {
-                PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes);
+            var disruptorConfig = new RaftNodeDisruptorConfiguration("metastorage", 1);
 
-                Peer localPeer = configuration.peer(thisNodeName);
+            CompletableFuture<? extends RaftGroupService> raftServiceFuture = metaStorageNodes.contains(thisNodeName)
+                    ? startFollowerNode(metaStorageNodes, disruptorConfig, metaStorageConfig)
+                    : startLearnerNode(metaStorageNodes, disruptorConfig);
 
-                assert localPeer != null;
+            return raftServiceFuture.thenApply(raftService -> new MetaStorageServiceImpl(thisNodeName, raftService, busyLock, clusterTime));
+        } catch (NodeStoppingException e) {
+            return CompletableFuture.failedFuture(e);
+        }
+    }
 
-                raftServiceFuture = raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
-                        new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
-                        configuration,
-                        new MetaStorageListener(storage, clusterTime),
-                        new MetaStorageRaftGroupEventsListener(
-                                busyLock,
-                                clusterService,
-                                logicalTopologyService,
-                                metaStorageSvcFut,
-                                clusterTime
-                        ),
-                        ownFsmCallerExecutorDisruptorConfig
-                );
-            } else {
-                PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes, Set.of(thisNodeName));
+    private CompletableFuture<? extends RaftGroupService> startFollowerNode(
+            Set<String> metaStorageNodes, RaftNodeDisruptorConfiguration disruptorConfig, MetaStorageConfiguration metaStorageConfig

Review Comment:
   ```suggestion
               Set<String> metaStorageNodes,
               RaftNodeDisruptorConfiguration disruptorConfig,
               MetaStorageConfiguration metaStorageConfig
   ```



##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeTest.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.time;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.after;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl.SyncTimeAction;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.TrackerClosedException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for {@link ClusterTimeImpl}.
+ */
+@ExtendWith(ConfigurationExtension.class)
+public class ClusterTimeTest {
+    private final ClusterTimeImpl clusterTime = new ClusterTimeImpl("foo", new IgniteSpinBusyLock(), new HybridClockImpl());
+
+    @AfterEach
+    void tearDown() throws Exception {
+        clusterTime.close();
+    }
+
+    @Test
+    void testWaitFor() {
+        HybridTimestamp now = clusterTime.now();
+
+        CompletableFuture<Void> future = clusterTime.waitFor(now);
+
+        clusterTime.updateSafeTime(now);
+
+        assertThat(future, willCompleteSuccessfully());
+    }
+
+    @Test
+    void testWaitForCancellation() throws Exception {
+        HybridTimestamp now = clusterTime.now();
+
+        CompletableFuture<Void> future = clusterTime.waitFor(now);
+
+        clusterTime.close();
+
+        assertThat(future, willThrow(TrackerClosedException.class));
+    }
+
+    @Test
+    void testIdleSafeTimeScheduler(@InjectConfiguration("mock.idleSyncTimeInterval=1") MetaStorageConfiguration config) {
+        SyncTimeAction action = mock(SyncTimeAction.class);
+
+        when(action.syncTime(any())).thenReturn(completedFuture(null));
+
+        clusterTime.startSafeTimeScheduler(action, config);
+
+        verify(action, timeout(100).atLeast(3)).syncTime(any());

Review Comment:
   This will eventually fail due to some long GC pause. Any idea how to make the test more robust?
   Does Mockito stop its timer preemptively if the "atLeast" condition is met?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] ibessonov commented on a diff in pull request #2239: IGNITE-19199 Propagate safe time when Meta Storage is idle

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #2239:
URL: https://github.com/apache/ignite-3/pull/2239#discussion_r1244913159


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -137,66 +144,123 @@ public MetaStorageManagerImpl(
             LogicalTopologyService logicalTopologyService,
             RaftManager raftMgr,
             KeyValueStorage storage,
-            HybridClock clock
+            HybridClock clock,
+            TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory
     ) {
         this.vaultMgr = vaultMgr;
         this.clusterService = clusterService;
         this.raftMgr = raftMgr;
         this.cmgMgr = cmgMgr;
         this.logicalTopologyService = logicalTopologyService;
         this.storage = storage;
-        this.clusterTime = new ClusterTimeImpl(busyLock, clock);
+        this.clusterTime = new ClusterTimeImpl(clusterService.nodeName(), busyLock, clock);
+        this.topologyAwareRaftGroupServiceFactory = topologyAwareRaftGroupServiceFactory;
     }
 
-    private CompletableFuture<MetaStorageServiceImpl> initializeMetaStorage(Set<String> metaStorageNodes) {
-        String thisNodeName = clusterService.nodeName();
+    /**
+     * Constructor for tests, that allows to pass Meta Storage configuration.
+     */
+    @TestOnly
+    public MetaStorageManagerImpl(
+            VaultManager vaultMgr,
+            ClusterService clusterService,
+            ClusterManagementGroupManager cmgMgr,
+            LogicalTopologyService logicalTopologyService,
+            RaftManager raftMgr,
+            KeyValueStorage storage,
+            HybridClock clock,
+            TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory,
+            MetaStorageConfiguration configuration
+    ) {
+        this(vaultMgr, clusterService, cmgMgr, logicalTopologyService, raftMgr, storage, clock, topologyAwareRaftGroupServiceFactory);
+
+        configure(configuration);
+    }
 
-        CompletableFuture<RaftGroupService> raftServiceFuture;
+    private CompletableFuture<MetaStorageServiceImpl> initializeMetaStorage(
+            Set<String> metaStorageNodes, MetaStorageConfiguration metaStorageConfig
+    ) {
+        assert metaStorageConfig != null : "Meta Storage configuration has not been set";
 
         try {
-            var ownFsmCallerExecutorDisruptorConfig = new RaftNodeDisruptorConfiguration("metastorage", 1);
+            String thisNodeName = clusterService.nodeName();
 
-            // We need to configure the replication protocol differently whether this node is a synchronous or asynchronous replica.
-            if (metaStorageNodes.contains(thisNodeName)) {
-                PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes);
+            var disruptorConfig = new RaftNodeDisruptorConfiguration("metastorage", 1);
 
-                Peer localPeer = configuration.peer(thisNodeName);
+            CompletableFuture<? extends RaftGroupService> raftServiceFuture = metaStorageNodes.contains(thisNodeName)
+                    ? startFollowerNode(metaStorageNodes, disruptorConfig, metaStorageConfig)
+                    : startLearnerNode(metaStorageNodes, disruptorConfig);
 
-                assert localPeer != null;
+            return raftServiceFuture.thenApply(raftService -> new MetaStorageServiceImpl(thisNodeName, raftService, busyLock, clusterTime));
+        } catch (NodeStoppingException e) {
+            return CompletableFuture.failedFuture(e);
+        }
+    }
 
-                raftServiceFuture = raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
-                        new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
-                        configuration,
-                        new MetaStorageListener(storage, clusterTime),
-                        new MetaStorageRaftGroupEventsListener(
-                                busyLock,
-                                clusterService,
-                                logicalTopologyService,
-                                metaStorageSvcFut,
-                                clusterTime
-                        ),
-                        ownFsmCallerExecutorDisruptorConfig
-                );
-            } else {
-                PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes, Set.of(thisNodeName));
+    private CompletableFuture<? extends RaftGroupService> startFollowerNode(
+            Set<String> metaStorageNodes, RaftNodeDisruptorConfiguration disruptorConfig, MetaStorageConfiguration metaStorageConfig
+    ) throws NodeStoppingException {
+        String thisNodeName = clusterService.nodeName();
 
-                Peer localPeer = configuration.learner(thisNodeName);
+        PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes);
 
-                assert localPeer != null;
+        Peer localPeer = configuration.peer(thisNodeName);
 
-                raftServiceFuture = raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
+        assert localPeer != null;
+
+        CompletableFuture<TopologyAwareRaftGroupService> raftServiceFuture =
+                raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
                         new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
                         configuration,
                         new MetaStorageListener(storage, clusterTime),
                         RaftGroupEventsListener.noopLsnr,
-                        ownFsmCallerExecutorDisruptorConfig
+                        disruptorConfig,
+                        topologyAwareRaftGroupServiceFactory
                 );
-            }
-        } catch (NodeStoppingException e) {
-            return CompletableFuture.failedFuture(e);
-        }
 
-        return raftServiceFuture.thenApply(raftService -> new MetaStorageServiceImpl(thisNodeName, raftService, busyLock, clusterTime));
+        raftServiceFuture
+                .thenAccept(service -> service.subscribeLeader(new MetaStorageLeaderElectionListener(
+                        busyLock,
+                        clusterService,
+                        logicalTopologyService,
+                        metaStorageSvcFut,
+                        clusterTime,
+                        metaStorageConfig
+                )));
+
+        return raftServiceFuture;
+    }
+
+    private CompletableFuture<? extends RaftGroupService> startLearnerNode(
+            Set<String> metaStorageNodes, RaftNodeDisruptorConfiguration disruptorConfig
+    ) throws NodeStoppingException {
+        String thisNodeName = clusterService.nodeName();
+
+        PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes, Set.of(thisNodeName));

Review Comment:
   The fact that we pass a single node as a learner in configuration



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2239: IGNITE-19199 Propagate safe time when Meta Storage is idle

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2239:
URL: https://github.com/apache/ignite-3/pull/2239#discussion_r1244858734


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -137,66 +144,123 @@ public MetaStorageManagerImpl(
             LogicalTopologyService logicalTopologyService,
             RaftManager raftMgr,
             KeyValueStorage storage,
-            HybridClock clock
+            HybridClock clock,
+            TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory
     ) {
         this.vaultMgr = vaultMgr;
         this.clusterService = clusterService;
         this.raftMgr = raftMgr;
         this.cmgMgr = cmgMgr;
         this.logicalTopologyService = logicalTopologyService;
         this.storage = storage;
-        this.clusterTime = new ClusterTimeImpl(busyLock, clock);
+        this.clusterTime = new ClusterTimeImpl(clusterService.nodeName(), busyLock, clock);
+        this.topologyAwareRaftGroupServiceFactory = topologyAwareRaftGroupServiceFactory;
     }
 
-    private CompletableFuture<MetaStorageServiceImpl> initializeMetaStorage(Set<String> metaStorageNodes) {
-        String thisNodeName = clusterService.nodeName();
+    /**
+     * Constructor for tests, that allows to pass Meta Storage configuration.
+     */
+    @TestOnly
+    public MetaStorageManagerImpl(
+            VaultManager vaultMgr,
+            ClusterService clusterService,
+            ClusterManagementGroupManager cmgMgr,
+            LogicalTopologyService logicalTopologyService,
+            RaftManager raftMgr,
+            KeyValueStorage storage,
+            HybridClock clock,
+            TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory,
+            MetaStorageConfiguration configuration
+    ) {
+        this(vaultMgr, clusterService, cmgMgr, logicalTopologyService, raftMgr, storage, clock, topologyAwareRaftGroupServiceFactory);
+
+        configure(configuration);
+    }
 
-        CompletableFuture<RaftGroupService> raftServiceFuture;
+    private CompletableFuture<MetaStorageServiceImpl> initializeMetaStorage(
+            Set<String> metaStorageNodes, MetaStorageConfiguration metaStorageConfig

Review Comment:
   Don't think that's necessary, but ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2239: IGNITE-19199 Propagate safe time when Meta Storage is idle

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2239:
URL: https://github.com/apache/ignite-3/pull/2239#discussion_r1244879629


##########
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeTest.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.time;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.after;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl.SyncTimeAction;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.TrackerClosedException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for {@link ClusterTimeImpl}.
+ */
+@ExtendWith(ConfigurationExtension.class)
+public class ClusterTimeTest {
+    private final ClusterTimeImpl clusterTime = new ClusterTimeImpl("foo", new IgniteSpinBusyLock(), new HybridClockImpl());
+
+    @AfterEach
+    void tearDown() throws Exception {
+        clusterTime.close();
+    }
+
+    @Test
+    void testWaitFor() {
+        HybridTimestamp now = clusterTime.now();
+
+        CompletableFuture<Void> future = clusterTime.waitFor(now);
+
+        clusterTime.updateSafeTime(now);
+
+        assertThat(future, willCompleteSuccessfully());
+    }
+
+    @Test
+    void testWaitForCancellation() throws Exception {
+        HybridTimestamp now = clusterTime.now();
+
+        CompletableFuture<Void> future = clusterTime.waitFor(now);
+
+        clusterTime.close();
+
+        assertThat(future, willThrow(TrackerClosedException.class));
+    }
+
+    @Test
+    void testIdleSafeTimeScheduler(@InjectConfiguration("mock.idleSyncTimeInterval=1") MetaStorageConfiguration config) {
+        SyncTimeAction action = mock(SyncTimeAction.class);
+
+        when(action.syncTime(any())).thenReturn(completedFuture(null));
+
+        clusterTime.startSafeTimeScheduler(action, config);
+
+        verify(action, timeout(100).atLeast(3)).syncTime(any());

Review Comment:
   The verification completes as soon as this method has been called 3 times; it does not wait for the end of the timeout. This test triggers the method every millisecond, so the GC pause would have to be huge.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2239: IGNITE-19199 Propagate safe time when Meta Storage is idle

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2239:
URL: https://github.com/apache/ignite-3/pull/2239#discussion_r1244920522


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -137,66 +144,123 @@ public MetaStorageManagerImpl(
             LogicalTopologyService logicalTopologyService,
             RaftManager raftMgr,
             KeyValueStorage storage,
-            HybridClock clock
+            HybridClock clock,
+            TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory
     ) {
         this.vaultMgr = vaultMgr;
         this.clusterService = clusterService;
         this.raftMgr = raftMgr;
         this.cmgMgr = cmgMgr;
         this.logicalTopologyService = logicalTopologyService;
         this.storage = storage;
-        this.clusterTime = new ClusterTimeImpl(busyLock, clock);
+        this.clusterTime = new ClusterTimeImpl(clusterService.nodeName(), busyLock, clock);
+        this.topologyAwareRaftGroupServiceFactory = topologyAwareRaftGroupServiceFactory;
     }
 
-    private CompletableFuture<MetaStorageServiceImpl> initializeMetaStorage(Set<String> metaStorageNodes) {
-        String thisNodeName = clusterService.nodeName();
+    /**
+     * Constructor for tests, that allows to pass Meta Storage configuration.
+     */
+    @TestOnly
+    public MetaStorageManagerImpl(
+            VaultManager vaultMgr,
+            ClusterService clusterService,
+            ClusterManagementGroupManager cmgMgr,
+            LogicalTopologyService logicalTopologyService,
+            RaftManager raftMgr,
+            KeyValueStorage storage,
+            HybridClock clock,
+            TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory,
+            MetaStorageConfiguration configuration
+    ) {
+        this(vaultMgr, clusterService, cmgMgr, logicalTopologyService, raftMgr, storage, clock, topologyAwareRaftGroupServiceFactory);
+
+        configure(configuration);
+    }
 
-        CompletableFuture<RaftGroupService> raftServiceFuture;
+    private CompletableFuture<MetaStorageServiceImpl> initializeMetaStorage(
+            Set<String> metaStorageNodes, MetaStorageConfiguration metaStorageConfig
+    ) {
+        assert metaStorageConfig != null : "Meta Storage configuration has not been set";
 
         try {
-            var ownFsmCallerExecutorDisruptorConfig = new RaftNodeDisruptorConfiguration("metastorage", 1);
+            String thisNodeName = clusterService.nodeName();
 
-            // We need to configure the replication protocol differently whether this node is a synchronous or asynchronous replica.
-            if (metaStorageNodes.contains(thisNodeName)) {
-                PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes);
+            var disruptorConfig = new RaftNodeDisruptorConfiguration("metastorage", 1);
 
-                Peer localPeer = configuration.peer(thisNodeName);
+            CompletableFuture<? extends RaftGroupService> raftServiceFuture = metaStorageNodes.contains(thisNodeName)
+                    ? startFollowerNode(metaStorageNodes, disruptorConfig, metaStorageConfig)
+                    : startLearnerNode(metaStorageNodes, disruptorConfig);
 
-                assert localPeer != null;
+            return raftServiceFuture.thenApply(raftService -> new MetaStorageServiceImpl(thisNodeName, raftService, busyLock, clusterTime));
+        } catch (NodeStoppingException e) {
+            return CompletableFuture.failedFuture(e);
+        }
+    }
 
-                raftServiceFuture = raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
-                        new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
-                        configuration,
-                        new MetaStorageListener(storage, clusterTime),
-                        new MetaStorageRaftGroupEventsListener(
-                                busyLock,
-                                clusterService,
-                                logicalTopologyService,
-                                metaStorageSvcFut,
-                                clusterTime
-                        ),
-                        ownFsmCallerExecutorDisruptorConfig
-                );
-            } else {
-                PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes, Set.of(thisNodeName));
+    private CompletableFuture<? extends RaftGroupService> startFollowerNode(
+            Set<String> metaStorageNodes, RaftNodeDisruptorConfiguration disruptorConfig, MetaStorageConfiguration metaStorageConfig
+    ) throws NodeStoppingException {
+        String thisNodeName = clusterService.nodeName();
 
-                Peer localPeer = configuration.learner(thisNodeName);
+        PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes);
 
-                assert localPeer != null;
+        Peer localPeer = configuration.peer(thisNodeName);
 
-                raftServiceFuture = raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
+        assert localPeer != null;
+
+        CompletableFuture<TopologyAwareRaftGroupService> raftServiceFuture =
+                raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
                         new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
                         configuration,
                         new MetaStorageListener(storage, clusterTime),
                         RaftGroupEventsListener.noopLsnr,
-                        ownFsmCallerExecutorDisruptorConfig
+                        disruptorConfig,
+                        topologyAwareRaftGroupServiceFactory
                 );
-            }
-        } catch (NodeStoppingException e) {
-            return CompletableFuture.failedFuture(e);
-        }
 
-        return raftServiceFuture.thenApply(raftService -> new MetaStorageServiceImpl(thisNodeName, raftService, busyLock, clusterTime));
+        raftServiceFuture
+                .thenAccept(service -> service.subscribeLeader(new MetaStorageLeaderElectionListener(
+                        busyLock,
+                        clusterService,
+                        logicalTopologyService,
+                        metaStorageSvcFut,
+                        clusterTime,
+                        metaStorageConfig
+                )));
+
+        return raftServiceFuture;
+    }
+
+    private CompletableFuture<? extends RaftGroupService> startLearnerNode(
+            Set<String> metaStorageNodes, RaftNodeDisruptorConfiguration disruptorConfig
+    ) throws NodeStoppingException {
+        String thisNodeName = clusterService.nodeName();
+
+        PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes, Set.of(thisNodeName));

Review Comment:
   JRaft does not require the full list of learners to start a Raft node; it only needs to check whether that list contains the current node.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2239: IGNITE-19199 Propagate safe time when Meta Storage is idle

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2239:
URL: https://github.com/apache/ignite-3/pull/2239#discussion_r1242573594


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java:
##########
@@ -62,87 +66,108 @@ public class MetaStorageRaftGroupEventsListener implements RaftGroupEventsListen
      *
      * <p>Multi-threaded access is guarded by {@code serializationFutureMux}.
      */
+    @Nullable
     private CompletableFuture<Void> serializationFuture = null;
 
     private final Object serializationFutureMux = new Object();
 
     private final ClusterTimeImpl clusterTime;
 
-    MetaStorageRaftGroupEventsListener(
+    private final LogicalTopologyEventListener logicalTopologyEventListener = new MetaStorageLogicalTopologyEventListener();
+
+    private final MetaStorageConfiguration metaStorageConfiguration;
+
+    /**
+     * Leader term if this node is a leader, {@code null} otherwise.
+     *
+     * <p>Multi-threaded access is guarded by {@code serializationFutureMux}.
+     */
+    @Nullable
+    private Long thisNodeTerm = null;
+
+    MetaStorageLeaderElectionListener(
             IgniteSpinBusyLock busyLock,
             ClusterService clusterService,
             LogicalTopologyService logicalTopologyService,
             CompletableFuture<MetaStorageServiceImpl> metaStorageSvcFut,
-            ClusterTimeImpl clusterTime
+            ClusterTimeImpl clusterTime,
+            MetaStorageConfiguration metaStorageConfiguration
     ) {
         this.busyLock = busyLock;
         this.nodeName = clusterService.nodeName();
         this.logicalTopologyService = logicalTopologyService;
         this.metaStorageSvcFut = metaStorageSvcFut;
         this.clusterTime = clusterTime;
+        this.metaStorageConfiguration = metaStorageConfiguration;
     }
 
     @Override
-    public void onLeaderElected(long term) {
+    public void onLeaderElected(ClusterNode node, long term) {
+        System.err.println("FUCK: " + node + " " + term);

Review Comment:
   Sorry, one of my debugging lines slipped in =( 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org