Posted to notifications@ignite.apache.org by "rpuch (via GitHub)" <gi...@apache.org> on 2023/04/17 14:53:06 UTC

[GitHub] [ignite-3] rpuch commented on a diff in pull request #1938: IGNITE-19267 Implement local Low Watermark propagation

rpuch commented on code in PR #1938:
URL: https://github.com/apache/ignite-3/pull/1938#discussion_r1168664521


##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/LowWatermarkConfigurationSchema.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema.configuration;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.configuration.annotation.Config;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.configuration.validation.Range;
+
+/**
+ * Low watermark configuration schema.

Review Comment:
   Do we have any additional information to put here? For example, that it relates to GC and that any data below the watermark should be considered non-existent? Or a link to an IEP or something similar?



##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/LowWatermarkConfigurationSchema.java:
##########
@@ -0,0 +1,39 @@
+/**
+ * Low watermark configuration schema.
+ */
+@Config
+public class LowWatermarkConfigurationSchema {
+    /** Data availability time (in milliseconds). */

Review Comment:
   Is it true to say that LWM cannot exceed `now - dataAvailabilityTime`? If so, I think it makes sense to state this invariant here, to make it easier for the reader to understand what is being configured.
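
If the answer is yes, the arithmetic behind the invariant can be sketched as below. This is a simplification: it assumes only the physical part of the timestamp matters and that `maxClockSkew` is non-negative, and plain `long` milliseconds stand in for `HybridTimestamp`. `LowWatermarkInvariant` is a hypothetical helper, not part of this PR.

```java
/**
 * Illustration only (hypothetical helper): the physical-time arithmetic of
 * createNewLowWatermarkCandidate(), with millisecond longs standing in for HybridTimestamp.
 */
final class LowWatermarkInvariant {
    private LowWatermarkInvariant() {
    }

    /** Candidate LWM (physical part): now - dataAvailabilityTime - maxClockSkew. */
    static long candidatePhysicalTime(long nowMillis, long dataAvailabilityTimeMillis, long maxClockSkewMillis) {
        return nowMillis - dataAvailabilityTimeMillis - maxClockSkewMillis;
    }

    /** The invariant under discussion: LWM <= now - dataAvailabilityTime. */
    static boolean holds(long lowWatermarkMillis, long nowMillis, long dataAvailabilityTimeMillis) {
        return lowWatermarkMillis <= nowMillis - dataAvailabilityTimeMillis;
    }
}
```

Because `now` only grows, a candidate computed this way keeps satisfying the invariant at every later point in time, as long as the skew term is non-negative.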



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkManager.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
+import org.apache.ignite.internal.table.distributed.gc.MvGc;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class to manage the low watermark.
+ */
+public class LowWatermarkManager implements ManuallyCloseable {
+    private static final IgniteLogger LOG = Loggers.forClass(LowWatermarkManager.class);
+
+    static final ByteArray LOW_WATERMARK_VAULT_KEY = new ByteArray("low-watermark");
+
+    private final LowWatermarkConfiguration lowWatermarkConfig;
+
+    private final HybridClock clock;
+
+    private final TxManager txManager;
+
+    private final VaultManager vaultManager;
+
+    private final MvGc mvGc;
+
+    private final ScheduledExecutorService scheduledThreadPool;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    private final AtomicReference<HybridTimestamp> lowWatermark = new AtomicReference<>();
+
+    private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture = new AtomicReference<>();
+
+    /**
+     * Constructor.
+     *
+     * @param nodeName Node name.
+     * @param lowWatermarkConfig Low watermark configuration.
+     * @param clock A hybrid logical clock.
+     * @param txManager Transaction manager.
+     * @param vaultManager Vault manager.
+     */
+    public LowWatermarkManager(
+            String nodeName,
+            LowWatermarkConfiguration lowWatermarkConfig,
+            HybridClock clock,
+            TxManager txManager,
+            VaultManager vaultManager,
+            MvGc mvGc
+    ) {
+        this.lowWatermarkConfig = lowWatermarkConfig;
+        this.clock = clock;
+        this.txManager = txManager;
+        this.vaultManager = vaultManager;
+        this.mvGc = mvGc;
+
+        scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(
+                NamedThreadFactory.create(nodeName, "low-watermark-updater", LOG)
+        );
+    }
+
+    /**
+     * Starts the watermark manager.
+     */
+    public void start() {
+        inBusyLock(busyLock, () -> {
+            vaultManager.get(LOW_WATERMARK_VAULT_KEY)
+                    .thenApply(vaultEntry -> inBusyLock(busyLock, () -> {
+                        if (vaultEntry == null) {
+                            scheduleUpdateLowWatermarkBusy();
+
+                            return null;
+                        }
+
+                        HybridTimestamp lowWatermark = ByteUtils.fromBytes(vaultEntry.value());
+
+                        this.lowWatermark.set(lowWatermark);
+
+                        runGcAndScheduleUpdateLowWatermarkBusy(lowWatermark);
+
+                        return lowWatermark;
+                    }))
+                    .whenComplete((lowWatermark, throwable) -> {
+                        if (throwable != null) {
+                            if (!(throwable instanceof NodeStoppingException)) {
+                                LOG.error("Error getting low watermark", throwable);
+
+                                // TODO: IGNITE-16899 Perhaps we need to fail the node by FailureHandler
+                            }
+                        } else {
+                            LOG.info(
+                                    "Low watermark has been successfully got from the vault and is scheduled to be updated: {}",
+                                    lowWatermark
+                            );
+                        }
+                    });
+        });
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (!closeGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        ScheduledFuture<?> lastScheduledTaskFuture = this.lastScheduledTaskFuture.get();
+
+        if (lastScheduledTaskFuture != null) {
+            lastScheduledTaskFuture.cancel(true);
+        }
+
+        IgniteUtils.shutdownAndAwaitTermination(scheduledThreadPool, 10, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Returns the current low watermark, {@code null} means no low watermark has been assigned yet.
+     */
+    public @Nullable HybridTimestamp getLowWatermark() {
+        return lowWatermark.get();
+    }
+
+    void updateLowWatermark() {
+        inBusyLock(busyLock, () -> {
+            HybridTimestamp lowWatermarkCandidate = createNewLowWatermarkCandidate();
+
+            txManager.updateLowerBoundToStartNewReadOnlyTransaction(lowWatermarkCandidate);
+
+            txManager.getFutureAllReadOnlyTransactions(lowWatermarkCandidate)
+                    .thenCompose(unused -> inBusyLock(
+                            busyLock,
+                            () -> vaultManager.put(LOW_WATERMARK_VAULT_KEY, ByteUtils.toBytes(lowWatermarkCandidate)))
+                    )
+                    .thenRun(() -> inBusyLock(busyLock, () -> {
+                        lowWatermark.set(lowWatermarkCandidate);
+
+                        runGcAndScheduleUpdateLowWatermarkBusy(lowWatermarkCandidate);
+                    }))
+                    .whenComplete((unused, throwable) -> {
+                        if (throwable != null) {
+                            if (!(throwable instanceof NodeStoppingException)) {
+                                LOG.error("Failed to update low watermark, will schedule again: {}", throwable, lowWatermarkCandidate);
+
+                                inBusyLock(busyLock, this::scheduleUpdateLowWatermarkBusy);
+                            }
+                        } else {
+                            LOG.info("Successful low watermark update: {}", lowWatermarkCandidate);
+                        }
+                    });
+        });
+    }
+
+    private void runGcAndScheduleUpdateLowWatermarkBusy(HybridTimestamp lowWatermark) {
+        mvGc.updateLowWatermark(lowWatermark);
+
+        scheduleUpdateLowWatermarkBusy();
+    }
+
+    private void scheduleUpdateLowWatermarkBusy() {
+        ScheduledFuture<?> previousScheduledFuture = this.lastScheduledTaskFuture.get();
+
+        assert previousScheduledFuture == null || previousScheduledFuture.isDone() : "previous scheduled task has not finished";
+
+        ScheduledFuture<?> newScheduledFuture = scheduledThreadPool.schedule(
+                this::updateLowWatermark,
+                lowWatermarkConfig.updateFrequency().value(),
+                TimeUnit.MILLISECONDS
+        );
+
+        boolean casResult = lastScheduledTaskFuture.compareAndSet(previousScheduledFuture, newScheduledFuture);

Review Comment:
   Do we really need a CAS here if we know that a single-threaded executor is used to update it? It seems that a plain volatile field would be enough if we drop the CAS.
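
A minimal sketch of the volatile-field variant, assuming (as the comment does) that scheduling calls never race with each other. `PeriodicUpdater` is a hypothetical reduction of the scheduling part of `LowWatermarkManager`, not code from the PR.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical reduction of LowWatermarkManager's scheduling, using a volatile field instead of a CAS. */
final class PeriodicUpdater {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    private final Runnable update;

    private final long delayMillis;

    /** Assumed to be written by one caller at a time; volatile makes the latest value visible to close(). */
    private volatile ScheduledFuture<?> lastScheduledTaskFuture;

    PeriodicUpdater(Runnable update, long delayMillis) {
        this.update = update;
        this.delayMillis = delayMillis;
    }

    void scheduleUpdate() {
        ScheduledFuture<?> previous = lastScheduledTaskFuture;

        assert previous == null || previous.isDone() : "previous scheduled task has not finished";

        // Plain volatile write: no compare-and-set, because nobody else races on this field.
        lastScheduledTaskFuture = executor.schedule(update, delayMillis, TimeUnit.MILLISECONDS);
    }

    /** Cancels the pending task and returns whether the executor terminated in time. */
    boolean close() {
        ScheduledFuture<?> last = lastScheduledTaskFuture;

        if (last != null) {
            last.cancel(true);
        }

        executor.shutdown();

        try {
            return executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The volatile write is sufficient as long as calls to `scheduleUpdate()` are sequenced one after another; it does not detect two concurrent callers, which is the situation the CAS-plus-assert in the diff would catch.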



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkManager.java:
##########
@@ -0,0 +1,234 @@
+    private void scheduleUpdateLowWatermarkBusy() {
+        ScheduledFuture<?> previousScheduledFuture = this.lastScheduledTaskFuture.get();
+
+        assert previousScheduledFuture == null || previousScheduledFuture.isDone() : "previous scheduled task has not finished";
+
+        ScheduledFuture<?> newScheduledFuture = scheduledThreadPool.schedule(
+                this::updateLowWatermark,
+                lowWatermarkConfig.updateFrequency().value(),
+                TimeUnit.MILLISECONDS
+        );
+
+        boolean casResult = lastScheduledTaskFuture.compareAndSet(previousScheduledFuture, newScheduledFuture);
+
+        assert casResult : "only one scheduled task is expected";
+    }
+
+    HybridTimestamp createNewLowWatermarkCandidate() {
+        HybridTimestamp now = clock.now();
+
+        long newPhysicalTime = now.getPhysical() - lowWatermarkConfig.dataAvailabilityTime().value() - getMaxClockSkew();
+
+        HybridTimestamp lowWatermarkCandidate = new HybridTimestamp(newPhysicalTime, now.getLogical());

Review Comment:
   How about adding a method for adding physical time (like `addPhysicalTime(long millis)`) to the `HybridTimestamp` class? It would be useful for schema sync as well.
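
A sketch of what such helpers might look like. `SimpleHybridTimestamp` is a hypothetical, simplified stand-in (physical milliseconds plus a logical counter), not the real Ignite class, and `subtractPhysicalTime` is an assumed companion method. Like the code in the diff, it keeps the logical part unchanged.

```java
/** Hypothetical, simplified stand-in for HybridTimestamp, showing the suggested helpers. */
final class SimpleHybridTimestamp {
    private final long physical;

    private final int logical;

    SimpleHybridTimestamp(long physical, int logical) {
        this.physical = physical;
        this.logical = logical;
    }

    long getPhysical() {
        return physical;
    }

    int getLogical() {
        return logical;
    }

    /** Returns a new timestamp shifted by {@code millis} of physical time; the logical part is kept. */
    SimpleHybridTimestamp addPhysicalTime(long millis) {
        // addExact turns a silent overflow into an ArithmeticException.
        return new SimpleHybridTimestamp(Math.addExact(physical, millis), logical);
    }

    /** Assumed counterpart for the low watermark use case. */
    SimpleHybridTimestamp subtractPhysicalTime(long millis) {
        return new SimpleHybridTimestamp(Math.subtractExact(physical, millis), logical);
    }
}
```

With a helper like this, the candidate computation could read `now.subtractPhysicalTime(dataAvailabilityTime + maxClockSkew)` instead of building the timestamp by hand.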



##########
modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java:
##########
@@ -277,12 +277,19 @@ public int finished() {
 
         @Override
         public void start() {
+        }
 
+        @Override
+        public void stop() {
         }
 
         @Override
-        public void stop() throws Exception {
+        public void updateLowerBoundToStartNewReadOnlyTransaction(@Nullable HybridTimestamp lowerBound) {
+        }
 
+        @Override
+        public CompletableFuture<Void> getFutureAllReadOnlyTransactions(HybridTimestamp timestamp) {
+            return null;

Review Comment:
   Let's return `completedFuture(null)`, since `null` breaks the contract of the method. It's OK now, but in the future something might change and the method might get called even in tests.
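
The suggested stub, sketched against a hypothetical one-method slice of `TxManager` (a `long` stands in for `HybridTimestamp` to keep it self-contained):

```java
import static java.util.concurrent.CompletableFuture.completedFuture;

import java.util.concurrent.CompletableFuture;

/** Hypothetical one-method slice of TxManager; a long stands in for HybridTimestamp. */
interface ReadOnlyTxAwaiter {
    CompletableFuture<Void> getFutureAllReadOnlyTransactions(long timestamp);
}

/** Test stub: there is nothing to wait for, but the "never null" contract is still honored. */
final class NoOpReadOnlyTxAwaiter implements ReadOnlyTxAwaiter {
    @Override
    public CompletableFuture<Void> getFutureAllReadOnlyTransactions(long timestamp) {
        return completedFuture(null);
    }
}
```

A caller that chains on the result (as `LowWatermarkManager.updateLowWatermark()` does with `thenCompose`) keeps working with a completed future, whereas `null` would fail with a `NullPointerException`.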



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java:
##########
@@ -124,4 +128,20 @@ CompletableFuture<Void> cleanup(
      */
     @TestOnly
     int finished();
+
+    /**
+     * Updates the lower bound (exclusive) of the timestamp to start new read-only transactions.
+     *
+     * <p>All new read-only transactions will have to start strictly greater than this lower bound.
+     *
+     * @param lowerBound New lower bound, {@code null} if there is no lower bound.
+     */
+    void updateLowerBoundToStartNewReadOnlyTransaction(@Nullable HybridTimestamp lowerBound);

Review Comment:
   Why does the parameter need to be nullable? `LowWatermarkManager` always passes a non-null value here.
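
If the parameter becomes non-nullable, "no lower bound yet" can still be represented internally, for example with a sentinel. A minimal sketch, assuming a `long` stands in for `HybridTimestamp` and that ignoring a smaller (stale) bound is acceptable; `ReadOnlyTxLowerBound` and `canStartReadOnlyTransaction` are hypothetical names. The exclusive check follows the Javadoc above.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical holder for the exclusive lower bound of new read-only transaction timestamps. */
final class ReadOnlyTxLowerBound {
    /** Long.MIN_VALUE is the sentinel for "no lower bound yet", so callers never pass null. */
    private final AtomicLong lowerBound = new AtomicLong(Long.MIN_VALUE);

    void updateLowerBoundToStartNewReadOnlyTransaction(long newLowerBound) {
        // Assumption: the bound never moves backwards, so a stale (smaller) update is ignored.
        lowerBound.accumulateAndGet(newLowerBound, Math::max);
    }

    /** The bound is exclusive: a new read-only transaction must start strictly above it. */
    boolean canStartReadOnlyTransaction(long readTimestamp) {
        return readTimestamp > lowerBound.get();
    }
}
```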



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java:
##########
@@ -124,4 +128,20 @@ CompletableFuture<Void> cleanup(
+
+    /**
+     * Returns the future of all read-only transactions up to the timestamp.
+     *
+     * @param timestamp Timestamp.
+     */
+    CompletableFuture<Void> getFutureAllReadOnlyTransactions(HybridTimestamp timestamp);

Review Comment:
   I suggest changing the name to make it clear that we are waiting until all RO transactions with timestamps earlier than or equal to the given timestamp finish, e.g. `roTxnsStartedBeforeLowWatermark()` or something better.
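
Whatever name is chosen, the semantics described here can be sketched as follows. The registry, its method names, and the use of `long` instead of `HybridTimestamp` are all hypothetical. The method only sees transactions that are already registered, which is why raising the lower bound first (as `updateLowWatermark()` does before waiting) matters: it keeps new transactions from starting at or below the candidate, assuming registration cannot interleave with the bound update.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical registry of running read-only transactions, keyed by an internal id. */
final class ReadOnlyTxRegistry {
    private static final class TxEntry {
        final long readTimestamp;

        final CompletableFuture<Void> finished = new CompletableFuture<>();

        TxEntry(long readTimestamp) {
            this.readTimestamp = readTimestamp;
        }
    }

    private final Map<Long, TxEntry> activeTxns = new ConcurrentHashMap<>();

    private final AtomicLong idGenerator = new AtomicLong();

    /** Registers a read-only transaction and returns its id. */
    long start(long readTimestamp) {
        long txId = idGenerator.incrementAndGet();
        activeTxns.put(txId, new TxEntry(readTimestamp));
        return txId;
    }

    /** Unregisters the transaction and completes its future; repeated calls are no-ops. */
    void finish(long txId) {
        TxEntry entry = activeTxns.remove(txId);
        if (entry != null) {
            entry.finished.complete(null);
        }
    }

    /** Completes once every currently running RO transaction with readTimestamp <= timestamp has finished. */
    CompletableFuture<Void> roTxnsStartedAtOrBefore(long timestamp) {
        CompletableFuture<?>[] pending = activeTxns.values().stream()
                .filter(entry -> entry.readTimestamp <= timestamp)
                .map(entry -> entry.finished)
                .toArray(CompletableFuture[]::new);

        // allOf() over an empty array is already completed.
        return CompletableFuture.allOf(pending);
    }
}
```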



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkManager.java:
##########
@@ -0,0 +1,234 @@
+    void updateLowWatermark() {
+        inBusyLock(busyLock, () -> {

Review Comment:
   It seems to make sense to create an instance method in this class, named `inBusyLock`, that accepts just one argument (the closure) and delegates to `IgniteUtils.inBusyLock(busyLock, closure)`. This would allow us to write `inBusyLock(() -> blabla)` instead of `inBusyLock(busyLock, () -> blabla)`, avoiding the ceremony of mentioning the `busyLock` variable again and again. After all, there is just one busy lock per instance.
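
The helper could look roughly like this. `SimpleBusyLock` is a hypothetical stand-in for `IgniteSpinBusyLock` (only the assumed `enterBusy()`, `leaveBusy()` and `block()` operations are modeled), and `IllegalStateException` replaces whatever stop exception the real `IgniteUtils.inBusyLock` throws.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical stand-in for a busy lock: enterBusy() fails once block() has been called. */
final class SimpleBusyLock {
    private final AtomicBoolean blocked = new AtomicBoolean();

    private final AtomicInteger busyCount = new AtomicInteger();

    boolean enterBusy() {
        busyCount.incrementAndGet();
        if (blocked.get()) {
            busyCount.decrementAndGet();
            return false;
        }
        return true;
    }

    void leaveBusy() {
        busyCount.decrementAndGet();
    }

    /** Rejects new entrants and waits for those already inside to leave. */
    void block() {
        blocked.set(true);
        while (busyCount.get() > 0) {
            Thread.onSpinWait();
        }
    }
}

/** Sketch of a component that owns exactly one busy lock and exposes one-argument helpers. */
class BusyComponent {
    private final SimpleBusyLock busyLock = new SimpleBusyLock();

    /** Runs a side-effect-only closure under this instance's busy lock. */
    void inBusyLock(Runnable action) {
        if (!busyLock.enterBusy()) {
            throw new IllegalStateException("Component is stopping");
        }
        try {
            action.run();
        } finally {
            busyLock.leaveBusy();
        }
    }

    /** Runs a value-returning closure under this instance's busy lock. */
    <T> T inBusyLock(Supplier<T> supplier) {
        if (!busyLock.enterBusy()) {
            throw new IllegalStateException("Component is stopping");
        }
        try {
            return supplier.get();
        } finally {
            busyLock.leaveBusy();
        }
    }

    void close() {
        busyLock.block();
    }
}
```

The two overloads mirror the two closure shapes used in the diff: blocks that only have side effects, and blocks that return a value (such as the timestamp returned from the vault callback in `start()`).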



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java:
##########
@@ -17,85 +17,84 @@
 
 package org.apache.ignite.internal.tx.impl;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.network.ClusterNode;
-import org.jetbrains.annotations.NotNull;
 
 /**
  * The read-only implementation of an internal transaction.
  */
-public class ReadOnlyTransactionImpl extends IgniteAbstractTransactionImpl {
+class ReadOnlyTransactionImpl extends IgniteAbstractTransactionImpl {
     /** The read timestamp. */
     private final HybridTimestamp readTimestamp;
 
+    /** Prevents double finish of the transaction. */
+    private final AtomicBoolean finishGuard = new AtomicBoolean();
+
     /**
      * The constructor.
      *
      * @param txManager The tx manager.
      * @param id The id.
      * @param readTimestamp The read timestamp.
      */
-    public ReadOnlyTransactionImpl(
-            TxManager txManager,
-            @NotNull UUID id,
-            HybridTimestamp readTimestamp
-    ) {
+    ReadOnlyTransactionImpl(TxManagerImpl txManager, UUID id, HybridTimestamp readTimestamp) {
         super(txManager, id);
+
         this.readTimestamp = readTimestamp;
     }
 
-    /** {@inheritDoc} */
     @Override
     public boolean isReadOnly() {
         return true;
     }
 
-    /** {@inheritDoc} */
     @Override
     public HybridTimestamp readTimestamp() {
         return readTimestamp;
     }
 
-    /** {@inheritDoc} */
     @Override
     public IgniteBiTuple<ClusterNode, Long> enlist(ReplicationGroupId replicationGroupId, IgniteBiTuple<ClusterNode, Long> nodeAndTerm) {
         // TODO: IGNITE-17666 Close cursor tx finish and do it on the first finish invocation only.
         return null;
     }
 
-    /** {@inheritDoc} */
     @Override
     public IgniteBiTuple<ClusterNode, Long> enlistedNodeAndTerm(ReplicationGroupId replicationGroupId) {
         return null;
     }
 
-    /** {@inheritDoc} */
     @Override
     public boolean assignCommitPartition(ReplicationGroupId replicationGroupId) {
         return true;
     }
 
-    /** {@inheritDoc} */
     @Override
     public ReplicationGroupId commitPartition() {
         return null;
     }
 
-    /** {@inheritDoc} */
     @Override
     public void enlistResultFuture(CompletableFuture<?> resultFuture) {
         // No-op.
     }
 
-    /** {@inheritDoc} */
     @Override
+    // TODO: IGNITE-17666 Close cursor tx finish and do it on the first finish invocation only.
     protected CompletableFuture<Void> finish(boolean commit) {
-        // TODO: IGNITE-17666 Close cursor tx finish and do it on the first finish invocation only.
-        return CompletableFuture.completedFuture(null);
+        if (!finishGuard.compareAndSet(false, true)) {
+            return completedFuture(null);
+        }
+
+        ((TxManagerImpl) txManager).completeReadOnlyTransactionFuture(readTimestamp);

Review Comment:
   Once users are able to explicitly specify a read TS for an RO TX, there may be more than one RO TX with the same read TS, so passing just the TS does not seem to be enough.
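   A sketch of one way to track RO transactions so that several of them can share a read TS: key each entry by (read TS, transaction id) and complete it by both. `ReadOnlyTxTracker`, `TxKey` and the `long` used in place of `HybridTimestamp` are hypothetical, not the PR's API.

   ```java
   import java.util.Comparator;
   import java.util.UUID;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentSkipListMap;

   /** Tracks RO txs by (read TS, tx id), so txs sharing a read TS are completed independently. */
   final class ReadOnlyTxTracker {
       /** Hypothetical key; the read timestamp is simplified to a long. A null id is a range sentinel. */
       static final class TxKey {
           final long readTs;
           final UUID txId;

           TxKey(long readTs, UUID txId) {
               this.readTs = readTs;
               this.txId = txId;
           }
       }

       /** Orders by read TS, then by tx id; a null id sorts after every real id with the same read TS. */
       private static final Comparator<TxKey> ORDER = Comparator.<TxKey>comparingLong(k -> k.readTs)
               .thenComparing(k -> k.txId, Comparator.nullsLast(Comparator.<UUID>naturalOrder()));

       private final ConcurrentSkipListMap<TxKey, CompletableFuture<Void>> futures =
               new ConcurrentSkipListMap<>(ORDER);

       void register(long readTs, UUID txId) {
           futures.put(new TxKey(readTs, txId), new CompletableFuture<>());
       }

       /** Completes exactly one transaction, identified by both its read TS and its id. */
       void complete(long readTs, UUID txId) {
           CompletableFuture<Void> future = futures.remove(new TxKey(readTs, txId));
           if (future != null) {
               future.complete(null);
           }
       }

       /** Completes when every RO tx registered so far with readTs <= bound has finished. */
       CompletableFuture<Void> allUpTo(long bound) {
           CompletableFuture<?>[] pending = futures.headMap(new TxKey(bound, null), true)
                   .values()
                   .toArray(new CompletableFuture<?>[0]);
           return CompletableFuture.allOf(pending);
       }
   }
   ```

   The `null` id sentinel, ordered last by `nullsLast`, lets `headMap` include every transaction at the bound without relying on a maximal UUID value.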



##########
modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java:
##########
@@ -108,4 +125,49 @@ public void testId() throws InterruptedException {
         assertTrue(txId3.compareTo(txId2) > 0);
         assertTrue(txId4.compareTo(txId3) > 0);
     }
+
+    @Test
+    void testUpdateLowerBoundToStartNewReadOnlyTransaction() {
+        when(clock.now()).thenReturn(new HybridTimestamp(10, 10));
+
+        txManager.updateLowerBoundToStartNewReadOnlyTransaction(new HybridTimestamp(10, 11));
+
+        IgniteInternalException exception = assertThrows(IgniteInternalException.class, () -> txManager.begin(true));
+
+        assertEquals(Transactions.TX_READ_ONLY_CREATING_ERR, exception.code());
+
+        // Let's check the removed lower bound.
+        txManager.updateLowerBoundToStartNewReadOnlyTransaction(null);
+
+        assertDoesNotThrow(() -> txManager.begin(true));
+    }
+
+    @Test
+    void testGetFutureReadOnlyTransactions() {
+        // Let's check the absence of transactions.
+        assertThat(txManager.getFutureAllReadOnlyTransactions(clock.now()), willSucceedFast());
+
+        InternalTransaction rwTx0 = txManager.begin(false);
+
+        InternalTransaction roTx0 = txManager.begin(true);
+        InternalTransaction roTx1 = txManager.begin(true);
+
+        CompletableFuture<Void> readOnlyTxsFutures = txManager.getFutureAllReadOnlyTransactions(roTx1.readTimestamp());

Review Comment:
   ```suggestion
           CompletableFuture<Void> readOnlyTxsFuture = txManager.getFutureAllReadOnlyTransactions(roTx1.readTimestamp());
   ```



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java:
##########
@@ -74,27 +86,45 @@ public TxManagerImpl(ReplicaService replicaService, LockManager lockManager, Hyb
         this.clock = clock;
     }
 
-    /** {@inheritDoc} */
     @Override
     public InternalTransaction begin() {
         return begin(false);
     }
 
-    /** {@inheritDoc} */
     @Override
     public InternalTransaction begin(boolean readOnly) {
         UUID txId = Timestamp.nextVersion().toUuid();
 
-        return readOnly ? new ReadOnlyTransactionImpl(this, txId, clock.now()) : new ReadWriteTransactionImpl(this, txId);
+        if (!readOnly) {
+            return new ReadWriteTransactionImpl(this, txId);
+        }
+
+        HybridTimestamp readTimestamp = clock.now();
+
+        readOnlyTxFutureByReadTs.compute(readTimestamp, (timestamp, readOnlyTxFuture) -> {
+            assert readOnlyTxFuture == null : "previous transaction has not completed yet: " + readTimestamp;
+
+            HybridTimestamp lowerBound = lowerBoundTsToStartNewReadOnlyTx.get();
+
+            if (lowerBound != null && readTimestamp.compareTo(lowerBound) <= 0) {
+                throw new IgniteInternalException(
+                        TX_READ_ONLY_CREATING_ERR,
+                        "Timestamp read-only transaction must be greater than the lower bound: [txTimestamp={}, lowerBound={}]",
+                        readTimestamp, lowerBound
+                );

Review Comment:
   Consider the following scenario:
   
   1. Thread A starts an RO TX. It is inside `readOnlyTxFutureByReadTs.compute()`, has already read the old lower bound and successfully validated the read TS (T1) against it, but has not returned from the closure yet.
   2. Thread B updates the lower bound to a higher value, for which T1 is no longer valid.
   3. Thread A then finishes adding the new RO TX, which is already too old.
   
   Another scenario: thread A begins adding a transaction with read TS T1; thread B then calls `getFutureAllReadOnlyTransactions()` for T2 > T1 and does not see the T1 transaction yet; then A completes the addition. The T1 transaction may run longer than the transactions B did see, so the future returned by `getFutureAllReadOnlyTransactions()` may complete before the T1 transaction finishes, which seems to break the contract.
   
   We probably need synchronization that coordinates updates of both the future map and the lower bound (and also the reads of the map that are used to build the compound future).
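   One possible shape of that synchronization, assuming a read-write lock is acceptable here: starting an RO TX (lower bound check plus registration) takes the read lock, while raising the lower bound together with snapshotting the transactions to wait for takes the write lock. Class and method names are hypothetical, timestamps are simplified to `long`, and transactions are keyed by id so that several can share a read TS.

   ```java
   import java.util.Map;
   import java.util.UUID;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.locks.ReadWriteLock;
   import java.util.concurrent.locks.ReentrantReadWriteLock;

   /** Makes "check bound + register tx" and "raise bound + snapshot txs" mutually atomic. */
   final class CoordinatedReadOnlyTxRegistry {
       private static final class TxState {
           final long readTs;
           final CompletableFuture<Void> finished = new CompletableFuture<>();

           TxState(long readTs) {
               this.readTs = readTs;
           }
       }

       private final ReadWriteLock lock = new ReentrantReadWriteLock();

       private final Map<UUID, TxState> activeTxs = new ConcurrentHashMap<>();

       /** Exclusive lower bound for new read timestamps; guarded by {@code lock}. */
       private long lowerBound = Long.MIN_VALUE;

       /** Starts may run concurrently with each other, but never with a lower bound update. */
       void begin(UUID txId, long readTs) {
           lock.readLock().lock();
           try {
               if (readTs <= lowerBound) {
                   throw new IllegalStateException("Read TS " + readTs + " is not later than the lower bound " + lowerBound);
               }
               activeTxs.put(txId, new TxState(readTs));
           } finally {
               lock.readLock().unlock();
           }
       }

       void finish(UUID txId) {
           TxState state = activeTxs.remove(txId);
           if (state != null) {
               state.finished.complete(null);
           }
       }

       /** Raises the bound and, under the same exclusive lock, snapshots every tx reading at or below it. */
       CompletableFuture<Void> raiseLowerBoundAndAwait(long newLowerBound) {
           lock.writeLock().lock();
           try {
               lowerBound = Math.max(lowerBound, newLowerBound);
               CompletableFuture<?>[] pending = activeTxs.values().stream()
                       .filter(state -> state.readTs <= newLowerBound)
                       .map(state -> state.finished)
                       .toArray(CompletableFuture<?>[]::new);
               return CompletableFuture.allOf(pending);
           } finally {
               lock.writeLock().unlock();
           }
       }
   }
   ```

   Starts still run concurrently with one another; only lower bound updates are exclusive. A start either completes before the snapshot (and is waited for) or runs after it and is rejected if its read TS is not later than the new bound, which covers both scenarios above.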



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkManager.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
+import org.apache.ignite.internal.table.distributed.gc.MvGc;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class to manage the low watermark.
+ */
+public class LowWatermarkManager implements ManuallyCloseable {

Review Comment:
   Our 'managers' are usually `IgniteComponent`s, not `ManuallyCloseable`s. Is there a specific reason to prefer the latter here?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkManager.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
+import org.apache.ignite.internal.table.distributed.gc.MvGc;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class to manage the low watermark.
+ */
+public class LowWatermarkManager implements ManuallyCloseable {
+    private static final IgniteLogger LOG = Loggers.forClass(LowWatermarkManager.class);
+
+    static final ByteArray LOW_WATERMARK_VAULT_KEY = new ByteArray("low-watermark");
+
+    private final LowWatermarkConfiguration lowWatermarkConfig;
+
+    private final HybridClock clock;
+
+    private final TxManager txManager;
+
+    private final VaultManager vaultManager;
+
+    private final MvGc mvGc;
+
+    private final ScheduledExecutorService scheduledThreadPool;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    private final AtomicReference<HybridTimestamp> lowWatermark = new AtomicReference<>();
+
+    private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture = new AtomicReference<>();
+
+    /**
+     * Constructor.
+     *
+     * @param nodeName Node name.
+     * @param lowWatermarkConfig Low watermark configuration.
+     * @param clock A hybrid logical clock.
+     * @param txManager Transaction manager.
+     * @param vaultManager Vault manager.
+     */
+    public LowWatermarkManager(
+            String nodeName,
+            LowWatermarkConfiguration lowWatermarkConfig,
+            HybridClock clock,
+            TxManager txManager,
+            VaultManager vaultManager,
+            MvGc mvGc
+    ) {
+        this.lowWatermarkConfig = lowWatermarkConfig;
+        this.clock = clock;
+        this.txManager = txManager;
+        this.vaultManager = vaultManager;
+        this.mvGc = mvGc;
+
+        scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(
+                NamedThreadFactory.create(nodeName, "low-watermark-updater", LOG)
+        );
+    }
+
+    /**
+     * Starts the watermark manager.
+     */
+    public void start() {
+        inBusyLock(busyLock, () -> {
+            vaultManager.get(LOW_WATERMARK_VAULT_KEY)
+                    .thenApply(vaultEntry -> inBusyLock(busyLock, () -> {
+                        if (vaultEntry == null) {
+                            scheduleUpdateLowWatermarkBusy();
+
+                            return null;
+                        }
+
+                        HybridTimestamp lowWatermark = ByteUtils.fromBytes(vaultEntry.value());
+
+                        this.lowWatermark.set(lowWatermark);
+
+                        runGcAndScheduleUpdateLowWatermarkBusy(lowWatermark);
+
+                        return lowWatermark;
+                    }))
+                    .whenComplete((lowWatermark, throwable) -> {
+                        if (throwable != null) {
+                            if (!(throwable instanceof NodeStoppingException)) {
+                                LOG.error("Error getting low watermark", throwable);
+
+                                // TODO: IGNITE-16899 Perhaps we need to fail the node by FailureHandler
+                            }
+                        } else {
+                            LOG.info(
+                                    "Low watermark has been successfully got from the vault and is scheduled to be updated: {}",
+                                    lowWatermark
+                            );
+                        }
+                    });
+        });
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (!closeGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        ScheduledFuture<?> lastScheduledTaskFuture = this.lastScheduledTaskFuture.get();
+
+        if (lastScheduledTaskFuture != null) {
+            lastScheduledTaskFuture.cancel(true);
+        }
+
+        IgniteUtils.shutdownAndAwaitTermination(scheduledThreadPool, 10, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Returns the current low watermark, {@code null} means no low watermark has been assigned yet.
+     */
+    public @Nullable HybridTimestamp getLowWatermark() {
+        return lowWatermark.get();
+    }
+
+    void updateLowWatermark() {
+        inBusyLock(busyLock, () -> {
+            HybridTimestamp lowWatermarkCandidate = createNewLowWatermarkCandidate();
+
+            txManager.updateLowerBoundToStartNewReadOnlyTransaction(lowWatermarkCandidate);
+
+            txManager.getFutureAllReadOnlyTransactions(lowWatermarkCandidate)
+                    .thenCompose(unused -> inBusyLock(

Review Comment:
   How about adding a comment saying that at this point the candidate is promoted to the new LWM? Otherwise, it is a bit puzzling why a candidate is written to the Vault as the new LWM.
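   A sketch of where such a comment could go. The `LongFunction` parameters are hypothetical stand-ins for waiting on RO transactions and for the Vault write, timestamps are simplified to `long`, and the order used here (persist first, then publish) is an assumption rather than necessarily what the PR does.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.atomic.AtomicReference;
   import java.util.function.LongFunction;

   final class LowWatermarkPromotionSketch {
       private final AtomicReference<Long> lowWatermark = new AtomicReference<>();

       /**
        * @param candidate New low watermark candidate.
        * @param awaitReadOnlyTxsUpTo Hypothetical: completes when RO txs with read TS <= candidate finish.
        * @param persist Hypothetical: writes the value to durable storage such as the Vault.
        */
       CompletableFuture<Void> update(
               long candidate,
               LongFunction<CompletableFuture<Void>> awaitReadOnlyTxsUpTo,
               LongFunction<CompletableFuture<Void>> persist
       ) {
           return awaitReadOnlyTxsUpTo.apply(candidate)
                   // Every RO tx that could read at or below the candidate has finished, so from here on
                   // the candidate is promoted to the new low watermark: persist it, then publish it.
                   .thenCompose(unused -> persist.apply(candidate))
                   .thenRun(() -> lowWatermark.set(candidate));
       }

       Long lowWatermark() {
           return lowWatermark.get();
       }
   }
   ```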



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkManager.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
+import org.apache.ignite.internal.table.distributed.gc.MvGc;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class to manage the low watermark.
+ */
+public class LowWatermarkManager implements ManuallyCloseable {
+    private static final IgniteLogger LOG = Loggers.forClass(LowWatermarkManager.class);
+
+    static final ByteArray LOW_WATERMARK_VAULT_KEY = new ByteArray("low-watermark");
+
+    private final LowWatermarkConfiguration lowWatermarkConfig;
+
+    private final HybridClock clock;
+
+    private final TxManager txManager;
+
+    private final VaultManager vaultManager;
+
+    private final MvGc mvGc;
+
+    private final ScheduledExecutorService scheduledThreadPool;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    private final AtomicReference<HybridTimestamp> lowWatermark = new AtomicReference<>();
+
+    private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture = new AtomicReference<>();
+
+    /**
+     * Constructor.
+     *
+     * @param nodeName Node name.
+     * @param lowWatermarkConfig Low watermark configuration.
+     * @param clock A hybrid logical clock.
+     * @param txManager Transaction manager.
+     * @param vaultManager Vault manager.
+     */
+    public LowWatermarkManager(
+            String nodeName,
+            LowWatermarkConfiguration lowWatermarkConfig,
+            HybridClock clock,
+            TxManager txManager,
+            VaultManager vaultManager,
+            MvGc mvGc
+    ) {
+        this.lowWatermarkConfig = lowWatermarkConfig;
+        this.clock = clock;
+        this.txManager = txManager;
+        this.vaultManager = vaultManager;
+        this.mvGc = mvGc;
+
+        scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(
+                NamedThreadFactory.create(nodeName, "low-watermark-updater", LOG)
+        );
+    }
+
+    /**
+     * Starts the watermark manager.
+     */
+    public void start() {
+        inBusyLock(busyLock, () -> {
+            vaultManager.get(LOW_WATERMARK_VAULT_KEY)
+                    .thenApply(vaultEntry -> inBusyLock(busyLock, () -> {
+                        if (vaultEntry == null) {
+                            scheduleUpdateLowWatermarkBusy();
+
+                            return null;
+                        }
+
+                        HybridTimestamp lowWatermark = ByteUtils.fromBytes(vaultEntry.value());
+
+                        this.lowWatermark.set(lowWatermark);
+
+                        runGcAndScheduleUpdateLowWatermarkBusy(lowWatermark);
+
+                        return lowWatermark;
+                    }))
+                    .whenComplete((lowWatermark, throwable) -> {
+                        if (throwable != null) {
+                            if (!(throwable instanceof NodeStoppingException)) {
+                                LOG.error("Error getting low watermark", throwable);
+
+                                // TODO: IGNITE-16899 Perhaps we need to fail the node by FailureHandler

Review Comment:
   Since we don't fail the node yet, should we schedule another attempt to update the watermark? It would be puzzling if something failed during start in tests and then nothing got updated.
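   A minimal sketch of retrying instead of giving up, assuming a fixed delay and a bounded number of attempts. `RetryingStartSketch` and `StoppingException` (a stand-in for `NodeStoppingException`) are hypothetical. The sketch also unwraps `CompletionException`, because a failure thrown inside a dependent stage such as `thenApply` reaches `whenComplete` wrapped in one.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CompletionException;
   import java.util.concurrent.RejectedExecutionException;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;
   import java.util.function.Supplier;

   /** Retries a failed start-up step with a delay instead of silently giving up. */
   final class RetryingStartSketch {
       /** Hypothetical stand-in for NodeStoppingException: such failures are never retried. */
       static final class StoppingException extends RuntimeException {
           StoppingException(String message) {
               super(message);
           }
       }

       private final ScheduledExecutorService scheduler;
       private final long retryDelayMillis;
       private final int maxAttempts;

       RetryingStartSketch(ScheduledExecutorService scheduler, long retryDelayMillis, int maxAttempts) {
           this.scheduler = scheduler;
           this.retryDelayMillis = retryDelayMillis;
           this.maxAttempts = maxAttempts;
       }

       CompletableFuture<Long> start(Supplier<CompletableFuture<Long>> readLowWatermark) {
           CompletableFuture<Long> result = new CompletableFuture<>();
           attempt(readLowWatermark, 1, result);
           return result;
       }

       private void attempt(Supplier<CompletableFuture<Long>> read, int attemptNo, CompletableFuture<Long> result) {
           read.get().whenComplete((value, error) -> {
               if (error == null) {
                   result.complete(value);
                   return;
               }
               // Failures raised in dependent stages arrive wrapped in a CompletionException.
               Throwable cause = error instanceof CompletionException && error.getCause() != null ? error.getCause() : error;
               if (cause instanceof StoppingException || attemptNo >= maxAttempts) {
                   result.completeExceptionally(cause);
                   return;
               }
               try {
                   scheduler.schedule(() -> attempt(read, attemptNo + 1, result), retryDelayMillis, TimeUnit.MILLISECONDS);
               } catch (RejectedExecutionException e) {
                   // The scheduler was shut down, e.g. because the component is being closed.
                   result.completeExceptionally(cause);
               }
           });
       }
   }
   ```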



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java:
##########
@@ -124,4 +128,20 @@ CompletableFuture<Void> cleanup(
      */
     @TestOnly
     int finished();
+
+    /**
+     * Updates the lower bound (exclusive) of the timestamp to start new read-only transactions.
+     *
+     * <p>All new read-only transactions will have to start strictly greater this lower bound.

Review Comment:
   ```suggestion
        * <p>All new read-only transactions will have to start with timestamps strictly later than this lower bound.
   ```



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java:
##########
@@ -74,27 +86,45 @@ public TxManagerImpl(ReplicaService replicaService, LockManager lockManager, Hyb
         this.clock = clock;
     }
 
-    /** {@inheritDoc} */
     @Override
     public InternalTransaction begin() {
         return begin(false);
     }
 
-    /** {@inheritDoc} */
     @Override
     public InternalTransaction begin(boolean readOnly) {
         UUID txId = Timestamp.nextVersion().toUuid();
 
-        return readOnly ? new ReadOnlyTransactionImpl(this, txId, clock.now()) : new ReadWriteTransactionImpl(this, txId);
+        if (!readOnly) {
+            return new ReadWriteTransactionImpl(this, txId);
+        }
+
+        HybridTimestamp readTimestamp = clock.now();
+
+        readOnlyTxFutureByReadTs.compute(readTimestamp, (timestamp, readOnlyTxFuture) -> {
+            assert readOnlyTxFuture == null : "previous transaction has not completed yet: " + readTimestamp;
+
+            HybridTimestamp lowerBound = lowerBoundTsToStartNewReadOnlyTx.get();
+
+            if (lowerBound != null && readTimestamp.compareTo(lowerBound) <= 0) {
+                throw new IgniteInternalException(
+                        TX_READ_ONLY_CREATING_ERR,

Review Comment:
   Do we need such a general error code ('some error while creating an RO transaction')? Wouldn't it make sense to reserve a specific error code for this specific error: 'the RO transaction read TS is too old'?



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java:
##########
@@ -124,4 +128,20 @@ CompletableFuture<Void> cleanup(
      */
     @TestOnly
     int finished();
+
+    /**
+     * Updates the lower bound (exclusive) of the timestamp to start new read-only transactions.
+     *
+     * <p>All new read-only transactions will have to start strictly greater this lower bound.
+     *
+     * @param lowerBound New lower bound, {@code null} if there is no lower bound.
+     */
+    void updateLowerBoundToStartNewReadOnlyTransaction(@Nullable HybridTimestamp lowerBound);

Review Comment:
   How about a different name, like `forbidReadOnlyTransactionsNotLaterThan()`? Just a stylistic matter.



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkManagerTest.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.table.distributed.LowWatermarkManager.LOW_WATERMARK_VAULT_KEY;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
+import org.apache.ignite.internal.table.distributed.gc.MvGc;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InOrder;
+
+/**
+ * For {@link LowWatermarkManager} testing.
+ */
+@ExtendWith(ConfigurationExtension.class)
+public class LowWatermarkManagerTest {
+    @InjectConfiguration
+    private LowWatermarkConfiguration lowWatermarkConfig;
+
+    private final HybridClock clock = spy(new HybridClockImpl());
+
+    private final TxManager txManager = mock(TxManager.class);
+
+    private final VaultManager vaultManager = mock(VaultManager.class);
+
+    private final MvGc mvGc = mock(MvGc.class);
+
+    private LowWatermarkManager lowWatermarkManager;
+
+    @BeforeEach
+    void setUp() {
+        lowWatermarkManager = new LowWatermarkManager("test", lowWatermarkConfig, clock, txManager, vaultManager, mvGc);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        lowWatermarkManager.close();
+    }
+
+    @Test
+    void testStart() {
+        // Let's check the start with no low watermark in vault.
+        when(vaultManager.get(LOW_WATERMARK_VAULT_KEY)).thenReturn(completedFuture(null));
+
+        lowWatermarkManager.start();
+
+        verify(mvGc, never()).updateLowWatermark(any(HybridTimestamp.class));
+        assertNull(lowWatermarkManager.getLowWatermark());
+
+        // Let's check the start with an existing low watermark in vault.
+        HybridTimestamp lowWatermark = new HybridTimestamp(10, 10);
+
+        when(vaultManager.get(LOW_WATERMARK_VAULT_KEY))
+                .thenReturn(completedFuture(new VaultEntry(LOW_WATERMARK_VAULT_KEY, ByteUtils.toBytes(lowWatermark))));
+
+        lowWatermarkManager.start();
+
+        verify(mvGc).updateLowWatermark(lowWatermark);
+        assertEquals(lowWatermark, lowWatermarkManager.getLowWatermark());
+    }
+
+    @Test
+    void testCreateNewLowWatermarkCandidate() {
+        when(clock.now()).thenReturn(new HybridTimestamp(1_000, 500));
+
+        assertThat(lowWatermarkConfig.dataAvailabilityTime().update(100L), willSucceedFast());
+
+        HybridTimestamp newLowWatermarkCandidate = lowWatermarkManager.createNewLowWatermarkCandidate();
+
+        assertThat(newLowWatermarkCandidate.getPhysical(), lessThanOrEqualTo(1_000L - 100));
+        assertEquals(500L, newLowWatermarkCandidate.getLogical());
+    }
+
+    @Test
+    void testUpdateLowWatermark() {
+        HybridTimestamp now = clock.now();
+
+        when(clock.now()).thenReturn(now);
+
+        when(txManager.getFutureAllReadOnlyTransactions(any(HybridTimestamp.class))).thenReturn(completedFuture(null));
+
+        when(vaultManager.put(any(ByteArray.class), any(byte[].class))).thenReturn(completedFuture(null));
+
+        // Made a predictable candidate to make it easier to test.

Review Comment:
   ```suggestion
           // Make a predictable candidate to make it easier to test.
   ```



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java:
##########
@@ -74,27 +86,45 @@ public TxManagerImpl(ReplicaService replicaService, LockManager lockManager, Hyb
         this.clock = clock;
     }
 
-    /** {@inheritDoc} */
     @Override
     public InternalTransaction begin() {
         return begin(false);
     }
 
-    /** {@inheritDoc} */
     @Override
     public InternalTransaction begin(boolean readOnly) {
         UUID txId = Timestamp.nextVersion().toUuid();
 
-        return readOnly ? new ReadOnlyTransactionImpl(this, txId, clock.now()) : new ReadWriteTransactionImpl(this, txId);
+        if (!readOnly) {
+            return new ReadWriteTransactionImpl(this, txId);
+        }
+
+        HybridTimestamp readTimestamp = clock.now();
+
+        readOnlyTxFutureByReadTs.compute(readTimestamp, (timestamp, readOnlyTxFuture) -> {

Review Comment:
   This is probably not implemented yet, but I think users will be able to explicitly specify the read TS they want for an RO transaction. This probably means that the read TS should not be used to identify a transaction: a whole collection of TXs might correspond to a given read TS.



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java:
##########
@@ -17,85 +17,84 @@
 
 package org.apache.ignite.internal.tx.impl;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.network.ClusterNode;
-import org.jetbrains.annotations.NotNull;
 
 /**
  * The read-only implementation of an internal transaction.
  */
-public class ReadOnlyTransactionImpl extends IgniteAbstractTransactionImpl {
+class ReadOnlyTransactionImpl extends IgniteAbstractTransactionImpl {
     /** The read timestamp. */
     private final HybridTimestamp readTimestamp;
 
+    /** Prevents double finish of the transaction. */
+    private final AtomicBoolean finishGuard = new AtomicBoolean();
+
     /**
      * The constructor.
      *
      * @param txManager The tx manager.
      * @param id The id.
      * @param readTimestamp The read timestamp.
      */
-    public ReadOnlyTransactionImpl(
-            TxManager txManager,
-            @NotNull UUID id,
-            HybridTimestamp readTimestamp
-    ) {
+    ReadOnlyTransactionImpl(TxManagerImpl txManager, UUID id, HybridTimestamp readTimestamp) {
         super(txManager, id);
+
         this.readTimestamp = readTimestamp;
     }
 
-    /** {@inheritDoc} */
     @Override
     public boolean isReadOnly() {
         return true;
     }
 
-    /** {@inheritDoc} */
     @Override
     public HybridTimestamp readTimestamp() {
         return readTimestamp;
     }
 
-    /** {@inheritDoc} */
     @Override
     public IgniteBiTuple<ClusterNode, Long> enlist(ReplicationGroupId replicationGroupId, IgniteBiTuple<ClusterNode, Long> nodeAndTerm) {
         // TODO: IGNITE-17666 Close cursor tx finish and do it on the first finish invocation only.
         return null;
     }
 
-    /** {@inheritDoc} */
     @Override
     public IgniteBiTuple<ClusterNode, Long> enlistedNodeAndTerm(ReplicationGroupId replicationGroupId) {
         return null;
     }
 
-    /** {@inheritDoc} */
     @Override
     public boolean assignCommitPartition(ReplicationGroupId replicationGroupId) {
         return true;
     }
 
-    /** {@inheritDoc} */
     @Override
     public ReplicationGroupId commitPartition() {
         return null;
     }
 
-    /** {@inheritDoc} */
     @Override
     public void enlistResultFuture(CompletableFuture<?> resultFuture) {
         // No-op.
     }
 
-    /** {@inheritDoc} */
     @Override
+    // TODO: IGNITE-17666 Close cursor tx finish and do it on the first finish invocation only.
     protected CompletableFuture<Void> finish(boolean commit) {
-        // TODO: IGNITE-17666 Close cursor tx finish and do it on the first finish invocation only.
-        return CompletableFuture.completedFuture(null);
+        if (!finishGuard.compareAndSet(false, true)) {
+            return completedFuture(null);
+        }
+
+        ((TxManagerImpl) txManager).completeReadOnlyTransactionFuture(readTimestamp);
+
+        return completedFuture(null);

Review Comment:
   Should we return the corresponding future from `TxManagerImpl` (the one that gets completed by `completeReadOnlyTransactionFuture()`), or is it unrelated?
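   One possible reading of this suggestion, as a minimal sketch: the manager owns the per-transaction future, completes it, and hands it back, and `finish()` returns a future tied to it, so callers of commit/rollback would observe the completion even if it later becomes asynchronous. `SketchTxManager`, `SketchReadOnlyTx` and `completeReadOnlyTransaction` are hypothetical names, not the classes in the diff.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical manager that owns one completion future per RO transaction. */
class SketchTxManager {
    private final Map<UUID, CompletableFuture<Void>> roTxFutures = new ConcurrentHashMap<>();

    SketchReadOnlyTx beginReadOnly() {
        UUID id = UUID.randomUUID();
        roTxFutures.put(id, new CompletableFuture<>());
        return new SketchReadOnlyTx(this, id);
    }

    /** Completes the tracked future and returns it, so the caller can chain on it. */
    CompletableFuture<Void> completeReadOnlyTransaction(UUID id) {
        CompletableFuture<Void> fut = roTxFutures.remove(id);
        if (fut == null) {
            // Unknown or already finished transaction.
            return CompletableFuture.completedFuture(null);
        }
        fut.complete(null);
        return fut;
    }
}

/** Hypothetical RO transaction whose finish() result follows the manager-owned future. */
class SketchReadOnlyTx {
    private final SketchTxManager txManager;
    private final UUID id;
    /** Prevents double finish, mirroring finishGuard in the diff. */
    private final AtomicBoolean finishGuard = new AtomicBoolean();
    /** Shared by all finish() callers; completed when the manager's future completes. */
    private final CompletableFuture<Void> finishFuture = new CompletableFuture<>();

    SketchReadOnlyTx(SketchTxManager txManager, UUID id) {
        this.txManager = txManager;
        this.id = id;
    }

    CompletableFuture<Void> finish() {
        if (finishGuard.compareAndSet(false, true)) {
            txManager.completeReadOnlyTransaction(id).whenComplete((res, err) -> {
                if (err != null) {
                    finishFuture.completeExceptionally(err);
                } else {
                    finishFuture.complete(null);
                }
            });
        }
        return finishFuture;
    }
}
```

   A second `finish()` call returns the same future instead of a fresh `completedFuture(null)`, so it cannot report completion before the first call's work is done.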



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java:
##########
@@ -74,27 +86,45 @@ public TxManagerImpl(ReplicaService replicaService, LockManager lockManager, Hyb
         this.clock = clock;
     }
 
-    /** {@inheritDoc} */
     @Override
     public InternalTransaction begin() {
         return begin(false);
     }
 
-    /** {@inheritDoc} */
     @Override
     public InternalTransaction begin(boolean readOnly) {
         UUID txId = Timestamp.nextVersion().toUuid();
 
-        return readOnly ? new ReadOnlyTransactionImpl(this, txId, clock.now()) : new ReadWriteTransactionImpl(this, txId);
+        if (!readOnly) {
+            return new ReadWriteTransactionImpl(this, txId);
+        }
+
+        HybridTimestamp readTimestamp = clock.now();
+
+        readOnlyTxFutureByReadTs.compute(readTimestamp, (timestamp, readOnlyTxFuture) -> {
+            assert readOnlyTxFuture == null : "previous transaction has not completed yet: " + readTimestamp;

Review Comment:
   In the current implementation, each tx seems to get a distinct read timestamp, so why the check?
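   The question rests on read timestamps being unique per node. A minimal sketch of a clock with that property, assuming a hybrid-style encoding of physical milliseconds plus a logical counter in the low 16 bits; `UniqueTimestampClock` is hypothetical, and whether `HybridClock.now()` gives exactly this guarantee is not stated in this thread. If every call returns a strictly larger value, the `readOnlyTxFuture == null` assertion inside `compute` can never fire.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical clock that hands out strictly increasing timestamps, even within one millisecond. */
class UniqueTimestampClock {
    private final AtomicLong last = new AtomicLong();

    long now() {
        // Physical time in the high bits, leaving 16 low bits for a logical counter.
        long physical = System.currentTimeMillis() << 16;
        // Never return a value <= the previous one: bump the logical part when time has not advanced.
        return last.updateAndGet(prev -> Math.max(prev + 1, physical));
    }
}
```

   If the read TS later becomes user-supplied, as the earlier comment anticipates, this uniqueness no longer holds, and the assertion would turn from a sanity check into a reachable failure.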


