Posted to notifications@ignite.apache.org by "rpuch (via GitHub)" <gi...@apache.org> on 2023/06/21 08:47:14 UTC

[GitHub] [ignite-3] rpuch opened a new pull request, #2228: IGNITE-19209 Implement installing table schema updates

rpuch opened a new pull request, #2228:
URL: https://github.com/apache/ignite-3/pull/2228

   https://issues.apache.org/jira/browse/IGNITE-19209


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1239507405


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.hlc.ClockUpdateListener;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.util.TrackerClosedException;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Allows to wait for the supplied clock to reach a required timestamp. It only uses the clock itself,
+ * no SafeTime mechanisms are involved.
+ */
+public class ClockWaiter implements IgniteComponent {
+    private static final IgniteLogger LOG = Loggers.forClass(ClockWaiter.class);
+
+    private final String nodeName;
+    private final HybridClock clock;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean stopGuard = new AtomicBoolean(false);
+
+    private final PendingComparableValuesTracker<HybridTimestamp, Void> nowTracker = new PendingComparableValuesTracker<>(
+            new HybridTimestamp(1, 0)
+    );
+
+    private final ClockUpdateListener updateListener = this::onUpdate;
+
+    private volatile ScheduledExecutorService scheduler;
+
+    public ClockWaiter(String nodeName, HybridClock clock) {
+        this.nodeName = nodeName;
+        this.clock = clock;
+    }
+
+    @Override
+    public void start() {
+        clock.addUpdateListener(updateListener);
+
+        scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(nodeName + "-clock-waiter", LOG));
+    }
+
+    @Override
+    public void stop() throws Exception {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        clock.removeUpdateListener(updateListener);
+
+        nowTracker.close();
+
+        scheduler.shutdownNow();

Review Comment:
   But don't you already track these futures? They will be cancelled when the tracker gets closed.





[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1239690070


##########
modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java:
##########
@@ -81,6 +76,25 @@ public interface MetaStorageManager extends IgniteComponent {
     @Deprecated
     List<Entry> getLocally(byte[] key, long revLowerBound, long revUpperBound);
 
+    /**
+     * Returns an entry by the given key and bounded by the given revision. The entry is obtained
+     * from the local storage.
+     *
+     * @param key The key.
+     * @param revUpperBound The upper bound of revision.
+     * @return Value corresponding to the given key.
+     */
+    Entry getLocally(byte[] key, long revUpperBound);

Review Comment:
   A similar method is deprecated and has a TODO. Is that TODO applicable here as well?





[GitHub] [ignite-3] rpuch commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1238396316


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java:
##########
@@ -130,24 +132,32 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
 
     private final PendingComparableValuesTracker<Integer, Void> versionTracker = new PendingComparableValuesTracker<>(0);
 
-    private final HybridClock clock;
+    private final ClockWaiter clockWaiter;
 
-    private final long delayDurationMs;
+    private final Lazy<Long> delayDurationMs;

Review Comment:
   1. This setting must not be changed since it's configured during cluster init (at least, for now; later we might make it changeable)
   2. I wanted to avoid going to the configuration on each schema update, but it's actually not a problem as schema updates do not happen too often. So I removed caching of the value.
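
   A minimal sketch of the "no caching" option described above, under stated assumptions: the class `DelayDurationSketch`, its method `activationTime`, and the use of an `AtomicLong` as a stand-in for the configuration are all hypothetical, and the formula "now + delay" is an illustration only. The point is that a `LongSupplier` is consulted on every update, so nothing stale is held if the setting ever becomes changeable.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

public class DelayDurationSketch {
    // Hypothetical supplier; in the PR it would be backed by the cluster configuration.
    private final LongSupplier delayDurationMsSupplier;

    DelayDurationSketch(LongSupplier delayDurationMsSupplier) {
        this.delayDurationMsSupplier = delayDurationMsSupplier;
    }

    /** Reads the delay on every call instead of caching it (assumed formula: now + delay). */
    long activationTime(long nowMs) {
        return nowMs + delayDurationMsSupplier.getAsLong();
    }

    public static void main(String[] args) {
        // An AtomicLong stands in for a configuration value that might change later.
        AtomicLong configuredDelayMs = new AtomicLong(100);
        DelayDurationSketch sketch = new DelayDurationSketch(configuredDelayMs::get);

        System.out.println(sketch.activationTime(1_000));

        // A later configuration change is picked up by the next schema update.
        configuredDelayMs.set(250);
        System.out.println(sketch.activationTime(1_000));
    }
}
```

   The cost is one configuration read per schema update, which is negligible if such updates are rare.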





[GitHub] [ignite-3] rpuch commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1238566574


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.hlc.ClockUpdateListener;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.util.TrackerClosedException;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Allows to wait for the supplied clock to reach a required timestamp. It only uses the clock itself,
+ * no SafeTime mechanisms are involved.
+ */
+public class ClockWaiter implements IgniteComponent {
+    private static final IgniteLogger LOG = Loggers.forClass(ClockWaiter.class);
+
+    private final String nodeName;
+    private final HybridClock clock;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean stopGuard = new AtomicBoolean(false);
+
+    private final PendingComparableValuesTracker<HybridTimestamp, Void> nowTracker = new PendingComparableValuesTracker<>(
+            new HybridTimestamp(1, 0)
+    );
+
+    private final ClockUpdateListener updateListener = this::onUpdate;
+
+    private volatile ScheduledExecutorService scheduler;
+
+    public ClockWaiter(String nodeName, HybridClock clock) {
+        this.nodeName = nodeName;
+        this.clock = clock;
+    }
+
+    @Override
+    public void start() {
+        clock.addUpdateListener(updateListener);
+
+        scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(nodeName + "-clock-waiter", LOG));
+    }
+
+    @Override
+    public void stop() throws Exception {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        clock.removeUpdateListener(updateListener);
+
+        nowTracker.close();
+
+        scheduler.shutdownNow();

Review Comment:
   `IgniteUtils#shutdownAndAwaitTermination()` does `shutdown()` first and then it waits for the executor termination. This does not work well for scheduled executors as they will wait for scheduled (but not-yet-executing) tasks to execute. We don't need this here: we just don't need any executions at all.
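
   The difference can be seen with plain JDK classes. The sketch below is not the PR code: the class name `ShutdownNowSketch` and the one-hour delay are made up for illustration. It schedules a task far in the future and then calls `shutdownNow()`, which hands the queued task back instead of waiting for its delay to elapse, so `awaitTermination()` does not have to wait for it.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ShutdownNowSketch {
    /** Schedules one far-future task, stops the executor, and reports how many tasks never ran. */
    static int pendingAfterShutdownNow() throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // A task delayed by an hour stands in for a scheduled-but-not-yet-executing wait.
        scheduler.schedule(() -> { }, 1, TimeUnit.HOURS);

        // shutdownNow() removes queued tasks and returns them instead of running them.
        List<Runnable> neverRun = scheduler.shutdownNow();

        // With the queue drained, termination does not depend on the one-hour delay.
        if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("Executor did not terminate in time");
        }

        return neverRun.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Tasks dropped by shutdownNow(): " + pendingAfterShutdownNow());
    }
}
```

   With `shutdown()` and the default policy of `ScheduledThreadPoolExecutor`, the delayed task would stay queued and the same `awaitTermination()` call would time out.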





[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1239383598


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.hlc.ClockUpdateListener;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.util.TrackerClosedException;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Allows to wait for the supplied clock to reach a required timestamp. It only uses the clock itself,
+ * no SafeTime mechanisms are involved.
+ */
+public class ClockWaiter implements IgniteComponent {
+    private static final IgniteLogger LOG = Loggers.forClass(ClockWaiter.class);
+
+    private final String nodeName;
+    private final HybridClock clock;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean stopGuard = new AtomicBoolean(false);
+
+    private final PendingComparableValuesTracker<HybridTimestamp, Void> nowTracker = new PendingComparableValuesTracker<>(
+            new HybridTimestamp(1, 0)
+    );
+
+    private final ClockUpdateListener updateListener = this::onUpdate;
+
+    private volatile ScheduledExecutorService scheduler;
+
+    public ClockWaiter(String nodeName, HybridClock clock) {
+        this.nodeName = nodeName;
+        this.clock = clock;
+    }
+
+    @Override
+    public void start() {
+        clock.addUpdateListener(updateListener);
+
+        scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(nodeName + "-clock-waiter", LOG));
+    }
+
+    @Override
+    public void stop() throws Exception {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        clock.removeUpdateListener(updateListener);
+
+        nowTracker.close();
+
+        scheduler.shutdownNow();
+        scheduler.awaitTermination(10, TimeUnit.SECONDS);
+    }
+
+    private void onUpdate(HybridTimestamp newTs) {
+        nowTracker.update(newTs, null);

Review Comment:
   Since the clock comes from the "outside world", it can still be used after this component has been stopped, therefore this method can be triggered. Is that ok?





[GitHub] [ignite-3] rpuch commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1238408528


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java:
##########
@@ -773,11 +783,21 @@ private void registerCatalog(Catalog newCatalog) {
         catalogByTs.put(newCatalog.time(), newCatalog);
     }
 
-    private CompletableFuture<Void> saveUpdate(UpdateProducer updateProducer) {
-        return saveUpdate(updateProducer, 0);
+    private CompletableFuture<Void> saveUpdateAndWaitForActivation(UpdateProducer updateProducer) {
+        return saveUpdate(updateProducer, 0)
+                .thenCompose(newVersion -> versionTracker.waitFor(newVersion)

Review Comment:
   We have to wait for versionTracker because this is how we make sure that the version we just created is available in the CatalogServiceImpl internal mappings; and we need this to obtain the new version activation moment. Only when we know the activation moment can we compute the target moment we need to wait for.
   
   The necessity to wait for `versionTracker` to get activation moment will become obsolete when https://issues.apache.org/jira/browse/IGNITE-19796 gets implemented.
   
   Thanks, I applied your suggestion with a minor modification.
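
   A rough sketch of the ordering described above, using only `CompletableFuture`. Everything here is hypothetical: `ActivationWaitSketch`, the `activationByVersion` map, the stubbed `saveUpdate` and `waitForClock`, and in particular the assumption that the target moment is "activation time plus max clock skew". It only illustrates that the activation moment can be looked up once the new version is registered locally, and that the clock wait is chained after that.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class ActivationWaitSketch {
    // Hypothetical stand-in for the internal "version -> activation time" mapping.
    private final Map<Integer, Long> activationByVersion = new ConcurrentHashMap<>();

    /** Hypothetical: registers the new version locally, then completes with its number. */
    CompletableFuture<Integer> saveUpdate(int newVersion, long activationTs) {
        activationByVersion.put(newVersion, activationTs);
        return CompletableFuture.completedFuture(newVersion);
    }

    /** Hypothetical: a clock wait that completes immediately and echoes the target moment. */
    CompletableFuture<Long> waitForClock(long targetTs) {
        return CompletableFuture.completedFuture(targetTs);
    }

    CompletableFuture<Long> saveUpdateAndWaitForActivation(int newVersion, long activationTs, long maxClockSkew) {
        return saveUpdate(newVersion, activationTs)
                .thenCompose(version -> {
                    // The version is registered by now, so its activation moment can be read.
                    long activation = activationByVersion.get(version);

                    // Assumed target: the activation moment shifted by the max clock skew.
                    return waitForClock(activation + maxClockSkew);
                });
    }

    public static void main(String[] args) {
        long target = new ActivationWaitSketch().saveUpdateAndWaitForActivation(1, 100L, 7L).join();
        System.out.println("Waited for clock to reach: " + target);
    }
}
```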





[GitHub] [ignite-3] rpuch commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1239477856


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java:
##########
@@ -773,11 +783,21 @@ private void registerCatalog(Catalog newCatalog) {
         catalogByTs.put(newCatalog.time(), newCatalog);
     }
 
-    private CompletableFuture<Void> saveUpdate(UpdateProducer updateProducer) {
-        return saveUpdate(updateProducer, 0);
+    private CompletableFuture<Void> saveUpdateAndWaitForActivation(UpdateProducer updateProducer) {
+        return saveUpdate(updateProducer, 0)
+                .thenCompose(newVersion -> versionTracker.waitFor(newVersion)

Review Comment:
   Oh, I missed that this wait already happens. Thanks, I removed the wait.





[GitHub] [ignite-3] rpuch commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1238572022


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.hlc.ClockUpdateListener;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.util.TrackerClosedException;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Allows to wait for the supplied clock to reach a required timestamp. It only uses the clock itself,
+ * no SafeTime mechanisms are involved.
+ */
+public class ClockWaiter implements IgniteComponent {
+    private static final IgniteLogger LOG = Loggers.forClass(ClockWaiter.class);
+
+    private final String nodeName;
+    private final HybridClock clock;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean stopGuard = new AtomicBoolean(false);
+
+    private final PendingComparableValuesTracker<HybridTimestamp, Void> nowTracker = new PendingComparableValuesTracker<>(
+            new HybridTimestamp(1, 0)
+    );
+
+    private final ClockUpdateListener updateListener = this::onUpdate;
+
+    private volatile ScheduledExecutorService scheduler;
+
+    public ClockWaiter(String nodeName, HybridClock clock) {
+        this.nodeName = nodeName;
+        this.clock = clock;
+    }
+
+    @Override
+    public void start() {
+        clock.addUpdateListener(updateListener);
+
+        scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(nodeName + "-clock-waiter", LOG));
+    }
+
+    @Override
+    public void stop() throws Exception {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        clock.removeUpdateListener(updateListener);
+
+        nowTracker.close();
+
+        scheduler.shutdownNow();
+        scheduler.awaitTermination(10, TimeUnit.SECONDS);
+    }
+
+    private void onUpdate(HybridTimestamp newTs) {
+        nowTracker.update(newTs, null);

Review Comment:
   The busy lock prevents the clients of this class from submitting any new work to it. Here, we are handling work that was submitted earlier. It doesn't seem problematic to allow some of this work to be finished. Or is there some problem here?





[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1239376159


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java:
##########
@@ -773,11 +783,21 @@ private void registerCatalog(Catalog newCatalog) {
         catalogByTs.put(newCatalog.time(), newCatalog);
     }
 
-    private CompletableFuture<Void> saveUpdate(UpdateProducer updateProducer) {
-        return saveUpdate(updateProducer, 0);
+    private CompletableFuture<Void> saveUpdateAndWaitForActivation(UpdateProducer updateProducer) {
+        return saveUpdate(updateProducer, 0)
+                .thenCompose(newVersion -> versionTracker.waitFor(newVersion)

Review Comment:
   I understand why we need to wait for the `versionTracker`, my question was why we do it here since we also wait for the `versionTracker` inside `saveUpdate`, so now it happens twice. Or am I missing something?



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java:
##########
@@ -130,24 +132,32 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
 
     private final PendingComparableValuesTracker<Integer, Void> versionTracker = new PendingComparableValuesTracker<>(0);
 
-    private final HybridClock clock;
+    private final ClockWaiter clockWaiter;
 
-    private final long delayDurationMs;
+    private final Lazy<Long> delayDurationMs;
 
     /**
      * Constructor.
      */
-    public CatalogServiceImpl(UpdateLog updateLog, HybridClock clock) {
-        this(updateLog, clock, DEFAULT_DELAY_DURATION);
+    public CatalogServiceImpl(UpdateLog updateLog, ClockWaiter clockWaiter) {
+        this(updateLog, clockWaiter, DEFAULT_DELAY_DURATION);
     }
 
     /**
      * Constructor.
      */
-    public CatalogServiceImpl(UpdateLog updateLog, HybridClock clock, long delayDurationMs) {
+    CatalogServiceImpl(UpdateLog updateLog, ClockWaiter clockWaiter, long delayDurationMs) {
+        this(updateLog, clockWaiter, () -> delayDurationMs);
+    }
+
+    /**
+     * Constructor.
+     */
+    public CatalogServiceImpl(UpdateLog updateLog, ClockWaiter clockWaiter, LongSupplier delayDurationMsSupplier) {

Review Comment:
   I agree





[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1239692137


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.hlc.ClockUpdateListener;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.util.TrackerClosedException;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Allows to wait for the supplied clock to reach a required timestamp. It only uses the clock itself,
+ * no SafeTime mechanisms are involved.
+ */
+public class ClockWaiter implements IgniteComponent {
+    private static final IgniteLogger LOG = Loggers.forClass(ClockWaiter.class);
+
+    private final String nodeName;
+    private final HybridClock clock;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean stopGuard = new AtomicBoolean(false);
+
+    private final PendingComparableValuesTracker<HybridTimestamp, Void> nowTracker = new PendingComparableValuesTracker<>(
+            new HybridTimestamp(1, 0)
+    );
+
+    private final ClockUpdateListener updateListener = this::onUpdate;
+
+    private volatile ScheduledExecutorService scheduler;
+
+    public ClockWaiter(String nodeName, HybridClock clock) {
+        this.nodeName = nodeName;
+        this.clock = clock;
+    }
+
+    @Override
+    public void start() {
+        clock.addUpdateListener(updateListener);
+
+        scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(nodeName + "-clock-waiter", LOG));
+    }
+
+    @Override
+    public void stop() throws Exception {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        clock.removeUpdateListener(updateListener);
+
+        nowTracker.close();
+
+        scheduler.shutdownNow();

Review Comment:
   Please leave a code comment about this, so that it does not get refactored by someone in the future.





[GitHub] [ignite-3] rpuch commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1238456915


##########
modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java:
##########
@@ -243,4 +243,11 @@ public HybridTimestamp addPhysicalTime(long mills) {
 
         return new HybridTimestamp(time + (mills << LOGICAL_TIME_BITS_SIZE));
     }
+
+    /**
+     * Returns max clock skew for the cluster (in millis).
+     */
+    public static long maxClockSkew() {
+        return CLOCK_SKEW;

Review Comment:
   I was planning to do so, but there was some obstacle, so I decided to postpone it. I filed a ticket about this and mentioned it in a TODO: https://issues.apache.org/jira/browse/IGNITE-19809





[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1239382486


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.hlc.ClockUpdateListener;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.util.TrackerClosedException;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Allows waiting for the supplied clock to reach a required timestamp. It only uses the clock itself,
+ * no SafeTime mechanisms are involved.
+ */
+public class ClockWaiter implements IgniteComponent {
+    private static final IgniteLogger LOG = Loggers.forClass(ClockWaiter.class);
+
+    private final String nodeName;
+    private final HybridClock clock;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean stopGuard = new AtomicBoolean(false);
+
+    private final PendingComparableValuesTracker<HybridTimestamp, Void> nowTracker = new PendingComparableValuesTracker<>(
+            new HybridTimestamp(1, 0)
+    );
+
+    private final ClockUpdateListener updateListener = this::onUpdate;
+
+    private volatile ScheduledExecutorService scheduler;
+
+    public ClockWaiter(String nodeName, HybridClock clock) {
+        this.nodeName = nodeName;
+        this.clock = clock;
+    }
+
+    @Override
+    public void start() {
+        clock.addUpdateListener(updateListener);
+
+        scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(nodeName + "-clock-waiter", LOG));
+    }
+
+    @Override
+    public void stop() throws Exception {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        clock.removeUpdateListener(updateListener);
+
+        nowTracker.close();
+
+        scheduler.shutdownNow();
+        scheduler.awaitTermination(10, TimeUnit.SECONDS);
+    }
+
+    private void onUpdate(HybridTimestamp newTs) {
+        nowTracker.update(newTs, null);
+    }
+
+    /**
+     * Wait for the clock to reach the given timestamp.
+     *
+     * @param targetTimestamp Timestamp to wait for.
+     * @return A future that completes when the timestamp is reached by the clock's time.
+     */
+    public CompletableFuture<Void> waitFor(HybridTimestamp targetTimestamp) {
+        if (!busyLock.enterBusy()) {
+            return failedFuture(new NodeStoppingException());
+        }
+
+        try {
+            return doWaitFor(targetTimestamp);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    private CompletableFuture<Void> doWaitFor(HybridTimestamp targetTimestamp) {
+        CompletableFuture<Void> future = nowTracker.waitFor(targetTimestamp);
+
+        AtomicReference<ScheduledFuture<?>> scheduledFutureRef = new AtomicReference<>();
+
+        if (!future.isDone()) {
+            // This triggers a clock update.
+            HybridTimestamp now = clock.now();
+
+            if (targetTimestamp.compareTo(now) <= 0) {
+                assert future.isDone();
+            } else {
+                // Adding 1 to account for a possible non-zero logical part of the targetTimestamp.
+                long millisToWait = targetTimestamp.getPhysical() - now.getPhysical() + 1;
+
+                ScheduledFuture<?> scheduledFuture = scheduler.schedule(this::triggerClockUpdate, millisToWait, TimeUnit.MILLISECONDS);
+                scheduledFutureRef.set(scheduledFuture);
+            }
+        }
+
+        return future.handle((res, ex) -> {
+            ScheduledFuture<?> scheduledFuture = scheduledFutureRef.get();
+
+            if (scheduledFuture != null) {
+                scheduledFuture.cancel(true);
+            }
+
+            if (ex != null) {

Review Comment:
   Makes sense. Just leave a comment about this, since it looks a bit confusing.





[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1239687906


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.hlc.ClockUpdateListener;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.util.TrackerClosedException;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Allows waiting for the supplied clock to reach a required timestamp. It only uses the clock itself,
+ * no SafeTime mechanisms are involved.
+ */
+public class ClockWaiter implements IgniteComponent {
+    private static final IgniteLogger LOG = Loggers.forClass(ClockWaiter.class);
+
+    private final String nodeName;
+    private final HybridClock clock;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean stopGuard = new AtomicBoolean(false);
+
+    private final PendingComparableValuesTracker<HybridTimestamp, Void> nowTracker = new PendingComparableValuesTracker<>(
+            new HybridTimestamp(1, 0)
+    );
+
+    private final ClockUpdateListener updateListener = this::onUpdate;
+
+    private volatile ScheduledExecutorService scheduler;
+
+    public ClockWaiter(String nodeName, HybridClock clock) {
+        this.nodeName = nodeName;
+        this.clock = clock;
+    }
+
+    @Override
+    public void start() {
+        clock.addUpdateListener(updateListener);
+
+        scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(nodeName + "-clock-waiter", LOG));
+    }
+
+    @Override
+    public void stop() throws Exception {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        clock.removeUpdateListener(updateListener);
+
+        nowTracker.close();
+
+        scheduler.shutdownNow();

Review Comment:
   oops, that's correct, ok then





[GitHub] [ignite-3] rpuch commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1239694109


##########
modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java:
##########
@@ -81,6 +76,25 @@ public interface MetaStorageManager extends IgniteComponent {
     @Deprecated
     List<Entry> getLocally(byte[] key, long revLowerBound, long revUpperBound);
 
+    /**
+     * Returns an entry by the given key and bounded by the given revision. The entry is obtained
+     * from the local storage.
+     *
+     * @param key The key.
+     * @param revUpperBound The upper bound of revision.
+     * @return Value corresponding to the given key.
+     */
+    Entry getLocally(byte[] key, long revUpperBound);

Review Comment:
   Discussed in a private chat, we agreed to leave the method





[GitHub] [ignite-3] rpuch commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1238560776


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.hlc.ClockUpdateListener;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.util.TrackerClosedException;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Allows waiting for the supplied clock to reach a required timestamp. It only uses the clock itself,
+ * no SafeTime mechanisms are involved.
+ */
+public class ClockWaiter implements IgniteComponent {
+    private static final IgniteLogger LOG = Loggers.forClass(ClockWaiter.class);
+
+    private final String nodeName;
+    private final HybridClock clock;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean stopGuard = new AtomicBoolean(false);
+
+    private final PendingComparableValuesTracker<HybridTimestamp, Void> nowTracker = new PendingComparableValuesTracker<>(
+            new HybridTimestamp(1, 0)
+    );
+
+    private final ClockUpdateListener updateListener = this::onUpdate;
+
+    private volatile ScheduledExecutorService scheduler;
+
+    public ClockWaiter(String nodeName, HybridClock clock) {
+        this.nodeName = nodeName;
+        this.clock = clock;
+    }
+
+    @Override
+    public void start() {
+        clock.addUpdateListener(updateListener);
+
+        scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(nodeName + "-clock-waiter", LOG));
+    }
+
+    @Override
+    public void stop() throws Exception {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        clock.removeUpdateListener(updateListener);
+
+        nowTracker.close();
+
+        scheduler.shutdownNow();
+        scheduler.awaitTermination(10, TimeUnit.SECONDS);
+    }
+
+    private void onUpdate(HybridTimestamp newTs) {
+        nowTracker.update(newTs, null);
+    }
+
+    /**
+     * Wait for the clock to reach the given timestamp.
+     *
+     * @param targetTimestamp Timestamp to wait for.
+     * @return A future that completes when the timestamp is reached by the clock's time.
+     */
+    public CompletableFuture<Void> waitFor(HybridTimestamp targetTimestamp) {
+        if (!busyLock.enterBusy()) {
+            return failedFuture(new NodeStoppingException());
+        }
+
+        try {
+            return doWaitFor(targetTimestamp);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    private CompletableFuture<Void> doWaitFor(HybridTimestamp targetTimestamp) {
+        CompletableFuture<Void> future = nowTracker.waitFor(targetTimestamp);
+
+        AtomicReference<ScheduledFuture<?>> scheduledFutureRef = new AtomicReference<>();

Review Comment:
   We need a container for a `ScheduledFuture` because it's going to be used in a lambda, so it must be effectively final; without a container it would not compile.
   
   Also, we need the container to be thread-safe (as the future completion might be triggered from another thread), hence `AtomicReference` seems to be an ideal choice.
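
The reasoning above can be illustrated with a minimal, self-contained sketch. It is not the PR's code: `HolderSketch` and `attachTimer` are hypothetical names, and the timer task is a no-op. It shows only why a conditionally assigned local cannot be captured by the lambda, and how an `AtomicReference` holder works around that while staying safe if the future is completed from another thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class HolderSketch {
    /**
     * Schedules a (no-op) timer task for a pending future and cancels it once the future completes.
     * Returns the holder so that callers can inspect the timer.
     */
    static AtomicReference<ScheduledFuture<?>> attachTimer(
            CompletableFuture<Void> future, ScheduledExecutorService scheduler, long delayMs) {
        // A plain local such as 'ScheduledFuture<?> timer = null;' that is assigned inside the 'if'
        // below would not be effectively final, so the lambda could not capture it.
        AtomicReference<ScheduledFuture<?>> timerRef = new AtomicReference<>();

        if (!future.isDone()) {
            timerRef.set(scheduler.schedule(() -> { }, delayMs, TimeUnit.MILLISECONDS));
        }

        // The callback may run on whichever thread completes the future,
        // hence a thread-safe holder rather than, say, a one-element array.
        future.whenComplete((res, ex) -> {
            ScheduledFuture<?> timer = timerRef.get();

            if (timer != null) {
                timer.cancel(true);
            }
        });

        return timerRef;
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        try {
            CompletableFuture<Void> future = new CompletableFuture<>();
            AtomicReference<ScheduledFuture<?>> timerRef = attachTimer(future, scheduler, 60_000);

            future.complete(null);

            System.out.println("timer cancelled: " + timerRef.get().isCancelled()); // prints "timer cancelled: true"
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

A one-element array would also satisfy the compiler, but it gives no visibility guarantees across threads, which is the second point made above.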





[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1238194079


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java:
##########
@@ -130,24 +132,32 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
 
     private final PendingComparableValuesTracker<Integer, Void> versionTracker = new PendingComparableValuesTracker<>(0);
 
-    private final HybridClock clock;
+    private final ClockWaiter clockWaiter;
 
-    private final long delayDurationMs;
+    private final Lazy<Long> delayDurationMs;
 
     /**
      * Constructor.
      */
-    public CatalogServiceImpl(UpdateLog updateLog, HybridClock clock) {
-        this(updateLog, clock, DEFAULT_DELAY_DURATION);
+    public CatalogServiceImpl(UpdateLog updateLog, ClockWaiter clockWaiter) {
+        this(updateLog, clockWaiter, DEFAULT_DELAY_DURATION);
     }
 
     /**
      * Constructor.
      */
-    public CatalogServiceImpl(UpdateLog updateLog, HybridClock clock, long delayDurationMs) {
+    CatalogServiceImpl(UpdateLog updateLog, ClockWaiter clockWaiter, long delayDurationMs) {
+        this(updateLog, clockWaiter, () -> delayDurationMs);
+    }
+
+    /**
+     * Constructor.
+     */
+    public CatalogServiceImpl(UpdateLog updateLog, ClockWaiter clockWaiter, LongSupplier delayDurationMsSupplier) {

Review Comment:
   Actually, using `Supplier<Long>` makes more sense here, less boxing/unboxing



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java:
##########
@@ -773,11 +783,21 @@ private void registerCatalog(Catalog newCatalog) {
         catalogByTs.put(newCatalog.time(), newCatalog);
     }
 
-    private CompletableFuture<Void> saveUpdate(UpdateProducer updateProducer) {
-        return saveUpdate(updateProducer, 0);
+    private CompletableFuture<Void> saveUpdateAndWaitForActivation(UpdateProducer updateProducer) {
+        return saveUpdate(updateProducer, 0)
+                .thenCompose(newVersion -> versionTracker.waitFor(newVersion)
+                        .thenCompose(unused -> {
+                            Catalog catalog = catalogByVer.get(newVersion);
+                            HybridTimestamp activationTs = HybridTimestamp.hybridTimestamp(catalog.time());
+                            HybridTimestamp clusterWideEnsuredActivationTs = activationTs.addPhysicalTime(
+                                    HybridTimestamp.maxClockSkew()

Review Comment:
   Why do we use only the Clock Skew here? IEP also talks about propagation times...
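
For reference, the wait performed by the snippet under discussion can be sketched with plain `long` arithmetic. This is a rough illustration under stated assumptions, not the project's implementation: the 16-bit logical part and the 500 ms skew bound are assumed values, and all names (`HybridTimeSketch`, `pack`, `millisToWait`) are hypothetical. Like the diff, it accounts only for clock skew and ignores propagation time.

```java
public class HybridTimeSketch {
    // Assumption for this sketch: the low 16 bits of a timestamp hold the logical counter.
    static final int LOGICAL_BITS = 16;

    // Assumption: an illustrative skew bound, not the project's actual constant.
    static final long MAX_CLOCK_SKEW_MS = 500;

    /** Packs a physical time (millis) and a logical counter into one long. */
    static long pack(long physicalMs, int logical) {
        return (physicalMs << LOGICAL_BITS) | (logical & 0xFFFF);
    }

    /** Extracts the physical part (millis). */
    static long physical(long ts) {
        return ts >>> LOGICAL_BITS;
    }

    /** Shifts the physical part by the given number of millis, keeping the logical part. */
    static long addPhysicalTime(long ts, long millis) {
        return ts + (millis << LOGICAL_BITS);
    }

    /** Millis to sleep before re-checking the clock; +1 covers a non-zero logical part of the target. */
    static long millisToWait(long nowTs, long targetTs) {
        if (targetTs <= nowTs) {
            return 0;
        }

        return physical(targetTs) - physical(nowTs) + 1;
    }

    public static void main(String[] args) {
        long activationTs = pack(1_000, 3);
        long ensuredTs = addPhysicalTime(activationTs, MAX_CLOCK_SKEW_MS);
        long now = pack(1_200, 0);

        System.out.println(physical(ensuredTs));          // prints 1500
        System.out.println(millisToWait(now, ensuredTs)); // prints 301
    }
}
```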



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/VersionedUpdate.java:
##########
@@ -56,9 +56,9 @@ public int version() {
         return version;
     }
 
-    /** Returns activation timestamp. */
-    public long activationTimestamp() {
-        return activationTimestamp;
+    /** Returns Delay Duration for this update (in milliseconds). */
+    public long delayDuration() {

Review Comment:
   Can we keep the `Ms` suffix as a part of this method's name?



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java:
##########
@@ -209,13 +214,14 @@ public CompletableFuture<Void> onUpdate(WatchEvent event) {
 
                 VersionedUpdate update = fromBytes(payload);
 
-                onUpdateHandler.handle(update);
+                HybridTimestamp entryTimestamp = metastoreRevisionToTs.apply(eventEntry.newEntry().revision());

Review Comment:
   Can't we include the update timestamp as a part of the `WatchEvent`? This looks like an unnecessary lookup...



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java:
##########
@@ -773,11 +783,21 @@ private void registerCatalog(Catalog newCatalog) {
         catalogByTs.put(newCatalog.time(), newCatalog);
     }
 
-    private CompletableFuture<Void> saveUpdate(UpdateProducer updateProducer) {
-        return saveUpdate(updateProducer, 0);
+    private CompletableFuture<Void> saveUpdateAndWaitForActivation(UpdateProducer updateProducer) {
+        return saveUpdate(updateProducer, 0)
+                .thenCompose(newVersion -> versionTracker.waitFor(newVersion)

Review Comment:
   Why do we need to call `versionTracker.waitFor` if we do the same thing in `saveUpdate`?



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.hlc.ClockUpdateListener;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.util.TrackerClosedException;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Allows waiting for the supplied clock to reach a required timestamp. It only uses the clock itself,
+ * no SafeTime mechanisms are involved.
+ */
+public class ClockWaiter implements IgniteComponent {
+    private static final IgniteLogger LOG = Loggers.forClass(ClockWaiter.class);
+
+    private final String nodeName;
+    private final HybridClock clock;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean stopGuard = new AtomicBoolean(false);
+
+    private final PendingComparableValuesTracker<HybridTimestamp, Void> nowTracker = new PendingComparableValuesTracker<>(
+            new HybridTimestamp(1, 0)
+    );
+
+    private final ClockUpdateListener updateListener = this::onUpdate;
+
+    private volatile ScheduledExecutorService scheduler;
+
+    public ClockWaiter(String nodeName, HybridClock clock) {
+        this.nodeName = nodeName;
+        this.clock = clock;
+    }
+
+    @Override
+    public void start() {
+        clock.addUpdateListener(updateListener);
+
+        scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(nodeName + "-clock-waiter", LOG));
+    }
+
+    @Override
+    public void stop() throws Exception {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        clock.removeUpdateListener(updateListener);
+
+        nowTracker.close();
+
+        scheduler.shutdownNow();
+        scheduler.awaitTermination(10, TimeUnit.SECONDS);
+    }
+
+    private void onUpdate(HybridTimestamp newTs) {
+        nowTracker.update(newTs, null);
+    }
+
+    /**
+     * Wait for the clock to reach the given timestamp.
+     *
+     * @param targetTimestamp Timestamp to wait for.
+     * @return A future that completes when the timestamp is reached by the clock's time.
+     */
+    public CompletableFuture<Void> waitFor(HybridTimestamp targetTimestamp) {
+        if (!busyLock.enterBusy()) {
+            return failedFuture(new NodeStoppingException());
+        }
+
+        try {
+            return doWaitFor(targetTimestamp);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    private CompletableFuture<Void> doWaitFor(HybridTimestamp targetTimestamp) {
+        CompletableFuture<Void> future = nowTracker.waitFor(targetTimestamp);
+
+        AtomicReference<ScheduledFuture<?>> scheduledFutureRef = new AtomicReference<>();

Review Comment:
   Why do you need an atomic reference here?



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/VersionedUpdate.java:
##########
@@ -40,12 +40,12 @@ public class VersionedUpdate implements Serializable {
      * Constructs the object.
      *
      * @param version A version the changes relate to.
-     * @param activationTimestamp Timestamp given changes become active at.
+     * @param delayDurationMs Delay duration that, when added to the entry save timestamp, will produce activation timestamp (milliseconds).

Review Comment:
   ```suggestion
        * @param delayDurationMs Delay duration that, when added to the entry save timestamp, will produce the activation timestamp (milliseconds).
   ```



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/VersionedUpdate.java:
##########
@@ -40,12 +40,12 @@ public class VersionedUpdate implements Serializable {
      * Constructs the object.
      *
      * @param version A version the changes relate to.
-     * @param activationTimestamp Timestamp given changes become active at.
+     * @param delayDurationMs Delay duration that, when added to the entry save timestamp, will produce activation timestamp (milliseconds).

Review Comment:
   `entry save timestamp` doesn't sound right, can you please rephrase it?



##########
modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java:
##########
@@ -67,11 +71,18 @@ public long nowLong() {
             long newLatestTime = max(oldLatestTime + 1, now);
 
             if (LATEST_TIME.compareAndSet(this, oldLatestTime, newLatestTime)) {
+                HybridTimestamp newTs = hybridTimestamp(newLatestTime);

Review Comment:
   Should we optimize the `now` method? We will be creating excessive `HybridTimestamp` objects otherwise



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java:
##########
@@ -773,11 +783,21 @@ private void registerCatalog(Catalog newCatalog) {
         catalogByTs.put(newCatalog.time(), newCatalog);
     }
 
-    private CompletableFuture<Void> saveUpdate(UpdateProducer updateProducer) {
-        return saveUpdate(updateProducer, 0);
+    private CompletableFuture<Void> saveUpdateAndWaitForActivation(UpdateProducer updateProducer) {
+        return saveUpdate(updateProducer, 0)
+                .thenCompose(newVersion -> versionTracker.waitFor(newVersion)
+                        .thenCompose(unused -> {
+                            Catalog catalog = catalogByVer.get(newVersion);
+                            HybridTimestamp activationTs = HybridTimestamp.hybridTimestamp(catalog.time());
+                            HybridTimestamp clusterWideEnsuredActivationTs = activationTs.addPhysicalTime(
+                                    HybridTimestamp.maxClockSkew()
+                            );
+
+                            return clockWaiter.waitFor(clusterWideEnsuredActivationTs);
+                        }));
     }
 
-    private CompletableFuture<Void> saveUpdate(UpdateProducer updateProducer, int attemptNo) {
+    private CompletableFuture<Integer> saveUpdate(UpdateProducer updateProducer, int attemptNo) {

Review Comment:
   I would suggest adding a Javadoc to this method.



##########
modules/core/src/testFixtures/java/org/apache/ignite/internal/TestHybridClock.java:
##########
@@ -38,6 +41,8 @@ public class TestHybridClock implements HybridClock {
     /** Latest time. */
     private volatile long latestTime;
 
+    private final List<ClockUpdateListener> updateListeners = new CopyOnWriteArrayList<>();

Review Comment:
   Is it ok that these listeners do not get notified about updates?
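
As a point of comparison, a test clock that does notify its listeners might look roughly like the sketch below. It is a simplified stand-in, not `TestHybridClock`: timestamps are plain `long` values, and `ListenableClockSketch` with its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.LongConsumer;

public class ListenableClockSketch {
    /** Copy-on-write list: listeners can be added or removed while notifications are in flight. */
    private final List<LongConsumer> updateListeners = new CopyOnWriteArrayList<>();

    private long latestTime;

    void addUpdateListener(LongConsumer listener) {
        updateListeners.add(listener);
    }

    void removeUpdateListener(LongConsumer listener) {
        updateListeners.remove(listener);
    }

    /** Advances the clock monotonically and notifies every registered listener of the new value. */
    synchronized long update(long requestedTime) {
        latestTime = Math.max(latestTime + 1, requestedTime);

        for (LongConsumer listener : updateListeners) {
            listener.accept(latestTime);
        }

        return latestTime;
    }

    public static void main(String[] args) {
        ListenableClockSketch clock = new ListenableClockSketch();
        List<Long> seen = new ArrayList<>();

        clock.addUpdateListener(seen::add);

        clock.update(10);
        clock.update(5); // The clock never goes back, so this yields 11.

        System.out.println(seen); // prints [10, 11]
    }
}
```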



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java:
##########
@@ -130,24 +132,32 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
 
     private final PendingComparableValuesTracker<Integer, Void> versionTracker = new PendingComparableValuesTracker<>(0);
 
-    private final HybridClock clock;
+    private final ClockWaiter clockWaiter;
 
-    private final long delayDurationMs;
+    private final Lazy<Long> delayDurationMs;

Review Comment:
   Why do we use `Lazy` and not simply cache the supplier? Shouldn't we be able to change the configuration on-the-fly? 
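
   To illustrate the concern, here is a minimal sketch. It assumes that `Lazy` memoizes the first value it computes; `Memoized` and `configuredDelay` are hypothetical stand-ins, not the actual Ignite classes.

   ```java
   import java.util.concurrent.atomic.AtomicLong;
   import java.util.function.Supplier;

   public class LazyVsSupplier {
       /** Hypothetical memoizing holder: the delegate is called at most once. */
       public static final class Memoized<T> implements Supplier<T> {
           private final Supplier<T> delegate;
           private volatile T value;

           public Memoized(Supplier<T> delegate) {
               this.delegate = delegate;
           }

           @Override
           public T get() {
               T v = value;
               if (v == null) {
                   synchronized (this) {
                       v = value;
                       if (v == null) {
                           v = delegate.get();
                           value = v;
                       }
                   }
               }
               return v;
           }
       }

       public static void main(String[] args) {
           // Hypothetical configuration value that may change at runtime.
           AtomicLong configuredDelay = new AtomicLong(100);

           Supplier<Long> live = configuredDelay::get;
           Supplier<Long> memoized = new Memoized<>(configuredDelay::get);

           // Both read the initial value.
           live.get();
           memoized.get();

           configuredDelay.set(500);

           System.out.println(live.get());     // 500: the supplier re-reads the configuration
           System.out.println(memoized.get()); // 100: the memoized value is stale
       }
   }
   ```

   With the plain supplier a later configuration change is visible, while the memoized holder keeps returning the value it saw first.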



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java:
##########
@@ -812,9 +830,9 @@ private CompletableFuture<Void> saveUpdate(UpdateProducer updateProducer, int at
 
     class OnUpdateHandlerImpl implements OnUpdateHandler {
         @Override
-        public void handle(VersionedUpdate update) {
+        public void handle(VersionedUpdate update, HybridTimestamp metastoreTimestamp) {

Review Comment:
   `metastoreTimestamp` is a somewhat misleading name; maybe `metastoreUpdateTimestamp` or `updateTimestamp` would be better?



##########
modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java:
##########
@@ -145,7 +146,7 @@ public static <T> CompletableFutureMatcher<T> willBe(Matcher<T> matcher) {
      * @param <T> value type
      * @return matcher
      */
-    public static <T> CompletableFutureMatcher<T> willBe(T value) {
+    public static <T> CompletableFutureMatcher<T> willBe(@Nullable T value) {

Review Comment:
   Why is this change needed? I would expect nulls to be matched as `willBe(nullValue())`



##########
modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java:
##########
@@ -243,4 +243,11 @@ public HybridTimestamp addPhysicalTime(long mills) {
 
         return new HybridTimestamp(time + (mills << LOGICAL_TIME_BITS_SIZE));
     }
+
+    /**
+     * Returns max clock skew for the cluster (in millis).
+     */
+    public static long maxClockSkew() {
+        return CLOCK_SKEW;

Review Comment:
   Why isn't this a cluster-wide property?



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.hlc.ClockUpdateListener;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.util.TrackerClosedException;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Allows to wait for the supplied clock to reach a required timestamp. It only uses the clock itself,
+ * no SafeTime mechanisms are involved.
+ */
+public class ClockWaiter implements IgniteComponent {
+    private static final IgniteLogger LOG = Loggers.forClass(ClockWaiter.class);
+
+    private final String nodeName;
+    private final HybridClock clock;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean stopGuard = new AtomicBoolean(false);
+
+    private final PendingComparableValuesTracker<HybridTimestamp, Void> nowTracker = new PendingComparableValuesTracker<>(
+            new HybridTimestamp(1, 0)
+    );
+
+    private final ClockUpdateListener updateListener = this::onUpdate;
+
+    private volatile ScheduledExecutorService scheduler;
+
+    public ClockWaiter(String nodeName, HybridClock clock) {
+        this.nodeName = nodeName;
+        this.clock = clock;
+    }
+
+    @Override
+    public void start() {
+        clock.addUpdateListener(updateListener);
+
+        scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(nodeName + "-clock-waiter", LOG));
+    }
+
+    @Override
+    public void stop() throws Exception {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        clock.removeUpdateListener(updateListener);
+
+        nowTracker.close();
+
+        scheduler.shutdownNow();
+        scheduler.awaitTermination(10, TimeUnit.SECONDS);
+    }
+
+    private void onUpdate(HybridTimestamp newTs) {
+        nowTracker.update(newTs, null);
+    }
+
+    /**
+     * Wait for the clock to reach the given timestamp.
+     *
+     * @param targetTimestamp Timestamp to wait for.
+     * @return A future that completes when the timestamp is reached by the clock's time.
+     */
+    public CompletableFuture<Void> waitFor(HybridTimestamp targetTimestamp) {
+        if (!busyLock.enterBusy()) {
+            return failedFuture(new NodeStoppingException());
+        }
+
+        try {
+            return doWaitFor(targetTimestamp);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    private CompletableFuture<Void> doWaitFor(HybridTimestamp targetTimestamp) {
+        CompletableFuture<Void> future = nowTracker.waitFor(targetTimestamp);
+
+        AtomicReference<ScheduledFuture<?>> scheduledFutureRef = new AtomicReference<>();
+
+        if (!future.isDone()) {
+            // This triggers a clock update.
+            HybridTimestamp now = clock.now();
+
+            if (targetTimestamp.compareTo(now) <= 0) {
+                assert future.isDone();
+            } else {
+                // Adding 1 to account for a possible non-null logical part of the targetTimestamp.
+                long millisToWait = targetTimestamp.getPhysical() - now.getPhysical() + 1;
+
+                ScheduledFuture<?> scheduledFuture = scheduler.schedule(this::triggerClockUpdate, millisToWait, TimeUnit.MILLISECONDS);
+                scheduledFutureRef.set(scheduledFuture);
+            }
+        }
+
+        return future.handle((res, ex) -> {
+            ScheduledFuture<?> scheduledFuture = scheduledFutureRef.get();
+
+            if (scheduledFuture != null) {
+                scheduledFuture.cancel(true);
+            }
+
+            if (ex != null) {

Review Comment:
   Why do you need this exception handling?



##########
modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java:
##########
@@ -1571,6 +1585,55 @@ public void testColumnEvents() {
         verifyNoMoreInteractions(eventListener);
     }
 
+
+    @Test
+    public void userFutureCompletesAfterClusterWideActivationHappens() throws Exception {
+        final long delayDuration = TimeUnit.DAYS.toMillis(365);
+
+        HybridTimestamp startTs = clock.now();
+
+        InMemoryVaultService vaultService = new InMemoryVaultService();

Review Comment:
   This code looks weird. Why do we have setup code in the test?



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java:
##########
@@ -165,9 +173,13 @@ private void restoreStateFromVault(OnUpdateHandler handler) {
                 break;
             }
 
+            if (appliedRevTimestamp == null) {
+                appliedRevTimestamp = metastoreRevisionToTs.apply(metastore.appliedRevision());

Review Comment:
   Can we add an `appliedRevisionTimestamp` method to the Meta Storage interface? It looks useful.



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.hlc.ClockUpdateListener;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.util.TrackerClosedException;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Allows to wait for the supplied clock to reach a required timestamp. It only uses the clock itself,
+ * no SafeTime mechanisms are involved.
+ */
+public class ClockWaiter implements IgniteComponent {
+    private static final IgniteLogger LOG = Loggers.forClass(ClockWaiter.class);
+
+    private final String nodeName;
+    private final HybridClock clock;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean stopGuard = new AtomicBoolean(false);
+
+    private final PendingComparableValuesTracker<HybridTimestamp, Void> nowTracker = new PendingComparableValuesTracker<>(
+            new HybridTimestamp(1, 0)
+    );
+
+    private final ClockUpdateListener updateListener = this::onUpdate;
+
+    private volatile ScheduledExecutorService scheduler;
+
+    public ClockWaiter(String nodeName, HybridClock clock) {
+        this.nodeName = nodeName;
+        this.clock = clock;
+    }
+
+    @Override
+    public void start() {
+        clock.addUpdateListener(updateListener);
+
+        scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(nodeName + "-clock-waiter", LOG));
+    }
+
+    @Override
+    public void stop() throws Exception {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        clock.removeUpdateListener(updateListener);
+
+        nowTracker.close();
+
+        scheduler.shutdownNow();

Review Comment:
   Can we use the method from `IgniteUtils`?



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.hlc.ClockUpdateListener;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.util.TrackerClosedException;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Allows to wait for the supplied clock to reach a required timestamp. It only uses the clock itself,
+ * no SafeTime mechanisms are involved.
+ */
+public class ClockWaiter implements IgniteComponent {
+    private static final IgniteLogger LOG = Loggers.forClass(ClockWaiter.class);
+
+    private final String nodeName;
+    private final HybridClock clock;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean stopGuard = new AtomicBoolean(false);
+
+    private final PendingComparableValuesTracker<HybridTimestamp, Void> nowTracker = new PendingComparableValuesTracker<>(
+            new HybridTimestamp(1, 0)
+    );
+
+    private final ClockUpdateListener updateListener = this::onUpdate;
+
+    private volatile ScheduledExecutorService scheduler;
+
+    public ClockWaiter(String nodeName, HybridClock clock) {
+        this.nodeName = nodeName;
+        this.clock = clock;
+    }
+
+    @Override
+    public void start() {
+        clock.addUpdateListener(updateListener);
+
+        scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(nodeName + "-clock-waiter", LOG));
+    }
+
+    @Override
+    public void stop() throws Exception {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        clock.removeUpdateListener(updateListener);
+
+        nowTracker.close();
+
+        scheduler.shutdownNow();
+        scheduler.awaitTermination(10, TimeUnit.SECONDS);
+    }
+
+    private void onUpdate(HybridTimestamp newTs) {
+        nowTracker.update(newTs, null);

Review Comment:
   Should it be wrapped in the busy lock?
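
   For reference, a guarded version could follow the same enter/leave pattern as `waitFor()`. This is only a sketch: `BusyLock` is a simplified, hypothetical stand-in rather than `IgniteSpinBusyLock`, and an `AtomicLong` plays the role of `nowTracker`.

   ```java
   import java.util.concurrent.atomic.AtomicLong;

   public class GuardedUpdate {
       /** Hypothetical, simplified busy lock; not the IgniteSpinBusyLock implementation. */
       static final class BusyLock {
           private int active;
           private boolean blocked;

           synchronized boolean enterBusy() {
               if (blocked) {
                   return false;
               }
               active++;
               return true;
           }

           synchronized void leaveBusy() {
               active--;
               notifyAll();
           }

           /** Rejects new entries and waits for the ones in progress to leave. */
           synchronized void block() {
               blocked = true;
               boolean interrupted = false;
               while (active > 0) {
                   try {
                       wait();
                   } catch (InterruptedException e) {
                       interrupted = true;
                   }
               }
               if (interrupted) {
                   Thread.currentThread().interrupt();
               }
           }
       }

       private final BusyLock busyLock = new BusyLock();

       // Stands in for nowTracker: remembers the highest timestamp seen so far.
       private final AtomicLong latest = new AtomicLong();

       /** Returns false if the component is stopping and the update was dropped. */
       public boolean onUpdate(long newTs) {
           if (!busyLock.enterBusy()) {
               return false;
           }

           try {
               latest.accumulateAndGet(newTs, Math::max);
               return true;
           } finally {
               busyLock.leaveBusy();
           }
       }

       public void stop() {
           busyLock.block();
       }

       public long latest() {
           return latest.get();
       }
   }
   ```

   In this sketch, updates that arrive after `stop()` are dropped instead of reaching a tracker that has already been closed.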



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.hlc.ClockUpdateListener;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.util.TrackerClosedException;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Allows to wait for the supplied clock to reach a required timestamp. It only uses the clock itself,
+ * no SafeTime mechanisms are involved.
+ */
+public class ClockWaiter implements IgniteComponent {
+    private static final IgniteLogger LOG = Loggers.forClass(ClockWaiter.class);
+
+    private final String nodeName;
+    private final HybridClock clock;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean stopGuard = new AtomicBoolean(false);
+
+    private final PendingComparableValuesTracker<HybridTimestamp, Void> nowTracker = new PendingComparableValuesTracker<>(
+            new HybridTimestamp(1, 0)

Review Comment:
   Why not `HybridTimestamp.MIN_VALUE`?



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java:
##########
@@ -773,11 +783,21 @@ private void registerCatalog(Catalog newCatalog) {
         catalogByTs.put(newCatalog.time(), newCatalog);
     }
 
-    private CompletableFuture<Void> saveUpdate(UpdateProducer updateProducer) {
-        return saveUpdate(updateProducer, 0);
+    private CompletableFuture<Void> saveUpdateAndWaitForActivation(UpdateProducer updateProducer) {
+        return saveUpdate(updateProducer, 0)
+                .thenCompose(newVersion -> versionTracker.waitFor(newVersion)

Review Comment:
   If this part is still needed, I would propose to reorganize the code a little bit:
   ```
   return saveUpdate(updateProducer, 0)
           .thenCompose(newVersion -> versionTracker.waitFor(newVersion).thenApply(v -> catalogByVer.get(newVersion)))
           .thenCompose(catalog -> {
               HybridTimestamp activationTs = HybridTimestamp.hybridTimestamp(catalog.time());
               HybridTimestamp clusterWideEnsuredActivationTs = activationTs.addPhysicalTime(HybridTimestamp.maxClockSkew());
   
               return clockWaiter.waitFor(clusterWideEnsuredActivationTs);
           });
   ```





[GitHub] [ignite-3] rpuch commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1239677550


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.hlc.ClockUpdateListener;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.util.TrackerClosedException;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Allows to wait for the supplied clock to reach a required timestamp. It only uses the clock itself,
+ * no SafeTime mechanisms are involved.
+ */
+public class ClockWaiter implements IgniteComponent {
+    private static final IgniteLogger LOG = Loggers.forClass(ClockWaiter.class);
+
+    private final String nodeName;
+    private final HybridClock clock;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean stopGuard = new AtomicBoolean(false);
+
+    private final PendingComparableValuesTracker<HybridTimestamp, Void> nowTracker = new PendingComparableValuesTracker<>(
+            new HybridTimestamp(1, 0)
+    );
+
+    private final ClockUpdateListener updateListener = this::onUpdate;
+
+    private volatile ScheduledExecutorService scheduler;
+
+    public ClockWaiter(String nodeName, HybridClock clock) {
+        this.nodeName = nodeName;
+        this.clock = clock;
+    }
+
+    @Override
+    public void start() {
+        clock.addUpdateListener(updateListener);
+
+        scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(nodeName + "-clock-waiter", LOG));
+    }
+
+    @Override
+    public void stop() throws Exception {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        clock.removeUpdateListener(updateListener);
+
+        nowTracker.close();
+
+        scheduler.shutdownNow();

Review Comment:
   No, I'm tracking user-facing futures in the tracker. `ScheduledFuture`s are not tracked explicitly.





[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1239374905


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.hlc.ClockUpdateListener;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.util.TrackerClosedException;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Allows to wait for the supplied clock to reach a required timestamp. It only uses the clock itself,
+ * no SafeTime mechanisms are involved.
+ */
+public class ClockWaiter implements IgniteComponent {
+    private static final IgniteLogger LOG = Loggers.forClass(ClockWaiter.class);
+
+    private final String nodeName;
+    private final HybridClock clock;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean stopGuard = new AtomicBoolean(false);
+
+    private final PendingComparableValuesTracker<HybridTimestamp, Void> nowTracker = new PendingComparableValuesTracker<>(
+            new HybridTimestamp(1, 0)
+    );
+
+    private final ClockUpdateListener updateListener = this::onUpdate;
+
+    private volatile ScheduledExecutorService scheduler;
+
+    public ClockWaiter(String nodeName, HybridClock clock) {
+        this.nodeName = nodeName;
+        this.clock = clock;
+    }
+
+    @Override
+    public void start() {
+        clock.addUpdateListener(updateListener);
+
+        scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(nodeName + "-clock-waiter", LOG));
+    }
+
+    @Override
+    public void stop() throws Exception {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        clock.removeUpdateListener(updateListener);
+
+        nowTracker.close();
+
+        scheduler.shutdownNow();

Review Comment:
   It works fine if you cancel all `ScheduledFuture`s explicitly, which I believe you should be doing anyway.
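
   A rough sketch of what I mean, using only JDK classes. `TrackedScheduler` and its method names are hypothetical, and I am assuming that the utility method does a graceful `shutdown()` followed by `awaitTermination()`.

   ```java
   import java.util.Set;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.ScheduledFuture;
   import java.util.concurrent.TimeUnit;

   public class TrackedScheduler {
       private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

       // Every scheduled task is remembered so that it can be cancelled on stop.
       // A real implementation would also remove futures once they complete.
       private final Set<ScheduledFuture<?>> pending = ConcurrentHashMap.newKeySet();

       public ScheduledFuture<?> schedule(Runnable task, long delayMillis) {
           ScheduledFuture<?> future = scheduler.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
           pending.add(future);
           return future;
       }

       /** Cancels pending tasks, then shuts down gracefully. Returns true if the executor terminated. */
       public boolean stop() {
           for (ScheduledFuture<?> future : pending) {
               future.cancel(true);
           }
           pending.clear();

           // Graceful shutdown: cancelled delayed tasks no longer keep the executor alive.
           scheduler.shutdown();

           try {
               return scheduler.awaitTermination(10, TimeUnit.SECONDS);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               return false;
           }
       }
   }
   ```

   Without the explicit cancellation, a graceful `shutdown()` would by default still wait for the delayed tasks to run, which is presumably why `shutdownNow()` is used here.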





[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1239388840


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java:
##########
@@ -67,7 +68,8 @@ interface OnUpdateHandler {
          * An actual handler that will be invoked when new update is appended to the log.
          *
          * @param update A new update.
+         * @param metastoreUpdateTimestamp Timestamp assigned to the update by the metastore.

Review Comment:
   I'm pretty sure we always use either `Meta Storage` or `Metastorage` when talking about the Meta Storage. Let's not add new terms.



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java:
##########
@@ -189,14 +193,7 @@ static ByteArray updatePrefix() {
         }
     }
 
-    private static class UpdateListener implements WatchListener {

Review Comment:
   Why did you make this class non-static?



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java:
##########
@@ -165,9 +165,13 @@ private void restoreStateFromVault(OnUpdateHandler handler) {
                 break;
             }
 
+            if (appliedRevTimestamp == null) {
+                appliedRevTimestamp = metastore.appliedRevisionTimestamp();
+            }
+
             VersionedUpdate update = fromBytes(entry.value());
 
-            handler.handle(update);
+            handler.handle(update, appliedRevTimestamp);

Review Comment:
   Is this logic correct? We are effectively replaying Catalog updates from the Vault (by iterating over Catalog versions). Is it OK that they will all have the same timestamp?



##########
modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/WatchEvent.java:
##########
@@ -33,24 +35,30 @@ public class WatchEvent {
 
     private final long revision;
 
+    /** Timestamp assigned by the MetaStorage to the event's revision. */
+    private final HybridTimestamp timestamp;
+
     /**
      * Constructs a watch event with the given entry events collection.
      *
      * @param entryEvts Events for entries corresponding to an update under one revision.
      * @param revision Revision of the updated entries.
+     * @param timestamp Timestamp assigned by the MetaStorage to the event's revision.
      */
-    public WatchEvent(Collection<EntryEvent> entryEvts, long revision) {
+    public WatchEvent(Collection<EntryEvent> entryEvts, long revision, HybridTimestamp timestamp) {
         this.entryEvts = List.copyOf(entryEvts);
         this.revision = revision;
+        this.timestamp = timestamp;
     }
 
     /**
      * Constructs watch event with single entry update.
      *
      * @param entryEvt Entry event.
      */
+    @TestOnly
     public WatchEvent(EntryEvent entryEvt) {

Review Comment:
   Let's create a ticket to remove this constructor





[GitHub] [ignite-3] tkalkirill merged pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "tkalkirill (via GitHub)" <gi...@apache.org>.
tkalkirill merged PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228




[GitHub] [ignite-3] rpuch commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1238416697


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java:
##########
@@ -773,11 +783,21 @@ private void registerCatalog(Catalog newCatalog) {
         catalogByTs.put(newCatalog.time(), newCatalog);
     }
 
-    private CompletableFuture<Void> saveUpdate(UpdateProducer updateProducer) {
-        return saveUpdate(updateProducer, 0);
+    private CompletableFuture<Void> saveUpdateAndWaitForActivation(UpdateProducer updateProducer) {
+        return saveUpdate(updateProducer, 0)
+                .thenCompose(newVersion -> versionTracker.waitFor(newVersion)
+                        .thenCompose(unused -> {
+                            Catalog catalog = catalogByVer.get(newVersion);
+                            HybridTimestamp activationTs = HybridTimestamp.hybridTimestamp(catalog.time());
+                            HybridTimestamp clusterWideEnsuredActivationTs = activationTs.addPhysicalTime(
+                                    HybridTimestamp.maxClockSkew()

Review Comment:
   The IEP says the following:
   
   > Before an update is reported as installed to the client, an additional wait happens to make sure that the client’s request on any node will see the installed update after the installation call returns. The required wait is until the schema update activation becomes non-future on all nodes (taking clock skew into account, so the client must wait for at least Now ≥ Tu+CSmax).
   
   Here, Tu is the activation moment of the schema update and CSmax is the maximum clock skew, so the code reflects what the IEP prescribes.
   
   The actual propagation time can be arbitrary and we cannot bound it, but that is not a problem. When the client (after issuing a CREATE TABLE) sends a request to any cluster node, the HLC at that node is guaranteed (because the clock skew is bounded by CSmax) to be ahead of the activation moment. The node will take HLC.now as the client's operation timestamp T and will then wait until it has all the schema updates required to process an operation at timestamp T. So a possible MetaStorage lag of the node is accounted for THERE, not when installing the update.
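
The wait rule quoted from the IEP (Now ≥ Tu + CSmax) can be sketched as below. This is only an illustration under stated assumptions: plain millisecond values stand in for `HybridTimestamp`, and `ActivationWaitSketch` and its method names are hypothetical, not part of the PR.

```java
/** Illustrative only: plain millisecond values stand in for HybridTimestamp. */
final class ActivationWaitSketch {
    /** Earliest local time at which the update is non-future on every node: Tu + CSmax. */
    static long clusterWideActivationMillis(long activationMillis, long maxClockSkewMillis) {
        return activationMillis + maxClockSkewMillis;
    }

    /** How long the installing node still has to wait; zero if that moment has already passed. */
    static long remainingWaitMillis(long nowMillis, long activationMillis, long maxClockSkewMillis) {
        long target = clusterWideActivationMillis(activationMillis, maxClockSkewMillis);

        return Math.max(0L, target - nowMillis);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();

        // Hypothetical values: activation 200 ms in the future, max clock skew of 500 ms.
        System.out.println("Wait for " + remainingWaitMillis(now, now + 200L, 500L) + " ms");
    }
}
```

Note that the sketch bounds only the clock-related wait. As the comment above explains, MetaStorage lag on the node serving a later request is handled when that request is processed.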





[GitHub] [ignite-3] rpuch commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1238562976


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.hlc.ClockUpdateListener;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.util.TrackerClosedException;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Allows to wait for the supplied clock to reach a required timestamp. It only uses the clock itself,
+ * no SafeTime mechanisms are involved.
+ */
+public class ClockWaiter implements IgniteComponent {
+    private static final IgniteLogger LOG = Loggers.forClass(ClockWaiter.class);
+
+    private final String nodeName;
+    private final HybridClock clock;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean stopGuard = new AtomicBoolean(false);
+
+    private final PendingComparableValuesTracker<HybridTimestamp, Void> nowTracker = new PendingComparableValuesTracker<>(
+            new HybridTimestamp(1, 0)
+    );
+
+    private final ClockUpdateListener updateListener = this::onUpdate;
+
+    private volatile ScheduledExecutorService scheduler;
+
+    public ClockWaiter(String nodeName, HybridClock clock) {
+        this.nodeName = nodeName;
+        this.clock = clock;
+    }
+
+    @Override
+    public void start() {
+        clock.addUpdateListener(updateListener);
+
+        scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(nodeName + "-clock-waiter", LOG));
+    }
+
+    @Override
+    public void stop() throws Exception {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        clock.removeUpdateListener(updateListener);
+
+        nowTracker.close();
+
+        scheduler.shutdownNow();
+        scheduler.awaitTermination(10, TimeUnit.SECONDS);
+    }
+
+    private void onUpdate(HybridTimestamp newTs) {
+        nowTracker.update(newTs, null);
+    }
+
+    /**
+     * Wait for the clock to reach the given timestamp.
+     *
+     * @param targetTimestamp Timestamp to wait for.
+     * @return A future that completes when the timestamp is reached by the clock's time.
+     */
+    public CompletableFuture<Void> waitFor(HybridTimestamp targetTimestamp) {
+        if (!busyLock.enterBusy()) {
+            return failedFuture(new NodeStoppingException());
+        }
+
+        try {
+            return doWaitFor(targetTimestamp);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    private CompletableFuture<Void> doWaitFor(HybridTimestamp targetTimestamp) {
+        CompletableFuture<Void> future = nowTracker.waitFor(targetTimestamp);
+
+        AtomicReference<ScheduledFuture<?>> scheduledFutureRef = new AtomicReference<>();
+
+        if (!future.isDone()) {
+            // This triggers a clock update.
+            HybridTimestamp now = clock.now();
+
+            if (targetTimestamp.compareTo(now) <= 0) {
+                assert future.isDone();
+            } else {
+                // Adding 1 to account for a possible non-null logical part of the targetTimestamp.
+                long millisToWait = targetTimestamp.getPhysical() - now.getPhysical() + 1;
+
+                ScheduledFuture<?> scheduledFuture = scheduler.schedule(this::triggerClockUpdate, millisToWait, TimeUnit.MILLISECONDS);
+                scheduledFutureRef.set(scheduledFuture);
+            }
+        }
+
+        return future.handle((res, ex) -> {
+            ScheduledFuture<?> scheduledFuture = scheduledFutureRef.get();
+
+            if (scheduledFuture != null) {
+                scheduledFuture.cancel(true);
+            }
+
+            if (ex != null) {

Review Comment:
   I just did not want the downstream code to see the `TrackerClosedException`; the future gets cancelled, doesn't it? Do you believe it's better to retain the original exception?
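
A minimal sketch of the kind of translation being discussed, assuming a hypothetical `ClosedException` in place of `TrackerClosedException` (the real class and the actual body of the `handle` callback are not shown in this hunk):

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class ExceptionTranslationSketch {
    /** Hypothetical stand-in for the tracker's "closed" exception. */
    static final class ClosedException extends RuntimeException {
    }

    /** Returns a future that hides ClosedException from downstream code. */
    static <T> CompletableFuture<T> hideClosed(CompletableFuture<T> source) {
        return source.handle((res, ex) -> {
            if (ex == null) {
                return res;
            }

            // Unwrap a CompletionException added by an upstream stage, if any.
            Throwable cause = (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;

            if (cause instanceof ClosedException) {
                throw new CancellationException("Waiter was stopped");
            }

            throw new CompletionException(cause);
        });
    }
}
```

With this shape the returned future completes exceptionally rather than being cancelled in the `isCancelled()` sense: `join()` throws a `CompletionException` whose cause is the `CancellationException`.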





[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1239381926


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.hlc.ClockUpdateListener;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.util.TrackerClosedException;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Allows to wait for the supplied clock to reach a required timestamp. It only uses the clock itself,
+ * no SafeTime mechanisms are involved.
+ */
+public class ClockWaiter implements IgniteComponent {
+    private static final IgniteLogger LOG = Loggers.forClass(ClockWaiter.class);
+
+    private final String nodeName;
+    private final HybridClock clock;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean stopGuard = new AtomicBoolean(false);
+
+    private final PendingComparableValuesTracker<HybridTimestamp, Void> nowTracker = new PendingComparableValuesTracker<>(
+            new HybridTimestamp(1, 0)
+    );
+
+    private final ClockUpdateListener updateListener = this::onUpdate;
+
+    private volatile ScheduledExecutorService scheduler;
+
+    public ClockWaiter(String nodeName, HybridClock clock) {
+        this.nodeName = nodeName;
+        this.clock = clock;
+    }
+
+    @Override
+    public void start() {
+        clock.addUpdateListener(updateListener);
+
+        scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(nodeName + "-clock-waiter", LOG));
+    }
+
+    @Override
+    public void stop() throws Exception {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        clock.removeUpdateListener(updateListener);
+
+        nowTracker.close();
+
+        scheduler.shutdownNow();
+        scheduler.awaitTermination(10, TimeUnit.SECONDS);
+    }
+
+    private void onUpdate(HybridTimestamp newTs) {
+        nowTracker.update(newTs, null);
+    }
+
+    /**
+     * Wait for the clock to reach the given timestamp.
+     *
+     * @param targetTimestamp Timestamp to wait for.
+     * @return A future that completes when the timestamp is reached by the clock's time.
+     */
+    public CompletableFuture<Void> waitFor(HybridTimestamp targetTimestamp) {
+        if (!busyLock.enterBusy()) {
+            return failedFuture(new NodeStoppingException());
+        }
+
+        try {
+            return doWaitFor(targetTimestamp);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    private CompletableFuture<Void> doWaitFor(HybridTimestamp targetTimestamp) {
+        CompletableFuture<Void> future = nowTracker.waitFor(targetTimestamp);
+
+        AtomicReference<ScheduledFuture<?>> scheduledFutureRef = new AtomicReference<>();

Review Comment:
   > it's going to be used in a lambda, so it must be effectively final; without a container it would not compile.
   
   It is possible to make `scheduledFuture` effectively final without using a container; you just need to assign `null` explicitly.
   
   > Also, we need the container to be thread-safe (as the future completion might be triggered from another thread), hence AtomicReference seems to be an ideal choice.
   
   This is a local, effectively final variable; is it even possible to have races here?
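
The suggestion about explicit `null` assignment can be illustrated as follows. This is a sketch, not the PR's code: `EffectivelyFinalSketch` and `cancelTimerOnCompletion` are hypothetical names, and the scheduled task is an empty placeholder.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class EffectivelyFinalSketch {
    static <T> CompletableFuture<T> cancelTimerOnCompletion(
            CompletableFuture<T> future, ScheduledExecutorService scheduler, long delayMillis) {
        // Declared without an initializer and assigned exactly once on every path,
        // so the compiler treats it as effectively final and the lambda may capture it.
        ScheduledFuture<?> scheduledFuture;

        if (future.isDone()) {
            scheduledFuture = null;
        } else {
            // Placeholder task; the real code would trigger a clock update here.
            scheduledFuture = scheduler.schedule(() -> { }, delayMillis, TimeUnit.MILLISECONDS);
        }

        return future.whenComplete((res, ex) -> {
            if (scheduledFuture != null) {
                scheduledFuture.cancel(true);
            }
        });
    }
}
```

The variable is written before the callback is registered and never changes afterwards, which is why no `AtomicReference` is needed in this shape.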





[GitHub] [ignite-3] rpuch commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1239476791


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java:
##########
@@ -189,14 +193,7 @@ static ByteArray updatePrefix() {
         }
     }
 
-    private static class UpdateListener implements WatchListener {

Review Comment:
   To allow it to see the fields of the enclosing class (and avoid duplicating the same fields in it).
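
The trade-off between the two variants can be sketched like this. All names here (`UpdateLogSketch`, `updatePrefix`, the listener classes) are hypothetical and only illustrate the language mechanics, not the actual `UpdateLogImpl`.

```java
final class UpdateLogSketch {
    private final String updatePrefix;

    UpdateLogSketch(String updatePrefix) {
        this.updatePrefix = updatePrefix;
    }

    /** Static nested class: no implicit reference to the outer instance, so the field is passed in. */
    static final class StaticListener {
        private final String updatePrefix;

        StaticListener(String updatePrefix) {
            this.updatePrefix = updatePrefix;
        }

        boolean accepts(String key) {
            return key.startsWith(updatePrefix);
        }
    }

    /** Inner class: reads the outer field directly through the implicit outer reference. */
    final class InnerListener {
        boolean accepts(String key) {
            return key.startsWith(updatePrefix);
        }
    }

    StaticListener staticListener() {
        return new StaticListener(updatePrefix);
    }

    InnerListener innerListener() {
        return new InnerListener();
    }
}
```

The static variant makes its dependencies explicit at the cost of a duplicated field; the inner variant is shorter but keeps the enclosing instance reachable for as long as the listener lives.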





[GitHub] [ignite-3] rpuch commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1239677897


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java:
##########
@@ -165,9 +165,13 @@ private void restoreStateFromVault(OnUpdateHandler handler) {
                 break;
             }
 
+            if (appliedRevTimestamp == null) {
+                appliedRevTimestamp = metastore.appliedRevisionTimestamp();
+            }
+
             VersionedUpdate update = fromBytes(entry.value());
 
-            handler.handle(update);
+            handler.handle(update, appliedRevTimestamp);

Review Comment:
   Good catch!





[GitHub] [ignite-3] rpuch commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1238396993


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java:
##########
@@ -130,24 +132,32 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
 
     private final PendingComparableValuesTracker<Integer, Void> versionTracker = new PendingComparableValuesTracker<>(0);
 
-    private final HybridClock clock;
+    private final ClockWaiter clockWaiter;
 
-    private final long delayDurationMs;
+    private final Lazy<Long> delayDurationMs;
 
     /**
      * Constructor.
      */
-    public CatalogServiceImpl(UpdateLog updateLog, HybridClock clock) {
-        this(updateLog, clock, DEFAULT_DELAY_DURATION);
+    public CatalogServiceImpl(UpdateLog updateLog, ClockWaiter clockWaiter) {
+        this(updateLog, clockWaiter, DEFAULT_DELAY_DURATION);
     }
 
     /**
      * Constructor.
      */
-    public CatalogServiceImpl(UpdateLog updateLog, HybridClock clock, long delayDurationMs) {
+    CatalogServiceImpl(UpdateLog updateLog, ClockWaiter clockWaiter, long delayDurationMs) {
+        this(updateLog, clockWaiter, () -> delayDurationMs);
+    }
+
+    /**
+     * Constructor.
+     */
+    public CatalogServiceImpl(UpdateLog updateLog, ClockWaiter clockWaiter, LongSupplier delayDurationMsSupplier) {

Review Comment:
   Now that `Lazy` has been removed, it is more natural to use `LongSupplier` (as `Supplier<Long>` is not used anymore).
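
A small sketch of the constructor chaining described above, assuming a hypothetical `DelayDurationSketch` class: the fixed-value constructor delegates to the `LongSupplier` one, so the value is read on demand and no boxing is involved.

```java
import java.util.function.LongSupplier;

final class DelayDurationSketch {
    private final LongSupplier delayDurationMsSupplier;

    /** Convenience constructor for a fixed delay (for example, in tests). */
    DelayDurationSketch(long fixedDelayMs) {
        this(() -> fixedDelayMs);
    }

    /** Main constructor: the delay is re-read on every use, so it may change over time. */
    DelayDurationSketch(LongSupplier delayDurationMsSupplier) {
        this.delayDurationMsSupplier = delayDurationMsSupplier;
    }

    /** Hypothetical helper: activation time is "now" plus the current delay duration. */
    long activationTimeMillis(long nowMillis) {
        return nowMillis + delayDurationMsSupplier.getAsLong();
    }
}
```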





[GitHub] [ignite-3] rpuch commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1238459124


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java:
##########
@@ -209,13 +214,14 @@ public CompletableFuture<Void> onUpdate(WatchEvent event) {
 
                 VersionedUpdate update = fromBytes(payload);
 
-                onUpdateHandler.handle(update);
+                HybridTimestamp entryTimestamp = metastoreRevisionToTs.apply(eventEntry.newEntry().revision());

Review Comment:
   If I'm not mistaken, to get a ts we MUST make a lookup (as it's stored in an index by revision). If we move it to `WatchEvent`, we'll have to make this lookup for 100% of entries; here we do it on-demand, only when required.





[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1238464931


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java:
##########
@@ -209,13 +214,14 @@ public CompletableFuture<Void> onUpdate(WatchEvent event) {
 
                 VersionedUpdate update = fromBytes(payload);
 
-                onUpdateHandler.handle(update);
+                HybridTimestamp entryTimestamp = metastoreRevisionToTs.apply(eventEntry.newEntry().revision());

Review Comment:
   Actually, all events in a single notification will have the same revision, so it's kind of pointless to query it for all entries.
   
   Also, why do we have to do a lookup at all? We have a timestamp as a part of every Raft command, so why can't Meta Storage include it in its notifications?





[GitHub] [ignite-3] rpuch commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1238642247


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java:
##########
@@ -209,13 +214,14 @@ public CompletableFuture<Void> onUpdate(WatchEvent event) {
 
                 VersionedUpdate update = fromBytes(payload);
 
-                onUpdateHandler.handle(update);
+                HybridTimestamp entryTimestamp = metastoreRevisionToTs.apply(eventEntry.newEntry().revision());

Review Comment:
   It turned out that the timestamp is already available on all non-test code paths that create instances of `WatchEvent`, so I added the timestamp to `WatchEvent`. Simplifies matters a lot, thanks!
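
The outcome of this thread, a timestamp carried by the event instead of a per-entry lookup by revision, can be sketched as below. `TimestampedEventSketch` is a hypothetical simplification: string keys stand in for entry events and a `long` stands in for `HybridTimestamp`.

```java
import java.util.List;
import java.util.function.ObjLongConsumer;

final class TimestampedEventSketch {
    final List<String> entryKeys;
    final long revision;
    final long timestampMillis;

    TimestampedEventSketch(List<String> entryKeys, long revision, long timestampMillis) {
        this.entryKeys = List.copyOf(entryKeys);
        this.revision = revision;
        this.timestampMillis = timestampMillis;
    }

    /** All entries of one event share its revision, so they also share one timestamp. */
    void forEachEntry(ObjLongConsumer<String> handler) {
        for (String key : entryKeys) {
            handler.accept(key, timestampMillis);
        }
    }
}
```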





[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1239385621


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java:
##########
@@ -773,11 +783,21 @@ private void registerCatalog(Catalog newCatalog) {
         catalogByTs.put(newCatalog.time(), newCatalog);
     }
 
-    private CompletableFuture<Void> saveUpdate(UpdateProducer updateProducer) {
-        return saveUpdate(updateProducer, 0);
+    private CompletableFuture<Void> saveUpdateAndWaitForActivation(UpdateProducer updateProducer) {
+        return saveUpdate(updateProducer, 0)
+                .thenCompose(newVersion -> versionTracker.waitFor(newVersion)
+                        .thenCompose(unused -> {
+                            Catalog catalog = catalogByVer.get(newVersion);
+                            HybridTimestamp activationTs = HybridTimestamp.hybridTimestamp(catalog.time());
+                            HybridTimestamp clusterWideEnsuredActivationTs = activationTs.addPhysicalTime(
+                                    HybridTimestamp.maxClockSkew()

Review Comment:
   ok, I missed that, makes sense





[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1239505728


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java:
##########
@@ -189,14 +193,7 @@ static ByteArray updatePrefix() {
         }
     }
 
-    private static class UpdateListener implements WatchListener {

Review Comment:
   It only uses a single field, I think the previous code was perfectly fine






[GitHub] [ignite-3] rpuch commented on a diff in pull request #2228: IGNITE-19209 Implement installing table schema updates

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2228:
URL: https://github.com/apache/ignite-3/pull/2228#discussion_r1239469773


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.hlc.ClockUpdateListener;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.util.TrackerClosedException;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Allows to wait for the supplied clock to reach a required timestamp. It only uses the clock itself,
+ * no SafeTime mechanisms are involved.
+ */
+public class ClockWaiter implements IgniteComponent {
+    private static final IgniteLogger LOG = Loggers.forClass(ClockWaiter.class);
+
+    private final String nodeName;
+    private final HybridClock clock;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean stopGuard = new AtomicBoolean(false);
+
+    private final PendingComparableValuesTracker<HybridTimestamp, Void> nowTracker = new PendingComparableValuesTracker<>(
+            new HybridTimestamp(1, 0)
+    );
+
+    private final ClockUpdateListener updateListener = this::onUpdate;
+
+    private volatile ScheduledExecutorService scheduler;
+
+    public ClockWaiter(String nodeName, HybridClock clock) {
+        this.nodeName = nodeName;
+        this.clock = clock;
+    }
+
+    @Override
+    public void start() {
+        clock.addUpdateListener(updateListener);
+
+        scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(nodeName + "-clock-waiter", LOG));
+    }
+
+    @Override
+    public void stop() throws Exception {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        clock.removeUpdateListener(updateListener);
+
+        nowTracker.close();
+
+        scheduler.shutdownNow();

Review Comment:
   That's the point: I don't want to track all the `ScheduledFuture`s if there is a simpler solution: just shut down the scheduler. The client-facing futures will be cancelled when the tracker is closed.
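
A minimal sketch of why shutting the scheduler down is enough to drop the pending timer tasks without tracking each `ScheduledFuture`. `SchedulerShutdownSketch` is a hypothetical name and the scheduled tasks are empty placeholders; only standard `java.util.concurrent` calls are used.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class SchedulerShutdownSketch {
    /** Schedules far-future tasks, then shuts down; returns how many tasks never ran. */
    static int pendingTasksDroppedOnShutdown(int taskCount) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        for (int i = 0; i < taskCount; i++) {
            // Placeholder task standing in for a delayed clock-update trigger.
            scheduler.schedule(() -> { }, 1, TimeUnit.HOURS);
        }

        // shutdownNow() drains the queue and returns the tasks that never started.
        List<Runnable> neverStarted = scheduler.shutdownNow();

        return neverStarted.size();
    }
}
```

The futures handed out to callers are a separate concern: in the code under review they are completed by closing the tracker, not by the scheduler.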


