Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/08/12 08:24:47 UTC

[GitHub] [ignite-3] rpuch commented on a diff in pull request #995: IGNITE-17475 FreeList metadata is not stored on the checkpoint

rpuch commented on code in PR #995:
URL: https://github.com/apache/ignite-3/pull/995#discussion_r944164632


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/AwaitTasksCompletionExecutor.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Executor {@link #awaitPendingTasksFinished() waiting for the completion} of tasks added via {@link #execute(Runnable)}.
+ *
+ * <p>Not thread-safe.
+ */
+class AwaitTasksCompletionExecutor implements Executor {
+    private final Executor executor;
+
+    private final Runnable updateHeartbeat;
+
+    private List<CompletableFuture<?>> pendingTaskFutures = new ArrayList<>();
+
+    /**
+     * Constructor.
+     *
+     * @param executor Executor in which the tasks will be performed.
+     * @param updateHeartbeat Update heartbeat callback that will be executed on completion of each task.
+     */
+    AwaitTasksCompletionExecutor(Executor executor, Runnable updateHeartbeat) {
+        this.executor = executor;
+        this.updateHeartbeat = updateHeartbeat;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void execute(Runnable command) {
+        CompletableFuture<?> future = new CompletableFuture<>();
+
+        future.whenComplete((o, throwable) -> updateHeartbeat.run());
+
+        pendingTaskFutures.add(future);
+
+        executor.execute(() -> {
+            try {
+                command.run();
+
+                future.complete(null);
+            } catch (Throwable t) {
+                future.completeExceptionally(t);
+            }
+        });
+    }
+
+    /**
+     * Await all async tasks from executor was finished.

Review Comment:
   ```suggestion
        * Await all async tasks from executor have finished.
   ```



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java:
##########
@@ -123,10 +123,12 @@ public CheckpointManager(
         checkpointMarkersStorage = new CheckpointMarkersStorage(storagePath);
 
         checkpointWorkflow = new CheckpointWorkflow(
+                Loggers.forClass(CheckpointWorkflow.class),

Review Comment:
   Why doesn't `CheckpointWorkflow` instantiate the logger itself? Now we need an additional constructor parameter, which clutters the construction code but does not seem to give any benefit.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/AwaitTasksCompletionExecutor.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Executor {@link #awaitPendingTasksFinished() waiting for the completion} of tasks added via {@link #execute(Runnable)}.
+ *
+ * <p>Not thread-safe.
+ */
+class AwaitTasksCompletionExecutor implements Executor {
+    private final Executor executor;
+
+    private final Runnable updateHeartbeat;
+
+    private List<CompletableFuture<?>> pendingTaskFutures = new ArrayList<>();
+
+    /**
+     * Constructor.
+     *
+     * @param executor Executor in which the tasks will be performed.
+     * @param updateHeartbeat Update heartbeat callback that will be executed on completion of each task.
+     */
+    AwaitTasksCompletionExecutor(Executor executor, Runnable updateHeartbeat) {
+        this.executor = executor;
+        this.updateHeartbeat = updateHeartbeat;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void execute(Runnable command) {
+        CompletableFuture<?> future = new CompletableFuture<>();
+
+        future.whenComplete((o, throwable) -> updateHeartbeat.run());
+
+        pendingTaskFutures.add(future);

Review Comment:
   If the awaiting method is never called, this list grows indefinitely. This is probably ok if the user knows that only a limited number of tasks will be submitted. Should we mention this in the javadoc?
   
   As the class is package-local, this is just a suggestion.
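   
   For illustration only, a minimal sketch of one way to keep the list bounded even if the awaiting method is called rarely. `PruningAwaitExecutor` and `pendingCount()` are hypothetical names, not part of this PR, and the heartbeat callback is left out for brevity. Futures that failed are kept, so the awaiting method can still report the failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical variant (not the PR's class): forgets successfully finished tasks. Not thread-safe. */
class PruningAwaitExecutor implements Executor {
    private final Executor delegate;

    private List<CompletableFuture<?>> pending = new ArrayList<>();

    PruningAwaitExecutor(Executor delegate) {
        this.delegate = delegate;
    }

    @Override
    public void execute(Runnable command) {
        // Drop futures that completed normally; failed ones stay so that await can rethrow.
        pending.removeIf(f -> f.isDone() && !f.isCompletedExceptionally());

        CompletableFuture<?> future = new CompletableFuture<>();

        pending.add(future);

        delegate.execute(() -> {
            try {
                command.run();

                future.complete(null);
            } catch (Throwable t) {
                future.completeExceptionally(t);
            }
        });
    }

    /** Waits for all tracked tasks and resets the list; a task failure surfaces as CompletionException. */
    void awaitPendingTasksFinished() {
        List<CompletableFuture<?>> futures = pending;

        pending = new ArrayList<>();

        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    }

    /** Number of futures currently tracked (exposed only for this illustration). */
    int pendingCount() {
        return pending.size();
    }
}
```

   The trade-off is an extra pass over the list on every `execute`, which should be cheap as long as the list stays short.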



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java:
##########
@@ -35,6 +36,19 @@
  * Implementation of {@link DataRegion} for persistent case.
  */
 class PersistentPageMemoryDataRegion implements DataRegion<PersistentPageMemory> {
+    /**
+     * Threshold to calculate limit for pages list on-heap caches.
+     *
+     * <p>Note: When a checkpoint is triggered, we need some amount of page memory to store pages list on-heap cache.
+     * If a checkpoint is triggered by "too many dirty pages" reason and pages list cache is rather big, we can get {@code
+     * IgniteOutOfMemoryException}. To prevent this, we can limit the total amount of cached page list buckets, assuming that checkpoint
+     * will be triggered if no more then 3/4 of pages will be marked as dirty (there will be at least 1/4 of clean pages) and each cached
+     * page list bucket can be stored to up to 2 pages (this value is not static, but depends on PagesCache.MAX_SIZE, so if
+     * PagesCache.MAX_SIZE > PagesListNodeIo#getCapacity it can take more than 2 pages). Also some amount of page memory needed to store

Review Comment:
   ```suggestion
        * PagesCache.MAX_SIZE > PagesListNodeIo#getCapacity it can take more than 2 pages). Also some amount of page memory is needed to store
   ```



##########
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java:
##########
@@ -492,6 +526,81 @@ void testParallelSortDirtyPages() throws Exception {
         );
     }
 
+    @Test
+    void testAwaitPendingTasksOfListenerCallback() {
+        workflow = new CheckpointWorkflow(
+                log,
+                "test",
+                mock(CheckpointMarkersStorage.class),
+                newReadWriteLock(log),
+                List.of(),
+                2
+        );
+
+        workflow.start();
+
+        CompletableFuture<?> startTask0Future = new CompletableFuture<>();
+        CompletableFuture<?> finishTask0Future = new CompletableFuture<>();

Review Comment:
   I suggest replacing 0 and 1 with something that gives the reader more of a hint, like 'beforeCheckpoint' and 'onMarkCheckpoint'.



##########
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java:
##########
@@ -492,6 +526,81 @@ void testParallelSortDirtyPages() throws Exception {
         );
     }
 
+    @Test
+    void testAwaitPendingTasksOfListenerCallback() {
+        workflow = new CheckpointWorkflow(
+                log,
+                "test",
+                mock(CheckpointMarkersStorage.class),
+                newReadWriteLock(log),
+                List.of(),
+                2
+        );
+
+        workflow.start();
+
+        CompletableFuture<?> startTask0Future = new CompletableFuture<>();
+        CompletableFuture<?> finishTask0Future = new CompletableFuture<>();
+
+        CompletableFuture<?> startTask1Future = new CompletableFuture<>();
+        CompletableFuture<?> finishTask1Future = new CompletableFuture<>();
+
+        workflow.addCheckpointListener(new CheckpointListener() {
+            /** {@inheritDoc} */
+            @Override
+            public void beforeCheckpointBegin(CheckpointProgress progress, @Nullable Executor executor) {
+                assertNotNull(executor);
+
+                executor.execute(() -> {
+                    startTask0Future.complete(null);
+
+                    await(finishTask0Future, 1, SECONDS);

Review Comment:
   How about increasing this to 10 seconds? On TC, 1 second might be too short if an unfortunate GC pause happens. A 10-second timeout is much harder to hit, and if the test logic fails, the test will take approximately 10 seconds, which is still acceptable for a failing test.



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataRegion.java:
##########
@@ -35,6 +36,19 @@
  * Implementation of {@link DataRegion} for persistent case.
  */
 class PersistentPageMemoryDataRegion implements DataRegion<PersistentPageMemory> {
+    /**
+     * Threshold to calculate limit for pages list on-heap caches.

Review Comment:
   It is not clear which ratio is limited by this. Can this be reformulated as 'maximum fraction of all pages that can be taken by page list caches'?
   
   Also, why does it mention on-heap caches, while page memory is off-heap? Does it mean that pages list caches are always stored on-heap, so these 10% are not taken from the page-memory pages but are added on top of them, which, memory-wise, means up to 110% of the memory configured for page-memory?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLock.java:
##########
@@ -17,15 +17,20 @@
 
 package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
-import static org.apache.ignite.internal.pagememory.persistence.checkpoint.Checkpointer.CHECKPOINT_RUNNER_THREAD_PREFIX;
-
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.lang.IgniteInternalException;
 
 /**
  * Wrapper of the classic read write lock with checkpoint features.
  */
 public class CheckpointReadWriteLock {
+    /**
+     * Any thread with a such prefix is managed by the checkpoint.
+     *
+     * <p>So some conditions can rely on it(ex. we don't need a checkpoint lock there because checkpoint is already held write lock).

Review Comment:
   Part after 'because' seems to be broken. What is the meaning? 'because checkpoint already holds write lock', or 'because checkpoint write lock is already held'? Or something different?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointListener.java:
##########
@@ -29,9 +31,10 @@ public interface CheckpointListener {
      * <p>Holds checkpoint write lock.
      *
      * @param progress Progress of the current checkpoint.
+     * @param executor Executor for asynchronously executing the callback.
      * @throws IgniteInternalCheckedException If failed.
      */
-    default void onMarkCheckpointBegin(CheckpointProgress progress) throws IgniteInternalCheckedException {
+    default void onMarkCheckpointBegin(CheckpointProgress progress, @Nullable Executor executor) throws IgniteInternalCheckedException {

Review Comment:
   Is it good to pass `null` here? Usually, no executor means 'execute on this thread'. We could create a trivial `SameThreadExecutor` and use its cached static instance everywhere instead of passing `null`. Avoiding nulls is always good, and it will not cost us anything in terms of performance, but it will make the code a bit more readable (the reader will immediately understand that the task is executed on the same thread, without having to guess).
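   
   A minimal sketch of what such an executor could look like, assuming an enum singleton as the cached instance. `SameThreadExecutor` and the demo class are hypothetical, nothing like this exists in the PR yet.

```java
import java.util.concurrent.Executor;

/** Hypothetical helper (not part of the PR): runs every task synchronously on the calling thread. */
enum SameThreadExecutor implements Executor {
    /** The single shared instance; the executor holds no state, so it is safe to reuse everywhere. */
    INSTANCE;

    @Override
    public void execute(Runnable command) {
        command.run();
    }
}

class SameThreadExecutorDemo {
    public static void main(String[] args) {
        Thread caller = Thread.currentThread();
        Thread[] seen = new Thread[1];

        // The task runs before execute() returns, on the thread that called it.
        SameThreadExecutor.INSTANCE.execute(() -> seen[0] = Thread.currentThread());

        System.out.println(seen[0] == caller);
    }
}
```

   With this, listeners could call `executor.execute(...)` unconditionally, and the `@Nullable` annotation on the parameter would no longer be needed.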



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointListener.java:
##########
@@ -29,9 +31,10 @@ public interface CheckpointListener {
      * <p>Holds checkpoint write lock.
      *
      * @param progress Progress of the current checkpoint.
+     * @param executor Executor for asynchronously executing the callback.

Review Comment:
   Should we mention in the javadoc how the task will be executed if `null` is passed?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointListener.java:
##########
@@ -51,9 +54,10 @@ default void onCheckpointBegin(CheckpointProgress progress) throws IgniteInterna
      * <p>Holds checkpoint read lock.
      *
      * @param progress Progress of the current checkpoint.
+     * @param executor Executor for asynchronously executing the callback.

Review Comment:
   Same thing about the `null` case



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java:
##########
@@ -140,21 +173,32 @@ public void stop() {
      * @param startCheckpointTimestamp Checkpoint start timestamp.
      * @param curr Current checkpoint event info.
      * @param tracker Checkpoint metrics tracker.
+     * @param updateHeartbeat Update heartbeat callback.
+     * @param onReleaseWriteLock Callback on write lock release.
      * @return Checkpoint collected info.
      * @throws IgniteInternalCheckedException If failed.
      */
     public Checkpoint markCheckpointBegin(
             long startCheckpointTimestamp,
             CheckpointProgressImpl curr,
-            CheckpointMetricsTracker tracker
+            CheckpointMetricsTracker tracker,
+            Runnable updateHeartbeat,
+            Runnable onReleaseWriteLock
     ) throws IgniteInternalCheckedException {
         List<CheckpointListener> listeners = collectCheckpointListeners(dataRegions);
 
         checkpointReadWriteLock.readLock();
 
+        AwaitTasksCompletionExecutor executor = callbackListenerThreadPool == null
+                ? null : new AwaitTasksCompletionExecutor(callbackListenerThreadPool, updateHeartbeat);

Review Comment:
   It seems that if there is no pool (i.e. there is just one thread), heartbeats are not invoked. Why is this ok?
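   
   To make the question concrete, here is a rough sketch of an alternative in which the heartbeat callback runs after every task whether or not a pool exists. `HeartbeatExecutor` and the demo class are hypothetical names, and falling back to `Runnable::run` when there is no pool is an assumption about the desired behavior, not what the PR does.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical decorator: runs the heartbeat callback after every task, whatever executor carries it. */
class HeartbeatExecutor implements Executor {
    private final Executor delegate;

    private final Runnable updateHeartbeat;

    HeartbeatExecutor(Executor delegate, Runnable updateHeartbeat) {
        this.delegate = delegate;
        this.updateHeartbeat = updateHeartbeat;
    }

    @Override
    public void execute(Runnable command) {
        delegate.execute(() -> {
            try {
                command.run();
            } finally {
                // Runs on success and on failure, so a throwing task still counts as progress.
                updateHeartbeat.run();
            }
        });
    }
}

class HeartbeatExecutorDemo {
    public static void main(String[] args) {
        AtomicInteger beats = new AtomicInteger();

        // Stands in for a missing callbackListenerThreadPool.
        Executor pool = null;

        // Assumption: without a pool, tasks simply run on the calling thread.
        Executor delegate = pool == null ? Runnable::run : pool;

        Executor executor = new HeartbeatExecutor(delegate, beats::incrementAndGet);

        executor.execute(() -> {});
        executor.execute(() -> {});

        System.out.println(beats.get());
    }
}
```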


