You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by "tkalkirill (via GitHub)" <gi...@apache.org> on 2023/04/06 04:59:24 UTC

[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1869: IGNITE-19028 Implement safe-time propagation for meta-storage raft-group

tkalkirill commented on code in PR #1869:
URL: https://github.com/apache/ignite-3/pull/1869#discussion_r1159259377


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java:
##########
@@ -84,4 +84,8 @@ public interface MetastorageCommandsMessageGroup {
 
     /** Message type for {@link CloseAllCursorsCommand}. */
     short CLOSE_ALL_CURSORS = 64;
+
+    short HYBRID_TS = 65;
+
+    short SYNC_TIME = 66;

Review Comment:
   Missing javadoc



##########
modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java:
##########
@@ -68,10 +67,9 @@ public HybridTimestamp(long physical, int logical) {
      * @param times Times for comparing.
      * @return The highest hybrid timestamp.
      */
-    public static @Nullable HybridTimestamp max(HybridTimestamp... times) {
-        if (times.length == 0) {
-            return null;
-        }
+    public static HybridTimestamp max(HybridTimestamp... times) {
+        assert times != null;
+        assert times.length > 0;

Review Comment:
   Please indicate it (`times.length > 0`) in the documentation and also correct the description of the method itself.
   Or maybe `throw new IllegalArgumentException` ?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java:
##########
@@ -84,4 +84,8 @@ public interface MetastorageCommandsMessageGroup {
 
     /** Message type for {@link CloseAllCursorsCommand}. */
     short CLOSE_ALL_CURSORS = 64;
+
+    short HYBRID_TS = 65;

Review Comment:
   Missing javadoc



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRaftGroupEventsListener.java:
##########
@@ -156,11 +178,16 @@ private void executeIfLeader(OnLeaderAction action) {
     }
 
     private CompletableFuture<Void> executeIfLeaderImpl(OnLeaderAction action) {
+        return executeWithStatus((service, term, isLeader) -> action.apply(service, term));

Review Comment:
   It seems that here it is necessary to return `completedFuture(null)` if not the leader.



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java:
##########
@@ -109,6 +112,8 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
     /** Prevents double stopping of the component. */
     private final AtomicBoolean isStopped = new AtomicBoolean();
 
+    private final ClusterTimeImpl clusterTime;

Review Comment:
   I think we should also stop at `MetaStorageManagerImpl#stop`



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRaftGroupEventsListener.java:
##########
@@ -128,6 +146,10 @@ private interface OnLeaderAction {
         CompletableFuture<Void> apply(MetaStorageServiceImpl service, long term);
     }
 
+    private interface OnStatusAction {

Review Comment:
   Missing `FunctionalInterface`.
   Can get rid of the `OnLeaderAction`?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTime.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.time;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+
+/**
+ * Cluster time with a hybrid clock instance and access to safe time.
+ */
+public interface ClusterTime {
+    /**
+     * Returns current cluster time.
+     *
+     * @return Current cluster time.

Review Comment:
   ```suggestion
   ```



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java:
##########
@@ -258,6 +275,20 @@ public Publisher<Entry> prefix(ByteArray prefix, long revUpperBound) {
         return new CursorPublisher(context, createPrefixCommand);
     }
 
+    /**
+     * Sends safe time sync message. Should be called only on the leader node.
+     *
+     * @param safeTime New safe time.
+     * @return Future that will be completed when message is sent.
+     */
+    public CompletableFuture<Void> syncTime(HybridTimestamp safeTime) {

Review Comment:
   I noticed that you do not use the return value, would it not be a mistake to always send a new safeTime even if we did not process the previous one.



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageLearnerListener.java:
##########
@@ -56,6 +58,11 @@ public void onWrite(Iterator<CommandClosure<WriteCommand>> iter) {
         }
     }
 
+    @Override
+    public void onBeforeApply(Command command) {
+        writeHandler.beforeApply(command);

Review Comment:
   Why before and not on command execution?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.time;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.impl.MetaStorageServiceImpl;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Cluster time implementation with additional methods to adjust time and update safe time.
+ */
+public class ClusterTimeImpl implements ClusterTime {
+    /** The logger. */
+    private static final IgniteLogger LOG = Loggers.forClass(ClusterTimeImpl.class);
+
+    private final IgniteSpinBusyLock busyLock;
+
+    private volatile @Nullable LeaderTimer leaderTimer;
+
+    private final HybridClock clock;
+
+    private final PendingComparableValuesTracker<HybridTimestamp> safeTime;
+
+    /**
+     * Constructor.
+     *
+     * @param busyLock Busy lock.
+     * @param clock Node's hybrid clock.
+     */
+    public ClusterTimeImpl(IgniteSpinBusyLock busyLock, HybridClock clock) {
+        this.busyLock = busyLock;
+        this.clock = clock;
+        this.safeTime = new PendingComparableValuesTracker<>(clock.now());
+    }
+
+    /**
+     * Starts sync time scheduler.
+     *
+     * @param service MetaStorage service that is used by scheduler to sync time.
+     */
+    public void startLeaderTimer(MetaStorageServiceImpl service) {
+        if (!busyLock.enterBusy()) {
+            return;
+        }
+
+        try {
+            assert leaderTimer == null;
+
+            LeaderTimer newTimer = new LeaderTimer(service);
+
+            leaderTimer = newTimer;
+
+            newTimer.start();
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Stops sync time scheduler.
+     */
+    public void stopLeaderTimer() {
+        LeaderTimer timer = leaderTimer;
+
+        assert timer != null;
+
+        timer.stop();
+
+        leaderTimer = null;
+    }
+
+    @Override
+    public HybridTimestamp now() {
+        return clock.now();
+    }
+
+    @Override
+    public CompletableFuture<Void> waitFor(HybridTimestamp time) {
+        return safeTime.waitFor(time);
+    }
+
+    public void updateSafeTime(HybridTimestamp ts) {
+        this.safeTime.update(ts);
+    }
+
+    public void adjust(HybridTimestamp ts) {

Review Comment:
   Little confuse name



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.time;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.impl.MetaStorageServiceImpl;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Cluster time implementation with additional methods to adjust time and update safe time.
+ */
+public class ClusterTimeImpl implements ClusterTime {
+    /** The logger. */
+    private static final IgniteLogger LOG = Loggers.forClass(ClusterTimeImpl.class);
+
+    private final IgniteSpinBusyLock busyLock;
+
+    private volatile @Nullable LeaderTimer leaderTimer;
+
+    private final HybridClock clock;
+
+    private final PendingComparableValuesTracker<HybridTimestamp> safeTime;
+
+    /**
+     * Constructor.
+     *
+     * @param busyLock Busy lock.
+     * @param clock Node's hybrid clock.
+     */
+    public ClusterTimeImpl(IgniteSpinBusyLock busyLock, HybridClock clock) {
+        this.busyLock = busyLock;
+        this.clock = clock;
+        this.safeTime = new PendingComparableValuesTracker<>(clock.now());
+    }
+
+    /**
+     * Starts sync time scheduler.
+     *
+     * @param service MetaStorage service that is used by scheduler to sync time.
+     */
+    public void startLeaderTimer(MetaStorageServiceImpl service) {
+        if (!busyLock.enterBusy()) {
+            return;
+        }
+
+        try {
+            assert leaderTimer == null;
+
+            LeaderTimer newTimer = new LeaderTimer(service);
+
+            leaderTimer = newTimer;
+
+            newTimer.start();
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Stops sync time scheduler.
+     */
+    public void stopLeaderTimer() {
+        LeaderTimer timer = leaderTimer;
+
+        assert timer != null;
+
+        timer.stop();
+
+        leaderTimer = null;
+    }
+
+    @Override
+    public HybridTimestamp now() {
+        return clock.now();
+    }
+
+    @Override
+    public CompletableFuture<Void> waitFor(HybridTimestamp time) {
+        return safeTime.waitFor(time);
+    }
+
+    public void updateSafeTime(HybridTimestamp ts) {
+        this.safeTime.update(ts);
+    }
+
+    public void adjust(HybridTimestamp ts) {
+        this.clock.update(ts);
+    }
+
+    private class LeaderTimer {
+
+        private final MetaStorageServiceImpl service;
+
+        /** Scheduled executor for cluster time sync. */
+        private final ScheduledExecutorService scheduledClusterTimeSyncExecutor =
+                Executors.newScheduledThreadPool(1, new NamedThreadFactory("scheduled-cluster-time-sync-thread", LOG));

Review Comment:
   Please add nodeName



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.time;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.impl.MetaStorageServiceImpl;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Cluster time implementation with additional methods to adjust time and update safe time.
+ */
+public class ClusterTimeImpl implements ClusterTime {
+    /** The logger. */
+    private static final IgniteLogger LOG = Loggers.forClass(ClusterTimeImpl.class);
+
+    private final IgniteSpinBusyLock busyLock;
+
+    private volatile @Nullable LeaderTimer leaderTimer;
+
+    private final HybridClock clock;
+
+    private final PendingComparableValuesTracker<HybridTimestamp> safeTime;
+
+    /**
+     * Constructor.
+     *
+     * @param busyLock Busy lock.
+     * @param clock Node's hybrid clock.
+     */
+    public ClusterTimeImpl(IgniteSpinBusyLock busyLock, HybridClock clock) {
+        this.busyLock = busyLock;
+        this.clock = clock;
+        this.safeTime = new PendingComparableValuesTracker<>(clock.now());
+    }
+
+    /**
+     * Starts sync time scheduler.
+     *
+     * @param service MetaStorage service that is used by scheduler to sync time.
+     */
+    public void startLeaderTimer(MetaStorageServiceImpl service) {
+        if (!busyLock.enterBusy()) {
+            return;
+        }
+
+        try {
+            assert leaderTimer == null;
+
+            LeaderTimer newTimer = new LeaderTimer(service);
+
+            leaderTimer = newTimer;
+
+            newTimer.start();
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Stops sync time scheduler.
+     */
+    public void stopLeaderTimer() {
+        LeaderTimer timer = leaderTimer;
+
+        assert timer != null;
+
+        timer.stop();
+
+        leaderTimer = null;
+    }
+
+    @Override
+    public HybridTimestamp now() {
+        return clock.now();
+    }
+
+    @Override
+    public CompletableFuture<Void> waitFor(HybridTimestamp time) {
+        return safeTime.waitFor(time);
+    }
+
+    public void updateSafeTime(HybridTimestamp ts) {
+        this.safeTime.update(ts);
+    }
+
+    public void adjust(HybridTimestamp ts) {
+        this.clock.update(ts);
+    }
+
+    private class LeaderTimer {

Review Comment:
   Why not separate class ?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java:
##########
@@ -66,6 +76,10 @@ class MetaStorageWriteHandler {
     boolean handleWriteCommand(CommandClosure<WriteCommand> clo) {
         WriteCommand command = clo.command();
 
+        if (command instanceof MetaStorageWriteCommand) {
+            clusterTime.updateSafeTime(((MetaStorageWriteCommand) command).safeTime().asHybridTimestamp());

Review Comment:
   I don't understand why this code.



##########
modules/network-api/src/main/java/org/apache/ignite/network/annotations/WithSetter.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.annotations;
+
+/**
+ * Annotation for the {@link Transferable} class methods. If a method is marked with this annotation,
+ * a setter for the field with the same name as method's will be generated.
+ * In order to have access to this setter via interface one can use default method:
+ * <pre>
+ * {@code  @WithSetter
+ *  HybridTimestampMessage safeTime();
+ *
+ *  default void safeTime(HybridTimestampMessage safeTime) {
+ *       // No-op.
+ *  }
+ * }
+ * </pre>
+ * Note that fields with setters will not be final.
+ */
+public @interface WithSetter {

Review Comment:
   Perhaps it would be more correct to add it in a separate ticket and test it more thoroughly.



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java:
##########
@@ -234,4 +252,16 @@ private static RevisionCondition.Type toRevisionConditionType(ConditionType type
                 throw new IllegalArgumentException("Unexpected revision condition type " + type);
         }
     }
+
+    void beforeApply(Command command) {

Review Comment:
   At the moment, the code is very strange for me and not clear.



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRaftGroupEventsListener.java:
##########
@@ -84,7 +89,20 @@ public void onLeaderElected(long term) {
             registerTopologyEventListeners();
 
             // Update learner configuration (in case we missed some topology updates) and initialize the serialization future.
-            serializationFuture = executeIfLeaderImpl(this::resetLearners);
+            serializationFuture = executeWithStatus((service, term1, isLeader) -> {
+                CompletableFuture<Void> fut;
+                if (isLeader) {
+                    fut = this.resetLearners(service, term1);
+
+                    clusterTime.startLeaderTimer(service);
+                } else {
+                    fut = CompletableFuture.completedFuture(null);

Review Comment:
   ```suggestion
                       fut = completedFuture(null);
   ```



##########
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java:
##########
@@ -375,4 +382,40 @@ void testLearnerLeaveAndJoinBecauseOfNetworkPartition(TestInfo testInfo) throws
 
         assertTrue(waitForCondition(() -> firstNode.getMetaStorageLearners().join().equals(Set.of(secondNode.name())), 10_000));
     }
+
+    /**
+     * Tests that safe time is propagated from the leader to the follower/learner.
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testSafeTimePropagation(boolean useFollower, TestInfo testInfo) throws Exception {

Review Comment:
   There is not enough test to change the leader.



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetaStorageWriteCommand.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.command;
+
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.network.annotations.WithSetter;
+
+/** Base meta storage write command. */
+public interface MetaStorageWriteCommand extends WriteCommand {
+    /**
+     * Returns time on the initiator node.
+     */
+    HybridTimestampMessage initiatorTime();
+
+    /**
+     * This is a dirty hack. This time is set by the leader node to disseminate new safe time across
+     * followers and learners.
+     */
+    @WithSetter
+    HybridTimestampMessage safeTime();

Review Comment:
   While I don’t understand this dirty hack, we have a separate command for time synchronization.
   We need a very detailed description of this hack.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org