Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/05/05 05:37:18 UTC

[GitHub] [ignite-3] tkalkirill opened a new pull request, #800: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2

tkalkirill opened a new pull request, #800:
URL: https://github.com/apache/ignite-3/pull/800

   https://issues.apache.org/jira/browse/IGNITE-16898


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #800: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #800:
URL: https://github.com/apache/ignite-3/pull/800#discussion_r870207963


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.SCHEDULED;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Data class representing the state of running/scheduled checkpoint.
+ */
+class CheckpointProgressImpl implements CheckpointProgress {
+    /** Checkpoint id. */
+    private final UUID id = UUID.randomUUID();
+
+    /** Scheduled time of checkpoint. */
+    private volatile long nextCheckpointNanos;
+
+    /** Current checkpoint state. */
+    private volatile AtomicReference<CheckpointState> state = new AtomicReference<>(SCHEDULED);
+
+    /** Future which would be finished when corresponds state is set. */
+    private final Map<CheckpointState, CompletableFuture<Void>> stateFutures = new ConcurrentHashMap<>();
+
+    /** Wakeup reason. */
+    private volatile String reason;
+
+    /** Number of dirty pages in current checkpoint at the beginning of checkpoint. */
+    private volatile int currCheckpointPagesCnt;
+
+    /** Cause of fail, which has happened during the checkpoint or {@code null} if checkpoint was successful. */
+    @Nullable
+    private volatile Throwable failCause;
+
+    /** Counter for written checkpoint pages. Not {@code null} only if checkpoint is running. */
+    @Nullable
+    private volatile AtomicInteger writtenPagesCntr;
+
+    /** Counter for fsynced checkpoint pages. Not {@code null} only if checkpoint is running. */
+    @Nullable
+    private volatile AtomicInteger syncedPagesCntr;
+
+    /** Counter for evicted checkpoint pages. Not {@code null} only if checkpoint is running. */
+    @Nullable
+    private volatile AtomicInteger evictedPagesCntr;
+
+    /**
+     * Constructor.
+     *
+     * @param nextCheckpointTimeout Timeout until next checkpoint in nanos.
+     */
+    CheckpointProgressImpl(long nextCheckpointTimeout) {
+        nextCheckpointNanos(nextCheckpointTimeout);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public UUID id() {
+        return id;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @Nullable String reason() {
+        return reason;
+    }
+
+    /**
+     * Sets description of the reason of the current checkpoint.
+     *
+     * @param reason New wakeup reason.
+     */
+    public void reason(String reason) {
+        this.reason = reason;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean inProgress() {
+        return greaterOrEqualTo(LOCK_RELEASED) && !greaterOrEqualTo(FINISHED);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> futureFor(CheckpointState state) {

Review Comment:
   Added a ticket link to the class header.
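For context on the `CheckpointProgressImpl` diff: `inProgress()` relies on checkpoint states being ordered, and `futureFor(CheckpointState)` hands out one future per state. The following is a minimal sketch of how such per-state futures could be completed, assuming states are compared by declaration order and that a future requested for an already-reached state completes immediately. `SketchState`, `ProgressSketch`, and `transitTo` are hypothetical names; this is not the ported Ignite code.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical, reduced set of states; declaration order defines checkpoint progress. */
enum SketchState {
    SCHEDULED,
    LOCK_RELEASED,
    FINISHED
}

/** Minimal sketch of per-state futures (not the actual CheckpointProgressImpl). */
class ProgressSketch {
    private final AtomicReference<SketchState> state = new AtomicReference<>(SketchState.SCHEDULED);

    private final Map<SketchState, CompletableFuture<Void>> stateFutures = new ConcurrentHashMap<>();

    /** Returns {@code true} if the current state is the given one or a later one. */
    boolean greaterOrEqualTo(SketchState expected) {
        return state.get().ordinal() >= expected.ordinal();
    }

    /** Started (lock released) but not finished yet. */
    boolean inProgress() {
        return greaterOrEqualTo(SketchState.LOCK_RELEASED) && !greaterOrEqualTo(SketchState.FINISHED);
    }

    /** Returns a future that completes once the given state has been reached. */
    CompletableFuture<Void> futureFor(SketchState expected) {
        CompletableFuture<Void> fut = stateFutures.computeIfAbsent(expected, k -> new CompletableFuture<>());

        // Re-check after registering, so a transition that happened in between is not missed.
        if (greaterOrEqualTo(expected)) {
            fut.complete(null);
        }

        return fut;
    }

    /** Moves the state forward (never backward) and completes futures for every reached state. */
    void transitTo(SketchState newState) {
        SketchState reached = state.updateAndGet(cur -> cur.ordinal() < newState.ordinal() ? newState : cur);

        for (SketchState s : SketchState.values()) {
            if (s.ordinal() > reached.ordinal()) {
                break;
            }

            CompletableFuture<Void> fut = stateFutures.get(s);

            if (fut != null) {
                fut.complete(null);
            }
        }
    }
}
```

Registering the future before re-checking the state, and updating the state before scanning the map, means each waiter is completed by at least one side of a concurrent `futureFor`/`transitTo` pair in this sketch.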





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #800: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #800:
URL: https://github.com/apache/ignite-3/pull/800#discussion_r870186658


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgress.java:
##########
@@ -17,16 +17,52 @@
 
 package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
-
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Represents information of progress of a current checkpoint and allows obtaining future to wait for a particular checkpoint state.
  */
-// TODO: IGNITE-16898 Continue porting the code
 public interface CheckpointProgress {
+    /**
+     * Returns checkpoint ID.
+     */
+    UUID id();
+
+    /**
+     * Returns description of the reason of the current checkpoint.
+     */
+    @Nullable String reason();
+
+    /**
+     * Returns {@code true} if the checkpoint has already started but has not finished yet.
+     */
+    boolean inProgress();
+
     /**
      * Returns future which can be used for detection when current checkpoint reaches the specific state.
      */
     CompletableFuture<?> futureFor(CheckpointState state);
+
+    /**
+     * Returns number of dirty pages in current checkpoint. If checkpoint is not running, returns {@code 0}.
+     */
+    int currentCheckpointPagesCount();
+
+    /**
+     * Returns counter for written checkpoint pages. Not {@code null} only if checkpoint is running.
+     */
+    @Nullable AtomicInteger writtenPagesCounter();

Review Comment:
   This is a leftover from porting: the `AtomicInteger`s are not needed. Will fix it.
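One way this could look, sketched under assumptions: the public interface exposes a plain `int` count, and the mutable `AtomicInteger` stays private to the implementation. Returning `0` when no checkpoint is running mirrors the Javadoc of `currentCheckpointPagesCount()` in this diff. `PagesCounterView`, `PagesCounterSketch`, `start`, `finish`, and `onPageWritten` are hypothetical names, not the API of the PR.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical read-only view: callers get a plain count, not the mutable counter. */
interface PagesCounterView {
    /** Returns the number of pages written by the current checkpoint, or 0 if none is running. */
    int writtenPagesCount();
}

/** Hypothetical implementation that keeps the AtomicInteger private. */
class PagesCounterSketch implements PagesCounterView {
    private final AtomicInteger writtenPages = new AtomicInteger();

    private volatile boolean running;

    /** Marks a checkpoint as started and resets the counter. */
    void start() {
        writtenPages.set(0);
        running = true;
    }

    /** Marks the checkpoint as finished. */
    void finish() {
        running = false;
    }

    /** Called by writer threads after each page is written. */
    void onPageWritten() {
        writtenPages.incrementAndGet();
    }

    @Override
    public int writtenPagesCount() {
        return running ? writtenPages.get() : 0;
    }
}
```

Callers can no longer modify the counter through the interface; only the checkpoint code that owns `PagesCounterSketch` can.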





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #800: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #800:
URL: https://github.com/apache/ignite-3/pull/800#discussion_r870183771


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorage.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.nio.file.Files.createDirectories;
+import static java.nio.file.Files.createFile;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.isDirectory;
+import static java.nio.file.Files.list;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.partitioningBy;
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.IgniteSystemProperties.getInteger;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Abstraction responsible for managing checkpoint markers storage.
+ */
+// TODO: IGNITE-15818 At the moment, a simple implementation has been made, it may need to be redone, you need to check it
+public class CheckpointMarkersStorage {
+    /** Earliest checkpoint map changes threshold system properties. */
+    public static final String IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD = "IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD";
+
+    /** Checkpoint start marker. */
+    private static final String CHECKPOINT_START_MARKER = "START";
+
+    /** Checkpoint end marker. */
+    private static final String CHECKPOINT_END_MARKER = "END";
+
+    /** Checkpoint marker file name pattern. */
+    private static final Pattern CHECKPOINT_MARKER_FILE_NAME_PATTERN = Pattern.compile("(.*)-(START|END)\\.bin");
+
+    /** Checkpoint metadata directory ("cp"), contains files with checkpoint start and end markers. */
+    private final Path checkpointDir;
+
+    /** Checkpoint IDs. */
+    private final Set<UUID> checkpointIds;
+
+    /** Earliest checkpoint map changes threshold. */
+    // TODO: IGNITE-16935 Move to config
+    private final int earliestCheckpointChangesThreshold = getInteger(IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD, 5);
+
+    /**
+     * Constructor.
+     *
+     * @param storagePath Storage path.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public CheckpointMarkersStorage(
+            Path storagePath
+    ) throws IgniteInternalCheckedException {
+        checkpointDir = storagePath.resolve("cp");
+
+        try {
+            createDirectories(checkpointDir);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not create directory for checkpoint metadata: " + checkpointDir, e);
+        }
+
+        checkCheckpointDir(checkpointDir);
+
+        try {
+            checkpointIds = list(checkpointDir)
+                    .map(CheckpointMarkersStorage::parseCheckpointIdFromMarkerFile)
+                    .collect(toCollection(ConcurrentHashMap::newKeySet));

Review Comment:
   They are not sorted. That is not necessary for now: the TODO in the class header notes that this implementation will need to be redone, and this is a temporary version.
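If the IDs come from `UUID.randomUUID()` (as `CheckpointProgressImpl.id` does in this PR), their `compareTo` order says nothing about when a checkpoint ran, so sorting the parsed set would not recover chronology anyway. The following minimal sketch, assuming marker files are named `<checkpointId>-START.bin` / `<checkpointId>-END.bin` as `CHECKPOINT_MARKER_FILE_NAME_PATTERN` implies, groups markers by checkpoint ID and finds checkpoints with a start marker but no end marker. `MarkerNamesSketch`, `groupMarkers`, and `unfinished` are hypothetical helpers that work on file-name strings rather than `Path`s to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helpers for reasoning about checkpoint marker file names. */
final class MarkerNamesSketch {
    /** Same pattern as in the diff: "<checkpointId>-START.bin" or "<checkpointId>-END.bin". */
    static final Pattern MARKER_PATTERN = Pattern.compile("(.*)-(START|END)\\.bin");

    private MarkerNamesSketch() {
    }

    /** Groups marker kinds ("START"/"END") by checkpoint ID; iteration order is unspecified. */
    static Map<UUID, Set<String>> groupMarkers(List<String> fileNames) {
        Map<UUID, Set<String>> res = new HashMap<>();

        for (String name : fileNames) {
            Matcher m = MARKER_PATTERN.matcher(name);

            if (!m.matches()) {
                throw new IllegalArgumentException("Not a checkpoint marker file: " + name);
            }

            // The greedy "(.*)" backtracks to the last '-' before START/END, so the UUID's own dashes are kept.
            UUID id = UUID.fromString(m.group(1));

            res.computeIfAbsent(id, k -> new HashSet<>()).add(m.group(2));
        }

        return res;
    }

    /** Returns IDs of checkpoints that have a START marker but no END marker. */
    static Set<UUID> unfinished(Map<UUID, Set<String>> markers) {
        Set<UUID> res = new HashSet<>();

        markers.forEach((id, kinds) -> {
            if (kinds.contains("START") && !kinds.contains("END")) {
                res.add(id);
            }
        });

        return res;
    }
}
```

If ordering were ever needed, it would have to come from something other than the UUID, such as a timestamp stored in or alongside the marker; that is outside what this sketch assumes.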





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #800: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #800:
URL: https://github.com/apache/ignite-3/pull/800#discussion_r870159187


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorage.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.nio.file.Files.createDirectories;
+import static java.nio.file.Files.createFile;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.isDirectory;
+import static java.nio.file.Files.list;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.partitioningBy;
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.IgniteSystemProperties.getInteger;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Abstraction responsible for managing checkpoint markers storage.
+ */
+// TODO: IGNITE-15818 At the moment, a simple implementation has been made, it may need to be redone, you need to check it
+public class CheckpointMarkersStorage {
+    /** Earliest checkpoint map changes threshold system properties. */
+    public static final String IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD = "IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD";
+
+    /** Checkpoint start marker. */
+    private static final String CHECKPOINT_START_MARKER = "START";
+
+    /** Checkpoint end marker. */
+    private static final String CHECKPOINT_END_MARKER = "END";
+
+    /** Checkpoint marker file name pattern. */
+    private static final Pattern CHECKPOINT_MARKER_FILE_NAME_PATTERN = Pattern.compile("(.*)-(START|END)\\.bin");
+
+    /** Checkpoint metadata directory ("cp"), contains files with checkpoint start and end markers. */
+    private final Path checkpointDir;
+
+    /** Checkpoint IDs. */
+    private final Set<UUID> checkpointIds;
+
+    /** Earliest checkpoint map changes threshold. */
+    // TODO: IGNITE-16935 Move to config

Review Comment:
   Fix it.





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #800: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #800:
URL: https://github.com/apache/ignite-3/pull/800#discussion_r870137765


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java:
##########
@@ -17,24 +17,12 @@
 
 package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
-import org.jetbrains.annotations.Nullable;
-
 /**
  * Empty.
  */
-// TODO: IGNITE-16898 Continue porting the code
+// TODO: IGNITE-16935 Continue porting the code
 public abstract class Checkpointer {
-    /**
-     * Changes the information for a scheduled checkpoint if it was scheduled further than {@code delayFromNow}, or do nothing otherwise.
-     *
-     * @param delayFromNow Delay from now in milliseconds.
-     * @param reason Wakeup reason.
-     * @return Nearest scheduled checkpoint which is not started yet (dirty pages weren't collected yet).
-     */
-    public abstract CheckpointProgress scheduleCheckpoint(long delayFromNow, String reason);
+    public abstract Thread runner();

Review Comment:
   For now this is only needed to make the code compile; the porting will be done in https://issues.apache.org/jira/browse/IGNITE-16935





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #800: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #800:
URL: https://github.com/apache/ignite-3/pull/800#discussion_r871051971


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorage.java:
##########
@@ -136,15 +128,13 @@ public void onCheckpointEnd(UUID checkpointId) throws IgniteInternalCheckedExcep
             throw new IgniteInternalCheckedException("Could not create end checkpoint marker: " + checkpointEndMarker, e);
         }
 
-        if (checkpointIds.size() >= earliestCheckpointChangesThreshold) {
-            for (Iterator<UUID> it = checkpointIds.iterator(); it.hasNext(); ) {
-                UUID id = it.next();
+        for (Iterator<UUID> it = checkpointIds.iterator(); it.hasNext(); ) {

Review Comment:
   I'd argue that we should just leave these markers indefinitely, but who cares, since we will remove this loop in the future anyway.





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #800: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #800:
URL: https://github.com/apache/ignite-3/pull/800#discussion_r870111028


##########
modules/core/src/main/java/org/apache/ignite/internal/util/worker/IgniteWorker.java:
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.worker;
+
+import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Extension to standard {@link Runnable} interface.
+ *
+ * <p>Adds proper details to be used with {@link Executor} implementations.
+ *
+ * <p>Only for internal use.

Review Comment:
   This was in the original as well; deleted it here.





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #800: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #800:
URL: https://github.com/apache/ignite-3/pull/800#discussion_r871477524


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java:
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.SCHEDULED;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Data class representing the state of running/scheduled checkpoint.
+ */
+// TODO: IGNITE-16950 Check for a race between futureFor, transitTo and fail
+class CheckpointProgressImpl implements CheckpointProgress {
+    /** Checkpoint id. */
+    private final UUID id = UUID.randomUUID();
+
+    /** Scheduled time of checkpoint. */
+    private volatile long nextCheckpointNanos;
+
+    /** Current checkpoint state. */
+    private volatile AtomicReference<CheckpointState> state = new AtomicReference<>(SCHEDULED);
+
+    /** Future which would be finished when corresponds state is set. */
+    private final Map<CheckpointState, CompletableFuture<Void>> stateFutures = new ConcurrentHashMap<>();
+
+    /** Wakeup reason. */
+    private volatile String reason;
+
+    /** Number of dirty pages in current checkpoint at the beginning of checkpoint. */
+    private volatile int currCheckpointPagesCnt;
+
+    /** Cause of fail, which has happened during the checkpoint or {@code null} if checkpoint was successful. */
+    @Nullable
+    private volatile Throwable failCause;
+
+    /** Counter for written checkpoint pages. Not {@code null} only if checkpoint is running. */
+    @Nullable
+    private volatile AtomicInteger writtenPagesCntr;

Review Comment:
   I know that it's old code, but why do these fields have to be volatile? And why can't we instantiate them in the constructor?
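A minimal sketch of the alternative the reviewer raises, assuming the counters can live for the whole lifetime of the progress object: create them in the constructor and keep them in `final` fields. Under the Java Memory Model (JLS 17.5), a thread that obtains a reference to an object after its constructor completes, without `this` escaping during construction, sees the initialized values of its `final` fields, so `volatile` is not needed for publication; the `AtomicInteger`s themselves handle concurrent updates. `EagerCountersSketch` and its methods are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: counters are created once and published through final fields. */
class EagerCountersSketch {
    // Final fields are safely visible to other threads once the constructor completes,
    // so neither volatile nor null checks are required.
    private final AtomicInteger writtenPagesCntr;

    private final AtomicInteger syncedPagesCntr;

    private final AtomicInteger evictedPagesCntr;

    EagerCountersSketch() {
        writtenPagesCntr = new AtomicInteger();
        syncedPagesCntr = new AtomicInteger();
        evictedPagesCntr = new AtomicInteger();
    }

    void onPageWritten() {
        writtenPagesCntr.incrementAndGet();
    }

    void onPageSynced() {
        syncedPagesCntr.incrementAndGet();
    }

    void onPageEvicted() {
        evictedPagesCntr.incrementAndGet();
    }

    int writtenPages() {
        return writtenPagesCntr.get();
    }

    int syncedPages() {
        return syncedPagesCntr.get();
    }

    int evictedPages() {
        return evictedPagesCntr.get();
    }
}
```

The trade-off: in the quoted fields, `null` also signals that no checkpoint is running; with eagerly created counters, that information would have to come from the checkpoint state instead.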





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #800: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #800:
URL: https://github.com/apache/ignite-3/pull/800#discussion_r869981682


##########
modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java:
##########
@@ -31,10 +31,11 @@ public class FastTimestamps {
 
     private static void startUpdater() {
         Thread updater = new Thread("FastTimestamps updater") {
+            /** {@inheritDoc} */
             @Override
             public void run() {
                 while (true) {
-                    coarseCurrentTimeMillis = System.currentTimeMillis();
+                    coarseCurrentTimeMillis = Math.max(coarseCurrentTimeMillis, System.currentTimeMillis());

Review Comment:
   I still don't think that this change is necessary. Can you please roll it back?
   Here's my motivation: if time goes back, the old implementation would simply start counting again from the earlier value. The new implementation will keep returning the same value for some time, which can be even worse, IMO.
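To make the trade-off concrete, the following sketch replays a hypothetical sequence of `System.currentTimeMillis()` samples through both updater variants from this diff: the old one assigns the sample directly, the new one applies `Math.max` against the previous value. The sample values and the `CoarseClockSketch` class are illustrative only; the real updater runs in a background thread and samples in a loop.

```java
import java.util.Arrays;

/** Replays wall-clock samples through the two updater strategies shown in the FastTimestamps diff. */
final class CoarseClockSketch {
    private CoarseClockSketch() {
    }

    /** Old strategy: coarse time follows the wall clock, including backward jumps. */
    static long[] followClock(long[] wallClockSamples) {
        return Arrays.copyOf(wallClockSamples, wallClockSamples.length);
    }

    /** New strategy: Math.max keeps the value non-decreasing, so it stalls until the clock catches up. */
    static long[] nonDecreasing(long[] wallClockSamples) {
        long[] res = new long[wallClockSamples.length];

        // No previous sample yet, so the first Math.max simply takes the first sample.
        long coarse = Long.MIN_VALUE;

        for (int i = 0; i < wallClockSamples.length; i++) {
            coarse = Math.max(coarse, wallClockSamples[i]);
            res[i] = coarse;
        }

        return res;
    }
}
```

For samples `{1000, 1010, 500, 510, 1020}` (a 510 ms backward jump), the old variant yields the same sequence, while the new one yields `{1000, 1010, 1010, 1010, 1020}`: time appears to stand still until the wall clock passes 1010 again, which is the behavior the reviewer objects to.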



##########
modules/core/src/test/java/org/apache/ignite/internal/util/worker/IgniteWorkerTest.java:
##########
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.worker;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static org.apache.ignite.internal.util.worker.IgniteWorkerTest.TestWorkerListener.ON_IDLE;
+import static org.apache.ignite.internal.util.worker.IgniteWorkerTest.TestWorkerListener.ON_STARTED;
+import static org.apache.ignite.internal.util.worker.IgniteWorkerTest.TestWorkerListener.ON_STOPPED;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Test;
+
+/**
+ * For {@link IgniteWorker} testing.
+ */
+public class IgniteWorkerTest {
+    static final String CLEANUP = "cleanup";
+
+    private final IgniteLogger log = IgniteLogger.forClass(IgniteWorkerTest.class);
+
+    @Test
+    void testNewIgniteWorker() {
+        IgniteWorker worker = new NoopWorker(log, null);
+
+        assertEquals("testNode", worker.igniteInstanceName());
+        assertEquals("testWorker", worker.name());
+
+        assertEquals(0, worker.heartbeat());
+
+        assertFalse(worker.isCancelled());
+        assertFalse(worker.isDone());
+
+        assertNull(worker.runner());
+    }
+
+    @Test
+    void testBlockingSelection() {
+        IgniteWorker worker = new NoopWorker(log, null);
+
+        long currentTimeMillis = coarseCurrentTimeMillis();
+
+        worker.blockingSectionBegin();
+
+        assertEquals(Long.MAX_VALUE, worker.heartbeat());
+
+        worker.blockingSectionEnd();
+
+        assertThat(worker.heartbeat(), greaterThanOrEqualTo(currentTimeMillis));
+
+        // Checks update heartbeat after blockingSectionBegin().
+
+        worker.blockingSectionBegin();
+
+        assertEquals(Long.MAX_VALUE, worker.heartbeat());
+
+        worker.updateHeartbeat();
+
+        assertThat(worker.heartbeat(), greaterThanOrEqualTo(currentTimeMillis));
+
+        worker.blockingSectionEnd();
+
+        assertThat(worker.heartbeat(), greaterThanOrEqualTo(currentTimeMillis));
+    }
+
+    @Test
+    void testUpdateHeartbeat() throws Exception {
+        IgniteWorker worker = new NoopWorker(log, null);
+
+        long currentTimeMillis = coarseCurrentTimeMillis();
+
+        worker.updateHeartbeat();
+
+        long heartbeat = worker.heartbeat();
+
+        assertThat(heartbeat, greaterThanOrEqualTo(currentTimeMillis));
+
+        Thread.sleep(10);
+
+        assertEquals(heartbeat, worker.heartbeat());
+
+        worker.updateHeartbeat();
+
+        assertThat(worker.heartbeat(), greaterThan(heartbeat));
+    }
+
+    @Test
+    void testIdle() {
+        List<String> events = new ArrayList<>();
+
+        TestWorkerListener listener = new TestWorkerListener(events);
+
+        IgniteWorker worker = new NoopWorker(log, listener);
+
+        worker.onIdle();
+
+        assertThat(events, equalTo(List.of(ON_IDLE)));
+    }
+
+    @Test
+    void testRun() {
+        List<String> events = new ArrayList<>();
+
+        TestWorkerListener listener = new TestWorkerListener(events) {
+            /** {@inheritDoc} */
+            @Override
+            public void onStarted(IgniteWorker worker) {
+                super.onStarted(worker);
+
+                assertThat(worker.heartbeat(), lessThanOrEqualTo(coarseCurrentTimeMillis()));
+                assertSame(Thread.currentThread(), worker.runner());
+                assertFalse(worker.isCancelled());
+                assertFalse(worker.isDone());
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public void onStopped(IgniteWorker worker) {
+                super.onStopped(worker);
+
+                assertThat(worker.heartbeat(), lessThanOrEqualTo(coarseCurrentTimeMillis()));
+                assertSame(Thread.currentThread(), worker.runner());
+                assertFalse(worker.isCancelled());
+                assertTrue(worker.isDone());
+            }
+        };
+
+        IgniteWorker worker = new NoopWorker(log, listener) {
+            /** {@inheritDoc} */
+            @Override
+            protected void cleanup() {
+                events.add(CLEANUP);
+            }
+        };
+
+        worker.run();
+
+        assertThat(events, equalTo(List.of(ON_STARTED, CLEANUP, ON_STOPPED)));
+
+        assertThat(worker.heartbeat(), lessThanOrEqualTo(coarseCurrentTimeMillis()));
+        assertNull(worker.runner());
+        assertFalse(worker.isCancelled());
+        assertTrue(worker.isDone());
+    }
+
+    @Test
+    void testInterruptFromBody() {
+        List<String> events = new ArrayList<>();
+
+        TestWorkerListener listener = new TestWorkerListener(events);
+
+        IgniteWorker worker = new NoopWorker(log, listener) {
+            /** {@inheritDoc} */
+            @Override
+            protected void body() throws InterruptedException {
+                throw new InterruptedException();

Review Comment:
   I think you should also interrupt the thread before throwing the exception.
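The effect of the suggestion can be checked in isolation. In this sketch, `Body` and `interruptFlagAfterThrow` are hypothetical helpers, not part of `IgniteWorker`. A body that calls `Thread.currentThread().interrupt()` before throwing leaves the interrupt flag set for the code that catches the exception. A body that only throws leaves the flag clear, so a test written that way does not exercise any handling that depends on the flag. Blocking JDK methods such as `Thread.sleep` clear the flag when they throw `InterruptedException`, so which case a real body produces depends on where the exception comes from.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptDemo {
    /** Hypothetical stand-in for a worker body. */
    interface Body {
        void run() throws InterruptedException;
    }

    /**
     * Runs the body on a new thread and returns whether that thread's interrupt flag
     * was set when the thrown InterruptedException was caught.
     */
    static boolean interruptFlagAfterThrow(Body body) {
        AtomicBoolean flagSet = new AtomicBoolean();

        Thread thread = new Thread(() -> {
            try {
                body.run();
            } catch (InterruptedException e) {
                // Record what code catching the exception would observe.
                flagSet.set(Thread.currentThread().isInterrupted());
            }
        });

        thread.start();

        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        return flagSet.get();
    }

    public static void main(String[] args) {
        // Sets the flag and then throws, as the review comment suggests.
        System.out.println(interruptFlagAfterThrow(() -> {
            Thread.currentThread().interrupt();
            throw new InterruptedException();
        })); // true

        // Only throws: the flag stays clear.
        System.out.println(interruptFlagAfterThrow(() -> {
            throw new InterruptedException();
        })); // false
    }
}
```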



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpoint.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+
+/**
+ * Data class of checkpoint information.
+ */
+class Checkpoint {
+    /** Checkpoint pages. */
+    final GridConcurrentMultiPairQueue<PageMemoryImpl, FullPageId> dirtyPages;
+
+    /** Checkpoint progress status. */
+    final CheckpointProgressImpl progress;
+
+    /** Number of dirty pages. */
+    final int pagesSize;

Review Comment:
   Can you please rename it? Something like "dirtyPagesSize", maybe?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesInfoHolder.java:
##########
@@ -0,0 +1,48 @@
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import java.util.Collection;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+/**
+ * Holder of information about dirty pages by {@link PageMemoryImpl} for checkpoint.
+ */
+class CheckpointDirtyPagesInfoHolder {
+    /** Total number of dirty pages. */
+    final int pageCount;
+
+    /** Collection of dirty pages per {@link PageMemoryImpl} distribution. */
+    final Collection<IgniteBiTuple<PageMemoryImpl, Collection<FullPageId>>> pages;

Review Comment:
   Why is it not a map? What do you think?
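If a map fits better, the holder could key dirty pages by page memory instance. This is a minimal sketch with hypothetical stand-in type parameters (`M` for `PageMemoryImpl`, `P` for `FullPageId`; `DirtyPagesHolder` is an illustrative name) so that it runs on its own:

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;

class DirtyPagesHolder<M, P> {
    /** Total number of dirty pages across all page memories. */
    final int pageCount;

    /** Dirty pages grouped by the page memory instance they belong to. */
    final Map<M, Collection<P>> pages;

    DirtyPagesHolder(Map<M, ? extends Collection<P>> pages) {
        // Shallow, unmodifiable copy of the top-level map (null keys and values are rejected).
        this.pages = Map.copyOf(pages);

        int total = 0;
        for (Collection<P> regionPages : this.pages.values()) {
            total += regionPages.size();
        }
        this.pageCount = total;
    }

    public static void main(String[] args) {
        // String stands in for PageMemoryImpl and Long for FullPageId.
        DirtyPagesHolder<String, Long> holder = new DirtyPagesHolder<>(
                Map.of("region-1", List.of(1L, 2L, 3L), "region-2", List.of(7L)));

        System.out.println(holder.pageCount);                    // 4
        System.out.println(holder.pages.get("region-2").size()); // 1
    }
}
```

`Map.copyOf` has an unspecified iteration order. If the checkpoint writer relies on visiting page memories in a fixed order, a `LinkedHashMap` copy would preserve insertion order instead.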



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorage.java:
##########
@@ -0,0 +1,210 @@
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.nio.file.Files.createDirectories;
+import static java.nio.file.Files.createFile;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.isDirectory;
+import static java.nio.file.Files.list;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.partitioningBy;
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.IgniteSystemProperties.getInteger;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Abstraction responsible for managing checkpoint markers storage.
+ */
+// TODO: IGNITE-15818 At the moment, a simple implementation has been made, it may need to be redone, you need to check it
+public class CheckpointMarkersStorage {
+    /** Earliest checkpoint map changes threshold system properties. */
+    public static final String IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD = "IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD";
+
+    /** Checkpoint start marker. */
+    private static final String CHECKPOINT_START_MARKER = "START";
+
+    /** Checkpoint end marker. */
+    private static final String CHECKPOINT_END_MARKER = "END";
+
+    /** Checkpoint marker file name pattern. */
+    private static final Pattern CHECKPOINT_MARKER_FILE_NAME_PATTERN = Pattern.compile("(.*)-(START|END)\\.bin");
+
+    /** Checkpoint metadata directory ("cp"), contains files with checkpoint start and end markers. */
+    private final Path checkpointDir;
+
+    /** Checkpoint IDs. */
+    private final Set<UUID> checkpointIds;
+
+    /** Earliest checkpoint map changes threshold. */
+    // TODO: IGNITE-16935 Move to config
+    private final int earliestCheckpointChangesThreshold = getInteger(IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD, 5);
+
+    /**
+     * Constructor.
+     *
+     * @param storagePath Storage path.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public CheckpointMarkersStorage(
+            Path storagePath
+    ) throws IgniteInternalCheckedException {
+        checkpointDir = storagePath.resolve("cp");
+
+        try {
+            createDirectories(checkpointDir);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not create directory for checkpoint metadata: " + checkpointDir, e);
+        }
+
+        checkCheckpointDir(checkpointDir);
+
+        try {
+            checkpointIds = list(checkpointDir)
+                    .map(CheckpointMarkersStorage::parseCheckpointIdFromMarkerFile)
+                    .collect(toCollection(ConcurrentHashMap::newKeySet));

Review Comment:
   I don't see where you sort or otherwise order the checkpoint markers, but they must be ordered, right? If you plan to do that later, please add a TODO.
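One way to recover the order from the directory listing alone is sketched below, assuming the marker file name format may change. Random `UUID`s (the PR uses `UUID.randomUUID()`) carry no ordering, so the start time would have to be encoded in the name. The `<timestamp>-<uuid>-<START|END>.bin` format and the `MarkerOrdering` helper are hypothetical and differ from the PR's `CHECKPOINT_MARKER_FILE_NAME_PATTERN`.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class MarkerOrdering {
    /** Hypothetical marker file name: "<startTimestamp>-<checkpointId>-<START|END>.bin". */
    static final Pattern MARKER_FILE_NAME = Pattern.compile("(\\d+)-(.+)-(START|END)\\.bin");

    /** Parsed marker file name. */
    static final class Marker {
        final long timestamp;
        final UUID checkpointId;
        final boolean start;

        Marker(long timestamp, UUID checkpointId, boolean start) {
            this.timestamp = timestamp;
            this.checkpointId = checkpointId;
            this.start = start;
        }
    }

    /** Parses a marker file name; returns empty for names that do not match the pattern. */
    static Optional<Marker> parse(String fileName) {
        Matcher m = MARKER_FILE_NAME.matcher(fileName);

        if (!m.matches()) {
            return Optional.empty();
        }

        return Optional.of(new Marker(
                Long.parseLong(m.group(1)),
                UUID.fromString(m.group(2)),
                "START".equals(m.group(3))));
    }

    /** Returns checkpoint IDs ordered by the timestamp in their START markers, oldest first. */
    static List<UUID> orderedCheckpointIds(List<String> fileNames) {
        return fileNames.stream()
                .map(MarkerOrdering::parse)
                .flatMap(Optional::stream)
                .filter(marker -> marker.start)
                .sorted(Comparator.comparingLong(marker -> marker.timestamp))
                .map(marker -> marker.checkpointId)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        UUID first = UUID.randomUUID();
        UUID second = UUID.randomUUID();

        List<String> fileNames = List.of(
                "200-" + second + "-START.bin",
                "100-" + first + "-START.bin",
                "100-" + first + "-END.bin",
                "unrelated.txt");

        System.out.println(orderedCheckpointIds(fileNames).equals(List.of(first, second))); // true
    }
}
```

A wall-clock timestamp can go backwards after a clock adjustment. A monotonically increasing checkpoint sequence number in the same position would avoid that.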



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java:
##########
@@ -0,0 +1,263 @@
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.SCHEDULED;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Data class representing the state of running/scheduled checkpoint.
+ */
+class CheckpointProgressImpl implements CheckpointProgress {
+    /** Checkpoint id. */
+    private final UUID id = UUID.randomUUID();
+
+    /** Scheduled time of checkpoint. */
+    private volatile long nextCheckpointNanos;
+
+    /** Current checkpoint state. */
+    private volatile AtomicReference<CheckpointState> state = new AtomicReference<>(SCHEDULED);
+
+    /** Future which would be finished when corresponds state is set. */
+    private final Map<CheckpointState, CompletableFuture<Void>> stateFutures = new ConcurrentHashMap<>();
+
+    /** Wakeup reason. */
+    private volatile String reason;
+
+    /** Number of dirty pages in current checkpoint at the beginning of checkpoint. */
+    private volatile int currCheckpointPagesCnt;
+
+    /** Cause of fail, which has happened during the checkpoint or {@code null} if checkpoint was successful. */
+    @Nullable
+    private volatile Throwable failCause;
+
+    /** Counter for written checkpoint pages. Not {@link null} only if checkpoint is running. */
+    @Nullable
+    private volatile AtomicInteger writtenPagesCntr;
+
+    /** Counter for fsynced checkpoint pages. Not {@link null} only if checkpoint is running. */
+    @Nullable
+    private volatile AtomicInteger syncedPagesCntr;
+
+    /** Counter for evicted checkpoint pages. Not {@link null} only if checkpoint is running. */
+    @Nullable
+    private volatile AtomicInteger evictedPagesCntr;
+
+    /**
+     * Constructor.
+     *
+     * @param nextCheckpointTimeout Timeout until next checkpoint in nanos.
+     */
+    CheckpointProgressImpl(long nextCheckpointTimeout) {
+        nextCheckpointNanos(nextCheckpointTimeout);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public UUID id() {
+        return id;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @Nullable String reason() {
+        return reason;
+    }
+
+    /**
+     * Sets description of the reason of the current checkpoint.
+     *
+     * @param reason New wakeup reason.
+     */
+    public void reason(String reason) {
+        this.reason = reason;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean inProgress() {
+        return greaterOrEqualTo(LOCK_RELEASED) && !greaterOrEqualTo(FINISHED);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> futureFor(CheckpointState state) {
+        CompletableFuture<Void> stateFut = stateFutures.computeIfAbsent(state, (k) -> new CompletableFuture<>());
+
+        if (greaterOrEqualTo(state)) {
+            completeFuture(stateFut, failCause);
+        }
+
+        return stateFut;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int currentCheckpointPagesCount() {
+        return currCheckpointPagesCnt;
+    }
+
+    /**
+     * Sets current checkpoint pages num to store.
+     *
+     * @param num Pages to store.
+     */
+    public void currentCheckpointPagesCount(int num) {
+        currCheckpointPagesCnt = num;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @Nullable AtomicInteger writtenPagesCounter() {
+        return writtenPagesCntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @Nullable AtomicInteger syncedPagesCounter() {
+        return syncedPagesCntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @Nullable AtomicInteger evictedPagesCounter() {
+        return evictedPagesCntr;
+    }
+
+    /**
+     * Returns scheduled time of checkpoint.
+     */
+    public long nextCheckpointNanos() {
+        return nextCheckpointNanos;
+    }
+
+    /**
+     * Sets new scheduled time of checkpoint.
+     *
+     * @param nextCheckpointNanos New scheduled time of checkpoint in nanos.
+     */
+    public void nextCheckpointNanos(long nextCheckpointNanos) {

Review Comment:
   This is SO confusing! The field represents an instant in time, but the parameter is a time interval, and they have the same name. What the hell?
   And why would anyone pass a negative value to this method?
   Why a year? The Javadoc says nothing about it. I need an explanation.
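One way to remove the ambiguity is to give the instant and the interval different names and validate the interval. In this hypothetical sketch (`CheckpointSchedule`, `scheduleAfter` and `MAX_DELAY_NANOS` are illustrative names), the field stores an absolute deadline and the method takes a non-negative delay. The delay is capped so that `System.nanoTime() + delay` stays well within `long` range. Whether that is the reason for the one-year limit in the PR is an assumption the author would need to confirm.

```java
import java.util.concurrent.TimeUnit;

class CheckpointSchedule {
    /** Hypothetical cap on the delay; keeps deadline arithmetic far from long overflow. */
    static final long MAX_DELAY_NANOS = TimeUnit.DAYS.toNanos(365);

    /** Absolute deadline on the System.nanoTime() timeline (an instant, not an interval). */
    private volatile long nextCheckpointDeadlineNanos;

    /**
     * Schedules the next checkpoint {@code delayNanos} from now.
     *
     * @param delayNanos Delay from now in nanoseconds, must not be negative.
     */
    void scheduleAfter(long delayNanos) {
        if (delayNanos < 0) {
            throw new IllegalArgumentException("Delay must not be negative: " + delayNanos);
        }

        nextCheckpointDeadlineNanos = System.nanoTime() + Math.min(delayNanos, MAX_DELAY_NANOS);
    }

    /** Returns the time left until the deadline; zero or negative once it has passed. */
    long remainingNanos() {
        // Compare nanoTime values by subtraction, as the System.nanoTime() javadoc recommends.
        return nextCheckpointDeadlineNanos - System.nanoTime();
    }

    public static void main(String[] args) {
        CheckpointSchedule schedule = new CheckpointSchedule();

        schedule.scheduleAfter(TimeUnit.SECONDS.toNanos(30));
        System.out.println(schedule.remainingNanos() > 0); // true

        // A huge delay is capped instead of overflowing the deadline.
        schedule.scheduleAfter(Long.MAX_VALUE);
        System.out.println(schedule.remainingNanos() <= MAX_DELAY_NANOS); // true
    }
}
```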



##########
modules/core/src/main/java/org/apache/ignite/internal/util/worker/IgniteWorkerListener.java:
##########
@@ -0,0 +1,49 @@
+package org.apache.ignite.internal.util.worker;
+
+import java.util.EventListener;
+
+/**
+ * This interface defines worker listener.
+ */
+public interface IgniteWorkerListener extends EventListener {

Review Comment:
   Please remove "extends EventListener"; nothing uses it.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorage.java:
##########
@@ -0,0 +1,210 @@
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.nio.file.Files.createDirectories;
+import static java.nio.file.Files.createFile;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.isDirectory;
+import static java.nio.file.Files.list;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.partitioningBy;
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.IgniteSystemProperties.getInteger;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Abstraction responsible for managing checkpoint markers storage.
+ */
+// TODO: IGNITE-15818 At the moment, a simple implementation has been made, it may need to be redone, you need to check it
+public class CheckpointMarkersStorage {
+    /** Earliest checkpoint map changes threshold system properties. */
+    public static final String IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD = "IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD";
+
+    /** Checkpoint start marker. */
+    private static final String CHECKPOINT_START_MARKER = "START";
+
+    /** Checkpoint end marker. */
+    private static final String CHECKPOINT_END_MARKER = "END";
+
+    /** Checkpoint marker file name pattern. */
+    private static final Pattern CHECKPOINT_MARKER_FILE_NAME_PATTERN = Pattern.compile("(.*)-(START|END)\\.bin");
+
+    /** Checkpoint metadata directory ("cp"), contains files with checkpoint start and end markers. */
+    private final Path checkpointDir;
+
+    /** Checkpoint IDs. */
+    private final Set<UUID> checkpointIds;
+
+    /** Earliest checkpoint map changes threshold. */
+    // TODO: IGNITE-16935 Move to config

Review Comment:
   This should be managed by a merger process. You can't just delete markers, can you? I'd remove most of the code related to this property.



##########
modules/core/src/main/java/org/apache/ignite/internal/util/worker/IgniteWorker.java:
##########
@@ -0,0 +1,323 @@
+package org.apache.ignite.internal.util.worker;
+
+import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Extension to standard {@link Runnable} interface.
+ *
+ * <p>Adds proper details to be used with {@link Executor} implementations.
+ *
+ * <p>Only for internal use.

Review Comment:
   It's in an "internal" package; is this comment necessary? Was it in the original code?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorage.java:
##########
@@ -0,0 +1,210 @@
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.nio.file.Files.createDirectories;
+import static java.nio.file.Files.createFile;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.isDirectory;
+import static java.nio.file.Files.list;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.partitioningBy;
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.IgniteSystemProperties.getInteger;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Abstraction responsible for managing checkpoint markers storage.
+ */
+// TODO: IGNITE-15818 At the moment, a simple implementation has been made, it may need to be redone, you need to check it
+public class CheckpointMarkersStorage {
+    /** Earliest checkpoint map changes threshold system properties. */
+    public static final String IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD = "IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD";
+
+    /** Checkpoint start marker. */
+    private static final String CHECKPOINT_START_MARKER = "START";
+
+    /** Checkpoint end marker. */
+    private static final String CHECKPOINT_END_MARKER = "END";
+
+    /** Checkpoint marker file name pattern. */
+    private static final Pattern CHECKPOINT_MARKER_FILE_NAME_PATTERN = Pattern.compile("(.*)-(START|END)\\.bin");
+
+    /** Checkpoint metadata directory ("cp"), contains files with checkpoint start and end markers. */
+    private final Path checkpointDir;
+
+    /** Checkpoint IDs. */
+    private final Set<UUID> checkpointIds;
+
+    /** Earliest checkpoint map changes threshold. */
+    // TODO: IGNITE-16935 Move to config
+    private final int earliestCheckpointChangesThreshold = getInteger(IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD, 5);
+
+    /**
+     * Constructor.
+     *
+     * @param storagePath Storage path.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public CheckpointMarkersStorage(
+            Path storagePath
+    ) throws IgniteInternalCheckedException {
+        checkpointDir = storagePath.resolve("cp");
+
+        try {
+            createDirectories(checkpointDir);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not create directory for checkpoint metadata: " + checkpointDir, e);
+        }
+
+        checkCheckpointDir(checkpointDir);
+
+        try {
+            checkpointIds = list(checkpointDir)
+                    .map(CheckpointMarkersStorage::parseCheckpointIdFromMarkerFile)
+                    .collect(toCollection(ConcurrentHashMap::newKeySet));
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not reads checkpoint markers: " + checkpointDir, e);

Review Comment:
   ```suggestion
               throw new IgniteInternalCheckedException("Could not read checkpoint markers: " + checkpointDir, e);
   ```



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgress.java:
##########
@@ -17,16 +17,52 @@
 
 package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
-
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Represents information of progress of a current checkpoint and allows obtaining future to wait for a particular checkpoint state.
  */
-// TODO: IGNITE-16898 Continue porting the code
 public interface CheckpointProgress {
+    /**
+     * Returns checkpoint ID.
+     */
+    UUID id();
+
+    /**
+     * Returns description of the reason of the current checkpoint.
+     */
+    @Nullable String reason();
+
+    /**
+     * Return {@code true} If checkpoint already started but have not finished yet.
+     */
+    boolean inProgress();
+
     /**
      * Returns future which can be used for detection when current checkpoint reaches the specific state.
      */
     CompletableFuture<?> futureFor(CheckpointState state);
+
+    /**
+     * Returns number of dirty pages in current checkpoint. If checkpoint is not running, returns {@code 0}.
+     */
+    int currentCheckpointPagesCount();
+
+    /**
+     * Returns counter for written checkpoint pages. Not {@code null} only if checkpoint is running.
+     */
+    @Nullable AtomicInteger writtenPagesCounter();

Review Comment:
   Why would you return the internal AtomicInteger instances? What's the deal with that?
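If callers only need to read progress, an alternative is to keep the counters private and expose plain values. In this hypothetical sketch (`ProgressCounters`, `onPageWritten` and `writtenPages` are illustrative names, not the PR's API), callers receive an `int` snapshot and cannot increment or hold on to the internal `AtomicInteger`. Returning `0` when no checkpoint is running mirrors the documented contract of `currentCheckpointPagesCount()`.

```java
import java.util.concurrent.atomic.AtomicInteger;

class ProgressCounters {
    /** Internal counter; non-null only while a checkpoint is running. */
    private volatile AtomicInteger writtenPagesCntr;

    /** Called by the checkpointer when a checkpoint starts. */
    void initCounters() {
        writtenPagesCntr = new AtomicInteger();
    }

    /** Called by the checkpointer when the checkpoint ends. */
    void clearCounters() {
        writtenPagesCntr = null;
    }

    /** Called by page-writing threads; a no-op if no checkpoint is running. */
    void onPageWritten() {
        AtomicInteger cntr = writtenPagesCntr;

        if (cntr != null) {
            cntr.incrementAndGet();
        }
    }

    /** Read-only view for callers: a snapshot of the count, or 0 if no checkpoint is running. */
    public int writtenPages() {
        AtomicInteger cntr = writtenPagesCntr;

        return cntr == null ? 0 : cntr.get();
    }

    public static void main(String[] args) {
        ProgressCounters counters = new ProgressCounters();
        System.out.println(counters.writtenPages()); // 0

        counters.initCounters();
        counters.onPageWritten();
        counters.onPageWritten();
        System.out.println(counters.writtenPages()); // 2

        counters.clearCounters();
        System.out.println(counters.writtenPages()); // 0
    }
}
```

The trade-off is that the checkpointer's writer threads then need package-private mutators such as `onPageWritten()` instead of sharing the counter object.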



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java:
##########
@@ -0,0 +1,263 @@
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.SCHEDULED;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Data class representing the state of running/scheduled checkpoint.
+ */
+class CheckpointProgressImpl implements CheckpointProgress {
+    /** Checkpoint id. */
+    private final UUID id = UUID.randomUUID();
+
+    /** Scheduled time of checkpoint. */
+    private volatile long nextCheckpointNanos;
+
+    /** Current checkpoint state. */
+    private volatile AtomicReference<CheckpointState> state = new AtomicReference<>(SCHEDULED);
+
+    /** Future which would be finished when corresponds state is set. */
+    private final Map<CheckpointState, CompletableFuture<Void>> stateFutures = new ConcurrentHashMap<>();
+
+    /** Wakeup reason. */
+    private volatile String reason;
+
+    /** Number of dirty pages in current checkpoint at the beginning of checkpoint. */
+    private volatile int currCheckpointPagesCnt;
+
+    /** Cause of fail, which has happened during the checkpoint or {@code null} if checkpoint was successful. */
+    @Nullable
+    private volatile Throwable failCause;
+
+    /** Counter for written checkpoint pages. Not {@link null} only if checkpoint is running. */
+    @Nullable
+    private volatile AtomicInteger writtenPagesCntr;
+
+    /** Counter for fsynced checkpoint pages. Not {@link null} only if checkpoint is running. */
+    @Nullable
+    private volatile AtomicInteger syncedPagesCntr;
+
+    /** Counter for evicted checkpoint pages. Not {@link null} only if checkpoint is running. */
+    @Nullable
+    private volatile AtomicInteger evictedPagesCntr;
+
+    /**
+     * Constructor.
+     *
+     * @param nextCheckpointTimeout Timeout until next checkpoint in nanos.
+     */
+    CheckpointProgressImpl(long nextCheckpointTimeout) {
+        nextCheckpointNanos(nextCheckpointTimeout);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public UUID id() {
+        return id;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @Nullable String reason() {
+        return reason;
+    }
+
+    /**
+     * Sets description of the reason of the current checkpoint.
+     *
+     * @param reason New wakeup reason.
+     */
+    public void reason(String reason) {
+        this.reason = reason;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean inProgress() {
+        return greaterOrEqualTo(LOCK_RELEASED) && !greaterOrEqualTo(FINISHED);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> futureFor(CheckpointState state) {

Review Comment:
   I have a feeling that there's a data race somewhere. Can we discuss it later? I know that it's not your code.
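This does not attempt to locate the race in the ported code. It shows one pattern that is easier to reason about: keep the state and the failure cause in a single immutable snapshot, and always register a future before reading the state. The sketch is hypothetical (`ProgressStateMachine`, `Snapshot` and `completeIfReached` are illustrative names), and it models only the three states visible in this diff.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

class ProgressStateMachine {
    /** Subset of the checkpoint states referenced in the diff, in lifecycle order. */
    enum State { SCHEDULED, LOCK_RELEASED, FINISHED }

    /** State and failure cause published together, so a reader never sees one without the other. */
    private static final class Snapshot {
        final State state;
        final Throwable failCause;

        Snapshot(State state, Throwable failCause) {
            this.state = state;
            this.failCause = failCause;
        }
    }

    private final AtomicReference<Snapshot> current = new AtomicReference<>(new Snapshot(State.SCHEDULED, null));

    private final Map<State, CompletableFuture<Void>> stateFutures = new ConcurrentHashMap<>();

    CompletableFuture<Void> futureFor(State state) {
        // Register the future first, then read the state. A concurrent transitTo() either
        // publishes its snapshot before this read (handled below) or iterates the map
        // after the future was registered (handled there).
        CompletableFuture<Void> fut = stateFutures.computeIfAbsent(state, k -> new CompletableFuture<>());

        completeIfReached(state, fut, current.get());

        return fut;
    }

    void transitTo(State newState, Throwable failCause) {
        Snapshot next = new Snapshot(newState, failCause);
        Snapshot prev;

        do {
            prev = current.get();

            if (prev.state.compareTo(newState) >= 0) {
                return; // Never move backwards or repeat a state.
            }
        } while (!current.compareAndSet(prev, next));

        // Publish first, then complete every registered future whose state is now reached.
        stateFutures.forEach((state, fut) -> completeIfReached(state, fut, next));
    }

    private static void completeIfReached(State target, CompletableFuture<Void> fut, Snapshot snapshot) {
        if (snapshot.state.compareTo(target) < 0) {
            return;
        }

        if (snapshot.failCause == null) {
            fut.complete(null);
        } else {
            fut.completeExceptionally(snapshot.failCause);
        }
    }

    public static void main(String[] args) {
        ProgressStateMachine progress = new ProgressStateMachine();
        CompletableFuture<Void> finished = progress.futureFor(State.FINISHED);

        progress.transitTo(State.LOCK_RELEASED, null);
        System.out.println(finished.isDone()); // false

        progress.transitTo(State.FINISHED, new IllegalStateException("write failed"));
        System.out.println(finished.isCompletedExceptionally()); // true
    }
}
```

Because the cause and the state are published in one `compareAndSet`, `futureFor` cannot observe `FINISHED` together with a stale `null` cause. With two separate volatile fields, that guarantee depends on the writer always setting the cause before the state.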



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java:
##########
@@ -17,24 +17,12 @@
 
 package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
-import org.jetbrains.annotations.Nullable;
-
 /**
  * Empty.
  */
-// TODO: IGNITE-16898 Continue porting the code
+// TODO: IGNITE-16935 Continue porting the code
 public abstract class Checkpointer {
-    /**
-     * Changes the information for a scheduled checkpoint if it was scheduled further than {@code delayFromNow}, or do nothing otherwise.
-     *
-     * @param delayFromNow Delay from now in milliseconds.
-     * @param reason Wakeup reason.
-     * @return Nearest scheduled checkpoint which is not started yet (dirty pages weren't collected yet).
-     */
-    public abstract CheckpointProgress scheduleCheckpoint(long delayFromNow, String reason);
+    public abstract Thread runner();

Review Comment:
   You removed comments and reordered methods. What was the reason?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/GridConcurrentMultiPairQueue.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+/**
+ * Concurrent queue that wraps collection of {@code Pair<K, V[]>}.
+ *
+ * <p>The only guarantee {@link #next} provides is that the values of each key's array are drained sequentially, i.e. input like:
+ *
+ * <br> p1 = new Pair<1, [1, 3, 5, 7]>
+ * <br> p2 = new Pair<2, [2, 3]>
+ * <br> p3 = new Pair<3, [200, 100]>
+ * <br> and further sequence of {@code poll} or {@code forEach} calls may produce output like:
+ * <br> [3, 200], [3, 100], [1, 1], [1, 3], [1, 5], [1, 7], [2, 2], [2, 3]
+ *
+ * @param <K> The type of key in input pair collection.
+ * @param <V> The type of value array.
+ */
+public class GridConcurrentMultiPairQueue<K, V> {
+    /** Empty pair queue. */
+    public static final GridConcurrentMultiPairQueue EMPTY = new GridConcurrentMultiPairQueue<>(Map.of());
+
+    /** Inner holder. */
+    private final V[][] vals;
+
+    /** Storage for every array length. */
+    private final int[] lenSeq;
+
+    /** Current absolute position. */
+    private final AtomicInteger pos = new AtomicInteger();
+
+    /** Precalculated max position. */
+    private final int maxPos;
+
+    /** Keys array. */
+    private final K[] keysArr;
+
+    /**
+     * Constructor.
+     *
+     * @param items Items.
+     */
+    public GridConcurrentMultiPairQueue(Map<K, ? extends Collection<V>> items) {
+        int pairCnt = (int) items.entrySet().stream().map(Map.Entry::getValue).filter(k -> k.size() > 0).count();

Review Comment:
   "k -> k.size() > 0" should be replaced with "k -> !k.isEmpty()"

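A minimal sketch of the suggested change as a standalone helper (PairCounting and nonEmptyPairCount are hypothetical names). It also streams items.values() directly instead of mapping entries through Map.Entry::getValue, a small extra simplification beyond what the review asks for.

```java
import java.util.Collection;
import java.util.Map;

final class PairCounting {
    /** Counts the keys whose value collection is non-empty. */
    static int nonEmptyPairCount(Map<?, ? extends Collection<?>> items) {
        return (int) items.values().stream()
                .filter(values -> !values.isEmpty())
                .count();
    }
}
```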


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java:
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.stream.Collectors.toUnmodifiableList;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_TAKEN;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.MARKER_STORED_TO_DISK;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.PAGE_SNAPSHOT_TAKEN;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.GridConcurrentMultiPairQueue.EMPTY;
+import static org.apache.ignite.lang.IgniteSystemProperties.getInteger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinWorkerThread;
+import java.util.concurrent.Future;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * This class is responsible for complementing the {@link Checkpointer} class with the side logic of checkpointing, such as checkpoint
+ * listener notifications, collecting dirty pages, etc.
+ *
+ * <p>It allows the {@link Checkpointer} class to focus on its main responsibility: synchronizing memory with disk.
+ *
+ * <p>Additional actions needed during checkpoint are implemented in this class.
+ *
+ * <p>Two main blocks of logic this class is responsible for:
+ *
+ * <p>{@link CheckpointWorkflow#markCheckpointBegin} - Initialization of next checkpoint. It collects all required info.
+ *
+ * <p>{@link CheckpointWorkflow#markCheckpointEnd} - Finalization of last checkpoint.
+ */
+class CheckpointWorkflow implements IgniteComponent {
+    /**
+     * Starting from this number of dirty pages in checkpoint, array will be sorted with {@link Arrays#parallelSort(Comparable[])} in case
+     * of {@link CheckpointWriteOrder#SEQUENTIAL}.
+     */
+    public static final String CHECKPOINT_PARALLEL_SORT_THRESHOLD = "CHECKPOINT_PARALLEL_SORT_THRESHOLD";
+
+    /**
+     * Starting from this number of dirty pages in checkpoint, array will be sorted with {@link Arrays#parallelSort(Comparable[])} in case
+     * of {@link CheckpointWriteOrder#SEQUENTIAL}.
+     */
+    // TODO: IGNITE-16935 Move to configuration
+    private final int parallelSortThreshold = getInteger(CHECKPOINT_PARALLEL_SORT_THRESHOLD, 512 * 1024);
+
+    /** This number of threads will be created and used for parallel sorting. */
+    private static final int PARALLEL_SORT_THREADS = Math.min(Runtime.getRuntime().availableProcessors(), 8);
+
+    /** Checkpoint marker storage. */
+    private final CheckpointMarkersStorage checkpointMarkersStorage;
+
+    /** Checkpoint lock. */
+    private final CheckpointReadWriteLock checkpointReadWriteLock;
+
+    /** Supplier of persistent data regions for the checkpointing. */
+    private final Supplier<Collection<PageMemoryDataRegion>> dataRegionsSupplier;
+
+    /** Checkpoint write order configuration. */
+    private final CheckpointWriteOrder checkpointWriteOrder;
+
+    /** Collection of checkpoint listeners. */
+    private final List<IgniteBiTuple<CheckpointListener, PageMemoryDataRegion>> listeners = new CopyOnWriteArrayList<>();
+
+    /**
+     * Constructor.
+     *
+     * @param checkpointMarkersStorage Checkpoint marker storage.
+     * @param checkpointReadWriteLock Checkpoint read write lock.
+     * @param checkpointWriteOrder Checkpoint write order.
+     * @param dataRegionsSupplier Supplier of persistent data regions for the checkpointing.
+     */
+    public CheckpointWorkflow(
+            CheckpointMarkersStorage checkpointMarkersStorage,
+            CheckpointReadWriteLock checkpointReadWriteLock,
+            CheckpointWriteOrder checkpointWriteOrder,
+            Supplier<Collection<PageMemoryDataRegion>> dataRegionsSupplier
+    ) {
+        this.checkpointMarkersStorage = checkpointMarkersStorage;
+        this.checkpointReadWriteLock = checkpointReadWriteLock;
+        this.checkpointWriteOrder = checkpointWriteOrder;
+        this.dataRegionsSupplier = dataRegionsSupplier;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void start() {
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop() {
+        listeners.clear();
+    }
+
+    /**
+     * First stage of the checkpoint, which collects the required information (mostly dirty pages).
+     *
+     * @param startCheckpointTimestamp Checkpoint start timestamp.
+     * @param curr Current checkpoint event info.
+     * @return Checkpoint collected info.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public Checkpoint markCheckpointBegin(
+            long startCheckpointTimestamp,
+            CheckpointProgressImpl curr
+    ) throws IgniteInternalCheckedException {
+        Collection<PageMemoryDataRegion> dataRegions = dataRegionsSupplier.get();
+
+        List<CheckpointListener> listeners = collectCheckpointListeners(dataRegions);
+
+        checkpointReadWriteLock.readLock();
+
+        try {
+            for (CheckpointListener listener : listeners) {
+                listener.beforeCheckpointBegin(curr);
+            }
+        } finally {
+            checkpointReadWriteLock.readUnlock();
+        }
+
+        checkpointReadWriteLock.writeLock();
+
+        CheckpointDirtyPagesInfoHolder dirtyPages;
+
+        try {
+            curr.transitTo(LOCK_TAKEN);
+
+            for (CheckpointListener listener : listeners) {
+                listener.onMarkCheckpointBegin(curr);
+            }
+
+            // Pages may be replaced only after the checkpoint marker has been stored to disk.
+            dirtyPages = beginCheckpoint(dataRegions, curr.futureFor(MARKER_STORED_TO_DISK));
+
+            curr.currentCheckpointPagesCount(dirtyPages.pageCount);
+
+            curr.transitTo(PAGE_SNAPSHOT_TAKEN);
+        } finally {
+            checkpointReadWriteLock.writeUnlock();
+        }
+
+        curr.transitTo(LOCK_RELEASED);
+
+        for (CheckpointListener listener : listeners) {
+            listener.onCheckpointBegin(curr);
+        }
+
+        if (dirtyPages.pageCount > 0) {
+            checkpointMarkersStorage.onCheckpointBegin(curr.id());
+
+            curr.transitTo(MARKER_STORED_TO_DISK);
+
+            return new Checkpoint(splitAndSortCpPagesIfNeeded(dirtyPages), curr);
+        }
+
+        return new Checkpoint(EMPTY, curr);
+    }
+
+    /**
+     * Performs actions on checkpoint finish (after all pages have been written to disk).
+     *
+     * @param chp Checkpoint snapshot.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public void markCheckpointEnd(Checkpoint chp) throws IgniteInternalCheckedException {
+        Collection<PageMemoryDataRegion> dataRegions = dataRegionsSupplier.get();

Review Comment:
   Why supplier instead of collection?
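The lock choreography of markCheckpointBegin, quoted above, can be seen in isolation in a simplified sketch. This is not the PR's code: Listener is a hypothetical stand-in for CheckpointListener, a plain ReentrantReadWriteLock stands in for CheckpointReadWriteLock, and a list of integers stands in for the dirty pages. It shows which callbacks run under the read lock, which run under the write lock together with the dirty-page snapshot, and which run after the write lock is released.

```java
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical reduction of the lock phases in markCheckpointBegin. */
final class CheckpointBeginSketch {
    /** Hypothetical stand-in for CheckpointListener. */
    interface Listener {
        void beforeCheckpointBegin();

        void onMarkCheckpointBegin();

        void onCheckpointBegin();
    }

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    boolean writeLockedByCurrentThread() {
        return lock.isWriteLockedByCurrentThread();
    }

    /** Runs the three listener phases and returns a snapshot of the dirty pages taken under the write lock. */
    List<Integer> markCheckpointBegin(List<Listener> listeners, List<Integer> dirtyPages) {
        // Phase 1: shared read lock; other read-lock holders can proceed concurrently.
        lock.readLock().lock();
        try {
            for (Listener l : listeners) {
                l.beforeCheckpointBegin();
            }
        } finally {
            lock.readLock().unlock();
        }

        // Phase 2: exclusive write lock, so the dirty set cannot change while it is copied.
        List<Integer> snapshot;
        lock.writeLock().lock();
        try {
            for (Listener l : listeners) {
                l.onMarkCheckpointBegin();
            }
            snapshot = List.copyOf(dirtyPages);
        } finally {
            lock.writeLock().unlock();
        }

        // Phase 3: lock released; the remaining work does not block other lock users.
        for (Listener l : listeners) {
            l.onCheckpointBegin();
        }
        return snapshot;
    }
}
```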



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorage.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.nio.file.Files.createDirectories;
+import static java.nio.file.Files.createFile;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.isDirectory;
+import static java.nio.file.Files.list;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.partitioningBy;
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.IgniteSystemProperties.getInteger;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Abstraction responsible for managing checkpoint markers storage.
+ */
+// TODO: IGNITE-15818 At the moment, a simple implementation has been made, it may need to be redone, you need to check it
+public class CheckpointMarkersStorage {
+    /** Earliest checkpoint map changes threshold system property. */
+    public static final String IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD = "IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD";
+
+    /** Checkpoint start marker. */
+    private static final String CHECKPOINT_START_MARKER = "START";
+
+    /** Checkpoint end marker. */
+    private static final String CHECKPOINT_END_MARKER = "END";
+
+    /** Checkpoint marker file name pattern. */
+    private static final Pattern CHECKPOINT_MARKER_FILE_NAME_PATTERN = Pattern.compile("(.*)-(START|END)\\.bin");
+
+    /** Checkpoint metadata directory ("cp"), contains files with checkpoint start and end markers. */
+    private final Path checkpointDir;
+
+    /** Checkpoint IDs. */
+    private final Set<UUID> checkpointIds;
+
+    /** Earliest checkpoint map changes threshold. */
+    // TODO: IGNITE-16935 Move to config
+    private final int earliestCheckpointChangesThreshold = getInteger(IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD, 5);
+
+    /**
+     * Constructor.
+     *
+     * @param storagePath Storage path.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public CheckpointMarkersStorage(
+            Path storagePath
+    ) throws IgniteInternalCheckedException {
+        checkpointDir = storagePath.resolve("cp");
+
+        try {
+            createDirectories(checkpointDir);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not create directory for checkpoint metadata: " + checkpointDir, e);
+        }
+
+        checkCheckpointDir(checkpointDir);
+
+        try {
+            checkpointIds = list(checkpointDir)
+                    .map(CheckpointMarkersStorage::parseCheckpointIdFromMarkerFile)
+                    .collect(toCollection(ConcurrentHashMap::newKeySet));
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not read checkpoint markers: " + checkpointDir, e);
+        }
+    }
+
+    /**
+     * Callback at the start of the checkpoint.
+     *
+     * <p>Creates a start marker for a checkpoint.
+     *
+     * @param checkpointId Checkpoint id.
+     */
+    public void onCheckpointBegin(UUID checkpointId) throws IgniteInternalCheckedException {
+        assert !checkpointIds.contains(checkpointId) : checkpointId;
+
+        Path checkpointStartMarker = checkpointDir.resolve(checkpointMarkerFileName(checkpointId, CHECKPOINT_START_MARKER));
+
+        try {
+            createFile(checkpointStartMarker);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not create start checkpoint marker: " + checkpointStartMarker, e);
+        }
+
+        checkpointIds.add(checkpointId);
+    }
+
+    /**
+     * Callback at the end of the checkpoint.
+     *
+     * <p>Creates an end marker for a checkpoint.
+     *
+     * @param checkpointId Checkpoint id.
+     */
+    public void onCheckpointEnd(UUID checkpointId) throws IgniteInternalCheckedException {
+        assert checkpointIds.contains(checkpointId) : checkpointId;
+
+        Path checkpointEndMarker = checkpointDir.resolve(checkpointMarkerFileName(checkpointId, CHECKPOINT_END_MARKER));
+
+        try {
+            createFile(checkpointEndMarker);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not create end checkpoint marker: " + checkpointEndMarker, e);
+        }
+
+        if (checkpointIds.size() >= earliestCheckpointChangesThreshold) {
+            for (Iterator<UUID> it = checkpointIds.iterator(); it.hasNext(); ) {
+                UUID id = it.next();
+
+                if (!id.equals(checkpointId)) {
+                    removeCheckpointMarkers(id);
+
+                    it.remove();
+                }
+            }
+        }
+    }
+
+    private void removeCheckpointMarkers(UUID checkpointId) {
+        Path startMarker = checkpointDir.resolve(checkpointMarkerFileName(checkpointId, CHECKPOINT_START_MARKER));
+        Path endMarker = checkpointDir.resolve(checkpointMarkerFileName(checkpointId, CHECKPOINT_END_MARKER));
+
+        if (exists(startMarker)) {
+            startMarker.toFile().delete();
+        }
+
+        if (exists(endMarker)) {
+            endMarker.toFile().delete();
+        }
+    }
+
+    /**
+     * Checks that the directory contains only paired (start and end) checkpoint markers.
+     */
+    private static void checkCheckpointDir(Path checkpointDir) throws IgniteInternalCheckedException {
+        assert isDirectory(checkpointDir) : checkpointDir;
+
+        try {
+            Map<Boolean, List<Path>> files = list(checkpointDir)

Review Comment:
   Is this old code?
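Separately from whether this is old code: removeCheckpointMarkers, quoted above, calls exists() and then toFile().delete(), which is a check-then-act sequence that also ignores delete()'s boolean result. A minimal sketch of the same threshold cleanup using java.nio.file.Files.deleteIfExists, which reports failures as an IOException. Marker names of the form <id>-START.bin and <id>-END.bin are an assumption inferred from CHECKPOINT_MARKER_FILE_NAME_PATTERN, since checkpointMarkerFileName is not shown; MarkerCleanupSketch and its methods are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.Set;
import java.util.UUID;

/** Hypothetical sketch of the marker cleanup done at the end of a checkpoint. */
final class MarkerCleanupSketch {
    /** Assumed marker naming, inferred from the pattern "(.*)-(START|END)\\.bin". */
    static Path marker(Path dir, UUID id, String type) {
        return dir.resolve(id + "-" + type + ".bin");
    }

    /** Deletes both markers of one checkpoint; I/O failures propagate instead of being ignored. */
    static void removeCheckpointMarkers(Path dir, UUID id) throws IOException {
        Files.deleteIfExists(marker(dir, id, "START"));
        Files.deleteIfExists(marker(dir, id, "END"));
    }

    /** Once the threshold is reached, keeps only the markers of the checkpoint that just ended. */
    static void cleanUp(Path dir, Set<UUID> ids, UUID justEnded, int threshold) throws IOException {
        if (ids.size() < threshold) {
            return;
        }
        for (Iterator<UUID> it = ids.iterator(); it.hasNext(); ) {
            UUID id = it.next();
            if (!id.equals(justEnded)) {
                removeCheckpointMarkers(dir, id);
                it.remove();
            }
        }
    }
}
```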



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorage.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.nio.file.Files.createDirectories;
+import static java.nio.file.Files.createFile;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.isDirectory;
+import static java.nio.file.Files.list;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.partitioningBy;
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.IgniteSystemProperties.getInteger;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Abstraction responsible for managing checkpoint markers storage.
+ */
+// TODO: IGNITE-15818 At the moment, a simple implementation has been made, it may need to be redone, you need to check it
+public class CheckpointMarkersStorage {
+    /** Earliest checkpoint map changes threshold system property. */
+    public static final String IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD = "IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD";
+
+    /** Checkpoint start marker. */
+    private static final String CHECKPOINT_START_MARKER = "START";
+
+    /** Checkpoint end marker. */
+    private static final String CHECKPOINT_END_MARKER = "END";
+
+    /** Checkpoint marker file name pattern. */
+    private static final Pattern CHECKPOINT_MARKER_FILE_NAME_PATTERN = Pattern.compile("(.*)-(START|END)\\.bin");
+
+    /** Checkpoint metadata directory ("cp"), contains files with checkpoint start and end markers. */
+    private final Path checkpointDir;
+
+    /** Checkpoint IDs. */
+    private final Set<UUID> checkpointIds;
+
+    /** Earliest checkpoint map changes threshold. */
+    // TODO: IGNITE-16935 Move to config
+    private final int earliestCheckpointChangesThreshold = getInteger(IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD, 5);
+
+    /**
+     * Constructor.
+     *
+     * @param storagePath Storage path.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public CheckpointMarkersStorage(
+            Path storagePath
+    ) throws IgniteInternalCheckedException {
+        checkpointDir = storagePath.resolve("cp");
+
+        try {
+            createDirectories(checkpointDir);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not create directory for checkpoint metadata: " + checkpointDir, e);
+        }
+
+        checkCheckpointDir(checkpointDir);
+
+        try {
+            checkpointIds = list(checkpointDir)
+                    .map(CheckpointMarkersStorage::parseCheckpointIdFromMarkerFile)
+                    .collect(toCollection(ConcurrentHashMap::newKeySet));
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not read checkpoint markers: " + checkpointDir, e);
+        }
+    }
+
+    /**
+     * Callback at the start of the checkpoint.
+     *
+     * <p>Creates a start marker for a checkpoint.
+     *
+     * @param checkpointId Checkpoint id.
+     */
+    public void onCheckpointBegin(UUID checkpointId) throws IgniteInternalCheckedException {
+        assert !checkpointIds.contains(checkpointId) : checkpointId;
+
+        Path checkpointStartMarker = checkpointDir.resolve(checkpointMarkerFileName(checkpointId, CHECKPOINT_START_MARKER));
+
+        try {
+            createFile(checkpointStartMarker);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not create start checkpoint marker: " + checkpointStartMarker, e);
+        }
+
+        checkpointIds.add(checkpointId);
+    }
+
+    /**
+     * Callback at the end of the checkpoint.
+     *
+     * <p>Creates an end marker for a checkpoint.
+     *
+     * @param checkpointId Checkpoint id.
+     */
+    public void onCheckpointEnd(UUID checkpointId) throws IgniteInternalCheckedException {
+        assert checkpointIds.contains(checkpointId) : checkpointId;
+
+        Path checkpointEndMarker = checkpointDir.resolve(checkpointMarkerFileName(checkpointId, CHECKPOINT_END_MARKER));
+
+        try {
+            createFile(checkpointEndMarker);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not create end checkpoint marker: " + checkpointEndMarker, e);
+        }
+
+        if (checkpointIds.size() >= earliestCheckpointChangesThreshold) {
+            for (Iterator<UUID> it = checkpointIds.iterator(); it.hasNext(); ) {
+                UUID id = it.next();
+
+                if (!id.equals(checkpointId)) {
+                    removeCheckpointMarkers(id);
+
+                    it.remove();
+                }
+            }
+        }
+    }
+
+    private void removeCheckpointMarkers(UUID checkpointId) {
+        Path startMarker = checkpointDir.resolve(checkpointMarkerFileName(checkpointId, CHECKPOINT_START_MARKER));
+        Path endMarker = checkpointDir.resolve(checkpointMarkerFileName(checkpointId, CHECKPOINT_END_MARKER));
+
+        if (exists(startMarker)) {
+            startMarker.toFile().delete();
+        }
+
+        if (exists(endMarker)) {
+            endMarker.toFile().delete();
+        }
+    }
+
+    /**
+     * Checks that the directory contains only paired (start and end) checkpoint markers.
+     */
+    private static void checkCheckpointDir(Path checkpointDir) throws IgniteInternalCheckedException {
+        assert isDirectory(checkpointDir) : checkpointDir;
+
+        try {
+            Map<Boolean, List<Path>> files = list(checkpointDir)
+                    .collect(partitioningBy(path -> parseCheckpointIdFromMarkerFile(path) != null));
+
+            if (!files.get(false).isEmpty()) {
+                throw new IgniteInternalCheckedException(

Review Comment:
   Will it fail in the presence of a "tmp" marker file? I guess so.
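The guess can be checked against CHECKPOINT_MARKER_FILE_NAME_PATTERN. A minimal sketch, assuming parseCheckpointIdFromMarkerFile (not shown in the diff) returns null whenever the whole file name does not match the pattern or does not carry a valid UUID; MarkerNameSketch and its methods are hypothetical. Under that assumption a name such as <id>-START.bin.tmp does not match, so checkCheckpointDir would put it into the false partition and throw.

```java
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical marker-name parser built on the pattern from CheckpointMarkersStorage. */
final class MarkerNameSketch {
    private static final Pattern MARKER = Pattern.compile("(.*)-(START|END)\\.bin");

    static String markerFileName(UUID id, String type) {
        return id + "-" + type + ".bin";
    }

    /** Returns the checkpoint id, or null if the whole name is not a valid marker name. */
    static UUID parseCheckpointId(String fileName) {
        Matcher m = MARKER.matcher(fileName);
        // matches() requires the entire name to match, so a ".tmp" suffix is rejected.
        if (!m.matches()) {
            return null;
        }
        try {
            return UUID.fromString(m.group(1));
        } catch (IllegalArgumentException e) {
            return null;
        }
    }
}
```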



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/GridConcurrentMultiPairQueue.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+/**
+ * Concurrent queue that wraps collection of {@code Pair<K, V[]>}.
+ *
+ * <p>The only guarantee {@link #next} provides is that the values of each key's array are drained sequentially, i.e. input like:
+ *
+ * <br> p1 = new Pair<1, [1, 3, 5, 7]>
+ * <br> p2 = new Pair<2, [2, 3]>
+ * <br> p3 = new Pair<3, [200, 100]>
+ * <br> and further sequence of {@code poll} or {@code forEach} calls may produce output like:
+ * <br> [3, 200], [3, 100], [1, 1], [1, 3], [1, 5], [1, 7], [2, 2], [2, 3]

Review Comment:
   I don't get this order. Do you? Why is it "3, 1, 2"?
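As far as the quoted Javadoc states, the only guarantee is per-key order; the order across keys is presumably just the iteration order of the input Map, so "3, 1, 2" would be one possible HashMap order rather than a rule. The simplified sketch below (hypothetical names, not the PR's implementation, whose body is not shown) flattens the non-empty value collections into one logical sequence indexed by a shared AtomicInteger, so each position is handed out exactly once across threads. Fed a LinkedHashMap with keys inserted as 3, 1, 2, it yields the order in the Javadoc example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical simplified multi-pair queue; keys and values must be non-null. */
final class MultiPairQueueSketch<K, V> {
    private final List<K> keys = new ArrayList<>();

    private final List<List<V>> vals = new ArrayList<>();

    /** ends[i] is the total number of values in collections 0..i, i.e. the exclusive end of collection i. */
    private final int[] ends;

    /** Next absolute position to hand out; shared by all consumer threads. */
    private final AtomicInteger pos = new AtomicInteger();

    MultiPairQueueSketch(Map<K, ? extends Collection<V>> items) {
        // Keys are visited in the map's own iteration order; empty collections are skipped.
        for (Map.Entry<K, ? extends Collection<V>> e : items.entrySet()) {
            if (!e.getValue().isEmpty()) {
                keys.add(e.getKey());
                vals.add(List.copyOf(e.getValue()));
            }
        }
        ends = new int[vals.size()];
        int total = 0;
        for (int i = 0; i < ends.length; i++) {
            total += vals.get(i).size();
            ends[i] = total;
        }
    }

    /** Returns the next (key, value) pair, or null once everything has been handed out. */
    Map.Entry<K, V> next() {
        int p = pos.getAndIncrement();
        int total = ends.length == 0 ? 0 : ends[ends.length - 1];
        if (p < 0 || p >= total) { // p < 0 only after int overflow from excessive calls
            return null;
        }
        // Index of the first collection whose exclusive end is greater than p.
        int i = Arrays.binarySearch(ends, p);
        i = i >= 0 ? i + 1 : -i - 1;
        int start = i == 0 ? 0 : ends[i - 1];
        return Map.entry(keys.get(i), vals.get(i).get(p - start));
    }
}
```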





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #800: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #800:
URL: https://github.com/apache/ignite-3/pull/800#discussion_r870120308


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesInfoHolder.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import java.util.Collection;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+/**
+ * Holder of information about dirty pages by {@link PageMemoryImpl} for checkpoint.
+ */
+class CheckpointDirtyPagesInfoHolder {
+    /** Total number of dirty pages. */
+    final int pageCount;
+
+    /** Collection of dirty pages per {@link PageMemoryImpl} distribution. */
+    final Collection<IgniteBiTuple<PageMemoryImpl, Collection<FullPageId>>> pages;

Review Comment:
   So far I have only one thought: saving memory. I can convert it to a map, wdyt?





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #800: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #800:
URL: https://github.com/apache/ignite-3/pull/800#discussion_r870182068


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorage.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.nio.file.Files.createDirectories;
+import static java.nio.file.Files.createFile;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.isDirectory;
+import static java.nio.file.Files.list;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.partitioningBy;
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.IgniteSystemProperties.getInteger;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Abstraction responsible for managing checkpoint markers storage.
+ */
+// TODO: IGNITE-15818 For now this is a simple implementation; it may need to be redone and should be reviewed.
+public class CheckpointMarkersStorage {
+    /** Earliest checkpoint map changes threshold system property. */
+    public static final String IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD = "IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD";
+
+    /** Checkpoint start marker. */
+    private static final String CHECKPOINT_START_MARKER = "START";
+
+    /** Checkpoint end marker. */
+    private static final String CHECKPOINT_END_MARKER = "END";
+
+    /** Checkpoint marker file name pattern. */
+    private static final Pattern CHECKPOINT_MARKER_FILE_NAME_PATTERN = Pattern.compile("(.*)-(START|END)\\.bin");
+
+    /** Checkpoint metadata directory ("cp"), contains files with checkpoint start and end markers. */
+    private final Path checkpointDir;
+
+    /** Checkpoint IDs. */
+    private final Set<UUID> checkpointIds;
+
+    /** Earliest checkpoint map changes threshold. */
+    // TODO: IGNITE-16935 Move to config
+    private final int earliestCheckpointChangesThreshold = getInteger(IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD, 5);
+
+    /**
+     * Constructor.
+     *
+     * @param storagePath Storage path.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public CheckpointMarkersStorage(
+            Path storagePath
+    ) throws IgniteInternalCheckedException {
+        checkpointDir = storagePath.resolve("cp");
+
+        try {
+            createDirectories(checkpointDir);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not create directory for checkpoint metadata: " + checkpointDir, e);
+        }
+
+        checkCheckpointDir(checkpointDir);
+
+        try {
+            checkpointIds = list(checkpointDir)
+                    .map(CheckpointMarkersStorage::parseCheckpointIdFromMarkerFile)
+                    .collect(toCollection(ConcurrentHashMap::newKeySet));
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not read checkpoint markers: " + checkpointDir, e);
+        }
+    }
+
+    /**
+     * Callback at the start of the checkpoint.
+     *
+     * <p>Creates a start marker for a checkpoint.
+     *
+     * @param checkpointId Checkpoint id.
+     */
+    public void onCheckpointBegin(UUID checkpointId) throws IgniteInternalCheckedException {
+        assert !checkpointIds.contains(checkpointId) : checkpointId;
+
+        Path checkpointStartMarker = checkpointDir.resolve(checkpointMarkerFileName(checkpointId, CHECKPOINT_START_MARKER));
+
+        try {
+            createFile(checkpointStartMarker);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not create start checkpoint marker: " + checkpointStartMarker, e);
+        }
+
+        checkpointIds.add(checkpointId);
+    }
+
+    /**
+     * Callback at the end of the checkpoint.
+     *
+     * <p>Creates an end marker for a checkpoint.
+     *
+     * @param checkpointId Checkpoint id.
+     */
+    public void onCheckpointEnd(UUID checkpointId) throws IgniteInternalCheckedException {
+        assert checkpointIds.contains(checkpointId) : checkpointId;
+
+        Path checkpointEndMarker = checkpointDir.resolve(checkpointMarkerFileName(checkpointId, CHECKPOINT_END_MARKER));
+
+        try {
+            createFile(checkpointEndMarker);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not create end checkpoint marker: " + checkpointEndMarker, e);
+        }
+
+        if (checkpointIds.size() >= earliestCheckpointChangesThreshold) {
+            for (Iterator<UUID> it = checkpointIds.iterator(); it.hasNext(); ) {
+                UUID id = it.next();
+
+                if (!id.equals(checkpointId)) {
+                    removeCheckpointMarkers(id);
+
+                    it.remove();
+                }
+            }
+        }
+    }
+
+    private void removeCheckpointMarkers(UUID checkpointId) {
+        Path startMarker = checkpointDir.resolve(checkpointMarkerFileName(checkpointId, CHECKPOINT_START_MARKER));
+        Path endMarker = checkpointDir.resolve(checkpointMarkerFileName(checkpointId, CHECKPOINT_END_MARKER));
+
+        if (exists(startMarker)) {
+            startMarker.toFile().delete();
+        }
+
+        if (exists(endMarker)) {
+            endMarker.toFile().delete();
+        }
+    }
+
+    /**
+     * Checks that the directory contains only paired (start and end) checkpoint markers.
+     */
+    private static void checkCheckpointDir(Path checkpointDir) throws IgniteInternalCheckedException {
+        assert isDirectory(checkpointDir) : checkpointDir;
+
+        try {
+            Map<Boolean, List<Path>> files = list(checkpointDir)
+                    .collect(partitioningBy(path -> parseCheckpointIdFromMarkerFile(path) != null));
+
+            if (!files.get(false).isEmpty()) {
+                throw new IgniteInternalCheckedException(

Review Comment:
   At the moment, temporary files are not expected, so their presence results in an error; this will be implemented differently later.
   Added test: `org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointMarkersStorageTest#testTmpMarkersFiles`
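The directory check described above can be sketched as follows. This is a hypothetical standalone helper, not the code from `CheckpointMarkersStorage`: it reuses the `(.*)-(START|END)\.bin` file name pattern from the diff, fails on any file that is not a marker (for example a leftover temporary file), and returns the IDs of checkpoints that lack either their START or END marker, leaving the policy for those to the caller. It also closes the stream returned by `Files.list` with try-with-resources, which releases the underlying directory handle; the quoted constructor and `checkCheckpointDir` call `list(checkpointDir)` inside a plain `try` without closing it.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

final class MarkerDirCheckSketch {
    /** Same pattern as CHECKPOINT_MARKER_FILE_NAME_PATTERN in the diff. */
    static final Pattern MARKER = Pattern.compile("(.*)-(START|END)\\.bin");

    /** Parsed marker file name: checkpoint ID plus "START" or "END". */
    static final class Marker {
        final UUID id;
        final String kind;

        Marker(UUID id, String kind) {
            this.id = id;
            this.kind = kind;
        }
    }

    /** Returns null unless the name looks like "<uuid>-START.bin" or "<uuid>-END.bin". */
    static Marker parse(String fileName) {
        Matcher m = MARKER.matcher(fileName);
        if (!m.matches()) {
            return null;
        }
        try {
            return new Marker(UUID.fromString(m.group(1)), m.group(2));
        } catch (IllegalArgumentException e) {
            return null; // The prefix is not a UUID.
        }
    }

    /** Rejects non-marker files; returns IDs of checkpoints missing their START or END marker. */
    static Set<UUID> unpairedCheckpoints(Path dir) throws IOException {
        Map<UUID, Set<String>> kindsById = new HashMap<>();

        // try-with-resources closes the directory stream opened by Files.list.
        try (Stream<Path> files = Files.list(dir)) {
            for (Iterator<Path> it = files.iterator(); it.hasNext(); ) {
                Path file = it.next();
                Marker marker = parse(file.getFileName().toString());

                if (marker == null) {
                    throw new IllegalStateException("Unexpected file in checkpoint directory: " + file);
                }

                kindsById.computeIfAbsent(marker.id, id -> new HashSet<>()).add(marker.kind);
            }
        }

        Set<UUID> unpaired = new HashSet<>();
        kindsById.forEach((id, kinds) -> {
            if (kinds.size() < 2) {
                unpaired.add(id);
            }
        });
        return unpaired;
    }
}
```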





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #800: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #800:
URL: https://github.com/apache/ignite-3/pull/800#discussion_r870114778


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpoint.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+
+/**
+ * Data class of checkpoint information.
+ */
+class Checkpoint {
+    /** Checkpoint pages. */
+    final GridConcurrentMultiPairQueue<PageMemoryImpl, FullPageId> dirtyPages;
+
+    /** Checkpoint progress status. */
+    final CheckpointProgressImpl progress;
+
+    /** Number of dirty pages. */
+    final int pagesSize;

Review Comment:
   Renamed it.
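The `Checkpoint` class above keeps its dirty pages in a `GridConcurrentMultiPairQueue<PageMemoryImpl, FullPageId>`, whose source is not part of this excerpt. The following is a hypothetical, simplified sketch of the idea, assuming page IDs are plain `long`s and keys are arbitrary objects: several writer threads drain (key, value) pairs from a fixed set of arrays through one shared `AtomicInteger` cursor, so each page is handed out exactly once without locking, and the total size is known before draining starts. `sortPages` mimics the threshold rule described in `CheckpointWorkflow` (switch to `Arrays.parallelSort` for large arrays); the quoted workflow appears to sort in a dedicated `ForkJoinPool` (it imports `ForkJoinPool` and defines `PARALLEL_SORT_THREADS`), which is omitted here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified stand-in for GridConcurrentMultiPairQueue (sketch only). */
final class MultiPairQueueSketch<K> {
    /** Reusable holder filled by next(); one per polling thread. */
    static final class Entry<K> {
        K key;
        long value;
    }

    private final List<K> keys = new ArrayList<>();
    private final List<long[]> values = new ArrayList<>();

    /** offsets[i] is the global index of the first element of values.get(i); strictly increasing. */
    private final int[] offsets;

    private final int size;

    /** Shared cursor over all arrays; getAndIncrement hands out each index once. */
    private final AtomicInteger position = new AtomicInteger();

    MultiPairQueueSketch(List<K> keyList, List<long[]> valueList) {
        if (keyList.size() != valueList.size()) {
            throw new IllegalArgumentException("keys and values must have the same size");
        }
        // Skip empty arrays so that offsets stay strictly increasing for the binary search.
        for (int i = 0; i < keyList.size(); i++) {
            if (valueList.get(i).length > 0) {
                keys.add(keyList.get(i));
                values.add(valueList.get(i));
            }
        }
        offsets = new int[values.size()];
        int total = 0;
        for (int i = 0; i < offsets.length; i++) {
            offsets[i] = total;
            total += values.get(i).length;
        }
        size = total;
    }

    /** Total number of values, known before any polling starts. */
    int size() {
        return size;
    }

    /** Fills {@code out} with the next pair; returns false once drained. Safe to call concurrently. */
    boolean next(Entry<K> out) {
        int pos = position.getAndIncrement();
        if (pos >= size) {
            return false;
        }
        int i = Arrays.binarySearch(offsets, pos);
        if (i < 0) {
            i = -i - 2; // Insertion point minus one: the array whose range contains pos.
        }
        out.key = keys.get(i);
        out.value = values.get(i)[pos - offsets[i]];
        return true;
    }

    /** Sorts page IDs, switching to Arrays.parallelSort at the given threshold. */
    static void sortPages(long[] pageIds, int parallelSortThreshold) {
        if (pageIds.length >= parallelSortThreshold) {
            Arrays.parallelSort(pageIds);
        } else {
            Arrays.sort(pageIds);
        }
    }
}
```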





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #800: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #800:
URL: https://github.com/apache/ignite-3/pull/800#discussion_r870114155


##########
modules/core/src/test/java/org/apache/ignite/internal/util/worker/IgniteWorkerTest.java:
##########
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.worker;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static org.apache.ignite.internal.util.worker.IgniteWorkerTest.TestWorkerListener.ON_IDLE;
+import static org.apache.ignite.internal.util.worker.IgniteWorkerTest.TestWorkerListener.ON_STARTED;
+import static org.apache.ignite.internal.util.worker.IgniteWorkerTest.TestWorkerListener.ON_STOPPED;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link IgniteWorker}.
+ */
+public class IgniteWorkerTest {
+    static final String CLEANUP = "cleanup";
+
+    private final IgniteLogger log = IgniteLogger.forClass(IgniteWorkerTest.class);
+
+    @Test
+    void testNewIgniteWorker() {
+        IgniteWorker worker = new NoopWorker(log, null);
+
+        assertEquals("testNode", worker.igniteInstanceName());
+        assertEquals("testWorker", worker.name());
+
+        assertEquals(0, worker.heartbeat());
+
+        assertFalse(worker.isCancelled());
+        assertFalse(worker.isDone());
+
+        assertNull(worker.runner());
+    }
+
+    @Test
+    void testBlockingSection() {
+        IgniteWorker worker = new NoopWorker(log, null);
+
+        long currentTimeMillis = coarseCurrentTimeMillis();
+
+        worker.blockingSectionBegin();
+
+        assertEquals(Long.MAX_VALUE, worker.heartbeat());
+
+        worker.blockingSectionEnd();
+
+        assertThat(worker.heartbeat(), greaterThanOrEqualTo(currentTimeMillis));
+
+        // Checks that the heartbeat can be updated after blockingSectionBegin().
+
+        worker.blockingSectionBegin();
+
+        assertEquals(Long.MAX_VALUE, worker.heartbeat());
+
+        worker.updateHeartbeat();
+
+        assertThat(worker.heartbeat(), greaterThanOrEqualTo(currentTimeMillis));
+
+        worker.blockingSectionEnd();
+
+        assertThat(worker.heartbeat(), greaterThanOrEqualTo(currentTimeMillis));
+    }
+
+    @Test
+    void testUpdateHeartbeat() throws Exception {
+        IgniteWorker worker = new NoopWorker(log, null);
+
+        long currentTimeMillis = coarseCurrentTimeMillis();
+
+        worker.updateHeartbeat();
+
+        long heartbeat = worker.heartbeat();
+
+        assertThat(heartbeat, greaterThanOrEqualTo(currentTimeMillis));
+
+        Thread.sleep(10);
+
+        assertEquals(heartbeat, worker.heartbeat());
+
+        worker.updateHeartbeat();
+
+        assertThat(worker.heartbeat(), greaterThan(heartbeat));
+    }
+
+    @Test
+    void testIdle() {
+        List<String> events = new ArrayList<>();
+
+        TestWorkerListener listener = new TestWorkerListener(events);
+
+        IgniteWorker worker = new NoopWorker(log, listener);
+
+        worker.onIdle();
+
+        assertThat(events, equalTo(List.of(ON_IDLE)));
+    }
+
+    @Test
+    void testRun() {
+        List<String> events = new ArrayList<>();
+
+        TestWorkerListener listener = new TestWorkerListener(events) {
+            /** {@inheritDoc} */
+            @Override
+            public void onStarted(IgniteWorker worker) {
+                super.onStarted(worker);
+
+                assertThat(worker.heartbeat(), lessThanOrEqualTo(coarseCurrentTimeMillis()));
+                assertSame(Thread.currentThread(), worker.runner());
+                assertFalse(worker.isCancelled());
+                assertFalse(worker.isDone());
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public void onStopped(IgniteWorker worker) {
+                super.onStopped(worker);
+
+                assertThat(worker.heartbeat(), lessThanOrEqualTo(coarseCurrentTimeMillis()));
+                assertSame(Thread.currentThread(), worker.runner());
+                assertFalse(worker.isCancelled());
+                assertTrue(worker.isDone());
+            }
+        };
+
+        IgniteWorker worker = new NoopWorker(log, listener) {
+            /** {@inheritDoc} */
+            @Override
+            protected void cleanup() {
+                events.add(CLEANUP);
+            }
+        };
+
+        worker.run();
+
+        assertThat(events, equalTo(List.of(ON_STARTED, CLEANUP, ON_STOPPED)));
+
+        assertThat(worker.heartbeat(), lessThanOrEqualTo(coarseCurrentTimeMillis()));
+        assertNull(worker.runner());
+        assertFalse(worker.isCancelled());
+        assertTrue(worker.isDone());
+    }
+
+    @Test
+    void testInterruptFromBody() {
+        List<String> events = new ArrayList<>();
+
+        TestWorkerListener listener = new TestWorkerListener(events);
+
+        IgniteWorker worker = new NoopWorker(log, listener) {
+            /** {@inheritDoc} */
+            @Override
+            protected void body() throws InterruptedException {
+                throw new InterruptedException();

Review Comment:
   Added it.
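The heartbeat assertions in `IgniteWorkerTest` above suggest the following contract, sketched here as a hypothetical `HeartbeatWorkerSketch` that is not the real `IgniteWorker`: the heartbeat starts at 0, is set to the current time by `updateHeartbeat()` and `blockingSectionEnd()`, and is parked at `Long.MAX_VALUE` inside a blocking section. The `looksStuck` method is an assumed watchdog rule showing why `Long.MAX_VALUE` is a convenient marker: a worker that is legitimately blocked (for example on I/O) is never reported as hung.

```java
/** Hypothetical worker mirroring the heartbeat behaviour asserted in IgniteWorkerTest (sketch only). */
class HeartbeatWorkerSketch {
    /** Last heartbeat in ms; 0 before the first update, Long.MAX_VALUE inside a blocking section. */
    private volatile long heartbeat;

    long heartbeat() {
        return heartbeat;
    }

    void updateHeartbeat() {
        heartbeat = System.currentTimeMillis();
    }

    /** Entering a blocking call: park the heartbeat so a watchdog ignores this worker. */
    void blockingSectionBegin() {
        heartbeat = Long.MAX_VALUE;
    }

    /** Leaving the blocking call: resume normal heartbeat tracking. */
    void blockingSectionEnd() {
        updateHeartbeat();
    }

    /** Assumed watchdog rule: stuck if not in a blocking section and silent for longer than the timeout. */
    boolean looksStuck(long nowMillis, long timeoutMillis) {
        long hb = heartbeat;
        return hb != Long.MAX_VALUE && nowMillis - hb > timeoutMillis;
    }
}
```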





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #800: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #800:
URL: https://github.com/apache/ignite-3/pull/800#discussion_r870222107


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java:
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.stream.Collectors.toUnmodifiableList;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_TAKEN;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.MARKER_STORED_TO_DISK;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.PAGE_SNAPSHOT_TAKEN;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.GridConcurrentMultiPairQueue.EMPTY;
+import static org.apache.ignite.lang.IgniteSystemProperties.getInteger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinWorkerThread;
+import java.util.concurrent.Future;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * This class is responsible for complementing the {@link Checkpointer} class with the side logic of checkpointing, such as notifying
+ * checkpoint listeners, collecting dirty pages, etc.
+ *
+ * <p>It allows the {@link Checkpointer} class to focus on its main responsibility: synchronizing memory with disk.
+ *
+ * <p>Additional actions needed during checkpoint are implemented in this class.
+ *
+ * <p>This class is responsible for two main blocks of logic:
+ *
+ * <p>{@link CheckpointWorkflow#markCheckpointBegin} - Initialization of the next checkpoint; collects all the required information.
+ *
+ * <p>{@link CheckpointWorkflow#markCheckpointEnd} - Finalization of the last checkpoint.
+ */
+class CheckpointWorkflow implements IgniteComponent {
+    /**
+     * Name of the system property that sets the number of dirty pages in a checkpoint starting from which the array will be sorted
+     * with {@link Arrays#parallelSort(Comparable[])} in case of {@link CheckpointWriteOrder#SEQUENTIAL}.
+     */
+    public static final String CHECKPOINT_PARALLEL_SORT_THRESHOLD = "CHECKPOINT_PARALLEL_SORT_THRESHOLD";
+
+    /**
+     * Starting from this number of dirty pages in checkpoint, array will be sorted with {@link Arrays#parallelSort(Comparable[])} in case
+     * of {@link CheckpointWriteOrder#SEQUENTIAL}.
+     */
+    // TODO: IGNITE-16935 Move to configuration
+    private final int parallelSortThreshold = getInteger(CHECKPOINT_PARALLEL_SORT_THRESHOLD, 512 * 1024);
+
+    /** This number of threads will be created and used for parallel sorting. */
+    private static final int PARALLEL_SORT_THREADS = Math.min(Runtime.getRuntime().availableProcessors(), 8);
+
+    /** Checkpoint marker storage. */
+    private final CheckpointMarkersStorage checkpointMarkersStorage;
+
+    /** Checkpoint lock. */
+    private final CheckpointReadWriteLock checkpointReadWriteLock;
+
+    /** Supplier of persistent data regions for the checkpointing. */
+    private final Supplier<Collection<PageMemoryDataRegion>> dataRegionsSupplier;
+
+    /** Checkpoint write order configuration. */
+    private final CheckpointWriteOrder checkpointWriteOrder;
+
+    /** Collection of checkpoint listeners. */
+    private final List<IgniteBiTuple<CheckpointListener, PageMemoryDataRegion>> listeners = new CopyOnWriteArrayList<>();
+
+    /**
+     * Constructor.
+     *
+     * @param checkpointMarkersStorage Checkpoint marker storage.
+     * @param checkpointReadWriteLock Checkpoint read write lock.
+     * @param checkpointWriteOrder Checkpoint write order.
+     * @param dataRegionsSupplier Supplier of persistent data regions for the checkpointing.
+     */
+    public CheckpointWorkflow(
+            CheckpointMarkersStorage checkpointMarkersStorage,
+            CheckpointReadWriteLock checkpointReadWriteLock,
+            CheckpointWriteOrder checkpointWriteOrder,
+            Supplier<Collection<PageMemoryDataRegion>> dataRegionsSupplier
+    ) {
+        this.checkpointMarkersStorage = checkpointMarkersStorage;
+        this.checkpointReadWriteLock = checkpointReadWriteLock;
+        this.checkpointWriteOrder = checkpointWriteOrder;
+        this.dataRegionsSupplier = dataRegionsSupplier;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void start() {
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop() {
+        listeners.clear();
+    }
+
+    /**
+     * First stage of the checkpoint, which collects the required information (mostly dirty pages).
+     *
+     * @param startCheckpointTimestamp Checkpoint start timestamp.
+     * @param curr Current checkpoint event info.
+     * @return Checkpoint collected info.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public Checkpoint markCheckpointBegin(
+            long startCheckpointTimestamp,
+            CheckpointProgressImpl curr
+    ) throws IgniteInternalCheckedException {
+        Collection<PageMemoryDataRegion> dataRegions = dataRegionsSupplier.get();
+
+        List<CheckpointListener> listeners = collectCheckpointListeners(dataRegions);
+
+        checkpointReadWriteLock.readLock();
+
+        try {
+            for (CheckpointListener listener : listeners) {
+                listener.beforeCheckpointBegin(curr);
+            }
+        } finally {
+            checkpointReadWriteLock.readUnlock();
+        }
+
+        checkpointReadWriteLock.writeLock();
+
+        CheckpointDirtyPagesInfoHolder dirtyPages;
+
+        try {
+            curr.transitTo(LOCK_TAKEN);
+
+            for (CheckpointListener listener : listeners) {
+                listener.onMarkCheckpointBegin(curr);
+            }
+
+            // Pages may be replaced only after the checkpoint marker has been stored to disk.
+            dirtyPages = beginCheckpoint(dataRegions, curr.futureFor(MARKER_STORED_TO_DISK));
+
+            curr.currentCheckpointPagesCount(dirtyPages.pageCount);
+
+            curr.transitTo(PAGE_SNAPSHOT_TAKEN);
+        } finally {
+            checkpointReadWriteLock.writeUnlock();
+        }
+
+        curr.transitTo(LOCK_RELEASED);
+
+        for (CheckpointListener listener : listeners) {
+            listener.onCheckpointBegin(curr);
+        }
+
+        if (dirtyPages.pageCount > 0) {
+            checkpointMarkersStorage.onCheckpointBegin(curr.id());
+
+            curr.transitTo(MARKER_STORED_TO_DISK);
+
+            return new Checkpoint(splitAndSortCpPagesIfNeeded(dirtyPages), curr);
+        }
+
+        return new Checkpoint(EMPTY, curr);
+    }
+
+    /**
+     * Performs actions on checkpoint finish (after all pages have been written to disk).
+     *
+     * @param chp Checkpoint snapshot.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public void markCheckpointEnd(Checkpoint chp) throws IgniteInternalCheckedException {
+        Collection<PageMemoryDataRegion> dataRegions = dataRegionsSupplier.get();

Review Comment:
   No reason, fixed it
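`markCheckpointBegin` above moves the checkpoint through `LOCK_TAKEN`, `PAGE_SNAPSHOT_TAKEN`, `LOCK_RELEASED` and, when there are dirty pages, `MARKER_STORED_TO_DISK`, and passes `curr.futureFor(MARKER_STORED_TO_DISK)` to `beginCheckpoint` so that page replacement can wait for the marker. Below is a minimal sketch of such a progress tracker, assuming one `CompletableFuture` per state and that reaching a state also completes the futures of all earlier states (so skipping `MARKER_STORED_TO_DISK` on an empty checkpoint does not leave waiters hanging). `CpState` and `ProgressSketch` are hypothetical, placing `FINISHED` last is an assumption, and `CheckpointProgressImpl` itself may behave differently.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** States in the order markCheckpointBegin moves through them (names from the diff; order assumed). */
enum CpState {
    LOCK_TAKEN, PAGE_SNAPSHOT_TAKEN, LOCK_RELEASED, MARKER_STORED_TO_DISK, FINISHED
}

/** Hypothetical progress tracker: one future per state, completed when that state is reached or passed. */
final class ProgressSketch {
    private final Map<CpState, CompletableFuture<Void>> futures = new EnumMap<>(CpState.class);

    /** Current state; null until the first transition. */
    private CpState current;

    ProgressSketch() {
        // Populated once here and only read afterwards, so sharing across threads is safe.
        for (CpState s : CpState.values()) {
            futures.put(s, new CompletableFuture<>());
        }
    }

    /** Future that completes once the checkpoint has reached (or skipped past) the given state. */
    CompletableFuture<Void> futureFor(CpState state) {
        return futures.get(state);
    }

    /** Moves forward only; completes the futures of the new state and of every earlier one. */
    synchronized void transitTo(CpState next) {
        if (current != null && next.compareTo(current) <= 0) {
            throw new IllegalStateException("Cannot go from " + current + " to " + next);
        }
        current = next;
        for (CpState s : CpState.values()) {
            if (s.compareTo(next) <= 0) {
                futures.get(s).complete(null);
            }
        }
    }
}
```

A page-replacement path would then block on, or chain onto, `futureFor(MARKER_STORED_TO_DISK)` instead of polling the state.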





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #800: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #800:
URL: https://github.com/apache/ignite-3/pull/800#discussion_r871145100


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesInfoHolder.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import java.util.Collection;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+/**
+ * Holder of information about dirty pages, grouped by {@link PageMemoryImpl}, for a checkpoint.
+ */
+class CheckpointDirtyPagesInfoHolder {
+    /** Total number of dirty pages. */
+    final int pageCount;
+
+    /** Dirty pages grouped by {@link PageMemoryImpl}. */
+    final Collection<IgniteBiTuple<PageMemoryImpl, Collection<FullPageId>>> pages;

Review Comment:
   I don't think it's necessary. It's just interesting.





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #800: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #800:
URL: https://github.com/apache/ignite-3/pull/800#discussion_r871145671


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorage.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.nio.file.Files.createDirectories;
+import static java.nio.file.Files.createFile;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.isDirectory;
+import static java.nio.file.Files.list;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.partitioningBy;
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.IgniteSystemProperties.getInteger;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Abstraction responsible for managing checkpoint markers storage.
+ */
+// TODO: IGNITE-15818 For now this is a simple implementation; it may need to be redone and should be reviewed.
+public class CheckpointMarkersStorage {
+    /** Earliest checkpoint map changes threshold system property. */
+    public static final String IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD = "IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD";
+
+    /** Checkpoint start marker. */
+    private static final String CHECKPOINT_START_MARKER = "START";
+
+    /** Checkpoint end marker. */
+    private static final String CHECKPOINT_END_MARKER = "END";
+
+    /** Checkpoint marker file name pattern. */
+    private static final Pattern CHECKPOINT_MARKER_FILE_NAME_PATTERN = Pattern.compile("(.*)-(START|END)\\.bin");
+
+    /** Checkpoint metadata directory ("cp"), contains files with checkpoint start and end markers. */
+    private final Path checkpointDir;
+
+    /** Checkpoint IDs. */
+    private final Set<UUID> checkpointIds;
+
+    /** Earliest checkpoint map changes threshold. */
+    // TODO: IGNITE-16935 Move to config
+    private final int earliestCheckpointChangesThreshold = getInteger(IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD, 5);
+
+    /**
+     * Constructor.
+     *
+     * @param storagePath Storage path.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public CheckpointMarkersStorage(
+            Path storagePath
+    ) throws IgniteInternalCheckedException {
+        checkpointDir = storagePath.resolve("cp");
+
+        try {
+            createDirectories(checkpointDir);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not create directory for checkpoint metadata: " + checkpointDir, e);
+        }
+
+        checkCheckpointDir(checkpointDir);
+
+        try {
+            checkpointIds = list(checkpointDir)
+                    .map(CheckpointMarkersStorage::parseCheckpointIdFromMarkerFile)
+                    .collect(toCollection(ConcurrentHashMap::newKeySet));
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not read checkpoint markers: " + checkpointDir, e);
+        }
+    }
+
+    /**
+     * Callback at the start of the checkpoint.
+     *
+     * <p>Creates a start marker for a checkpoint.
+     *
+     * @param checkpointId Checkpoint id.
+     */
+    public void onCheckpointBegin(UUID checkpointId) throws IgniteInternalCheckedException {
+        assert !checkpointIds.contains(checkpointId) : checkpointId;
+
+        Path checkpointStartMarker = checkpointDir.resolve(checkpointMarkerFileName(checkpointId, CHECKPOINT_START_MARKER));
+
+        try {
+            createFile(checkpointStartMarker);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not create start checkpoint marker: " + checkpointStartMarker, e);
+        }
+
+        checkpointIds.add(checkpointId);
+    }
+
+    /**
+     * Callback at the end of the checkpoint.
+     *
+     * <p>Creates an end marker for a checkpoint.
+     *
+     * @param checkpointId Checkpoint id.
+     */
+    public void onCheckpointEnd(UUID checkpointId) throws IgniteInternalCheckedException {
+        assert checkpointIds.contains(checkpointId) : checkpointId;
+
+        Path checkpointEndMarker = checkpointDir.resolve(checkpointMarkerFileName(checkpointId, CHECKPOINT_END_MARKER));
+
+        try {
+            createFile(checkpointEndMarker);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not create end checkpoint marker: " + checkpointEndMarker, e);
+        }
+
+        if (checkpointIds.size() >= earliestCheckpointChangesThreshold) {
+            for (Iterator<UUID> it = checkpointIds.iterator(); it.hasNext(); ) {
+                UUID id = it.next();
+
+                if (!id.equals(checkpointId)) {
+                    removeCheckpointMarkers(id);
+
+                    it.remove();
+                }
+            }
+        }
+    }
+
+    private void removeCheckpointMarkers(UUID checkpointId) {
+        Path startMarker = checkpointDir.resolve(checkpointMarkerFileName(checkpointId, CHECKPOINT_START_MARKER));
+        Path endMarker = checkpointDir.resolve(checkpointMarkerFileName(checkpointId, CHECKPOINT_END_MARKER));
+
+        if (exists(startMarker)) {
+            startMarker.toFile().delete();
+        }
+
+        if (exists(endMarker)) {
+            endMarker.toFile().delete();
+        }
+    }
+
+    /**
+     * Checks that the directory contains only paired (start and end) checkpoint markers.
+     */
+    private static void checkCheckpointDir(Path checkpointDir) throws IgniteInternalCheckedException {
+        assert isDirectory(checkpointDir) : checkpointDir;
+
+        try {
+            Map<Boolean, List<Path>> files = list(checkpointDir)

Review Comment:
   Map<Boolean, ..> doesn't look like a good solution to me personally.
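One alternative to partitioning into `Map<Boolean, List<Path>>` is to group the marker files by checkpoint ID and then check that every group contains both markers. A minimal sketch, assuming the `<id>-START.bin` / `<id>-END.bin` naming from `CHECKPOINT_MARKER_FILE_NAME_PATTERN` above; `MarkerPairCheck` and `unpairedCheckpointIds` are hypothetical names, and the sketch works on file names instead of listing the `cp` directory. Names that do not match the pattern are simply skipped here; how the real check treats them is not visible in the quoted code.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Hypothetical helper: finds checkpoint IDs that do not have both START and END markers. */
public class MarkerPairCheck {
    /** Same file name pattern as CHECKPOINT_MARKER_FILE_NAME_PATTERN in the quoted code. */
    private static final Pattern MARKER = Pattern.compile("(.*)-(START|END)\\.bin");

    public static Set<String> unpairedCheckpointIds(List<String> fileNames) {
        // Group the marker kinds ("START" / "END") by checkpoint ID.
        Map<String, Set<String>> markersById = fileNames.stream()
                .map(MARKER::matcher)
                .filter(Matcher::matches)
                .collect(Collectors.groupingBy(
                        m -> m.group(1),
                        Collectors.mapping(m -> m.group(2), Collectors.toSet())));

        // A checkpoint is consistent only if both marker kinds are present.
        return markersById.entrySet().stream()
                .filter(e -> e.getValue().size() != 2)
                .map(Map.Entry::getKey)
                .collect(Collectors.toCollection(TreeSet::new));
    }

    public static void main(String[] args) {
        List<String> files = List.of("a-START.bin", "a-END.bin", "b-START.bin", "c-END.bin");
        System.out.println(unpairedCheckpointIds(files)); // prints [b, c]
    }
}
```

The result names the offending checkpoints directly, which also gives a more useful error message than a list of unmatched paths.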



[GitHub] [ignite-3] ibessonov commented on a diff in pull request #800: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #800:
URL: https://github.com/apache/ignite-3/pull/800#discussion_r871058669


##########
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImplTest.java:
##########
@@ -64,19 +66,22 @@ void testReason() {
 
     @Test
     void testNextCheckpointNanos() {
-        assertThat(new CheckpointProgressImpl(0).nextCheckpointNanos(), lessThanOrEqualTo(nanoTime()));
-
-        assertThat(new CheckpointProgressImpl(-1).nextCheckpointNanos(), lessThanOrEqualTo(nanoTime()));
-
-        assertThat(new CheckpointProgressImpl(Long.MIN_VALUE).nextCheckpointNanos(), lessThanOrEqualTo(nanoTime()));
+        assertThat(
+                new CheckpointProgressImpl(0).nextCheckpointNanos(),
+                allOf(greaterThan(0L), lessThanOrEqualTo(nanoTime()))

Review Comment:
   I just realized that these assertions are wrong! Nano time can be negative, and there's nothing wrong with that. I'm not even sure what the proper assertions are. When it comes to time, things get very tricky.
   You can do the following, for example:
   - long before = nanoTime();
   - long cp = new CheckpointProgressImpl(0).nextCheckpointNanos();
   - long after = nanoTime();
   - assert cp - before >= 0;
   - assert after - cp >= 0;
   
   Assertions for "year" will be different, obviously.
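The steps suggested above can be turned into a self-contained check. A minimal sketch, not the test that was finally committed: `scheduleAfter` is a hypothetical stand-in for `new CheckpointProgressImpl(delay).nextCheckpointNanos()` and is assumed to return `System.nanoTime() + max(delay, 0)`. All comparisons use subtraction (`a - b >= 0`) instead of `a >= b`, as the `System.nanoTime()` Javadoc recommends, because raw values may be negative or wrap around.

```java
import java.util.concurrent.TimeUnit;

public class NanoTimeAssertions {
    /** Hypothetical stand-in for the deadline computation: now + max(delay, 0). */
    public static long scheduleAfter(long delayNanos) {
        return System.nanoTime() + Math.max(delayNanos, 0);
    }

    /** Overflow-safe "a is not earlier than b" for System.nanoTime() values. */
    public static boolean notBefore(long a, long b) {
        return a - b >= 0;
    }

    private static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        for (long delay : new long[] {0, -1, Long.MIN_VALUE}) {
            long before = System.nanoTime();
            long cp = scheduleAfter(delay);
            long after = System.nanoTime();

            // Never compare raw values with < or >: nanoTime() may be negative or wrap around.
            check(notBefore(cp, before), "deadline earlier than 'before' for delay " + delay);
            check(notBefore(after, cp), "deadline later than 'after' for delay " + delay);
        }

        // For a positive delay, the deadline minus the delay must not precede 'before'.
        long hour = TimeUnit.HOURS.toNanos(1);
        long before = System.nanoTime();
        long cp = scheduleAfter(hour);
        check(notBefore(cp - hour, before), "one-hour deadline too early");

        System.out.println("ok");
    }
}
```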



[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #800: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #800:
URL: https://github.com/apache/ignite-3/pull/800#discussion_r871058597


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorage.java:
##########
@@ -136,15 +128,13 @@ public void onCheckpointEnd(UUID checkpointId) throws IgniteInternalCheckedExcep
             throw new IgniteInternalCheckedException("Could not create end checkpoint marker: " + checkpointEndMarker, e);
         }
 
-        if (checkpointIds.size() >= earliestCheckpointChangesThreshold) {
-            for (Iterator<UUID> it = checkpointIds.iterator(); it.hasNext(); ) {
-                UUID id = it.next();
+        for (Iterator<UUID> it = checkpointIds.iterator(); it.hasNext(); ) {

Review Comment:
   Yes, this is just a temporary version; I haven't written the correct code for it yet.



[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #800: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #800:
URL: https://github.com/apache/ignite-3/pull/800#discussion_r870122209


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/GridConcurrentMultiPairQueue.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+/**
+ * Concurrent queue that wraps collection of {@code Pair<K, V[]>}.
+ *
+ * <p>The only guarantee {@link #next} provides is that the values of each key's array are drained sequentially, i.e. input like:
+ *
+ * <br> p1 = new Pair<1, [1, 3, 5, 7]>
+ * <br> p2 = new Pair<2, [2, 3]>
+ * <br> p3 = new Pair<3, [200, 100]>
+ * <br> and further sequence of {@code poll} or {@code forEach} calls may produce output like:
+ * <br> [3, 200], [3, 100], [1, 1], [1, 3], [1, 5], [1, 7], [2, 2], [2, 3]
+ *
+ * @param <K> The type of key in input pair collection.
+ * @param <V> The type of value array.
+ */
+public class GridConcurrentMultiPairQueue<K, V> {
+    /** Empty pair queue. */
+    public static final GridConcurrentMultiPairQueue EMPTY = new GridConcurrentMultiPairQueue<>(Map.of());
+
+    /** Inner holder. */
+    private final V[][] vals;
+
+    /** Storage for every array length. */
+    private final int[] lenSeq;
+
+    /** Current absolute position. */
+    private final AtomicInteger pos = new AtomicInteger();
+
+    /** Precalculated max position. */
+    private final int maxPos;
+
+    /** Keys array. */
+    private final K[] keysArr;
+
+    /**
+     * Constructor.
+     *
+     * @param items Items.
+     */
+    public GridConcurrentMultiPairQueue(Map<K, ? extends Collection<V>> items) {
+        int pairCnt = (int) items.entrySet().stream().map(Map.Entry::getValue).filter(k -> k.size() > 0).count();

Review Comment:
   Corrected it.
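Regarding the `pairCnt` line quoted above: streaming `entrySet()` only to map each entry back to its value can be replaced by streaming `values()` directly. The final corrected code is not shown in this thread; this is a minimal sketch with a hypothetical `nonEmptyCount` helper.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;

public class PairCount {
    /** Number of keys whose value collection is non-empty. */
    public static <K, V> int nonEmptyCount(Map<K, ? extends Collection<V>> items) {
        return (int) items.values().stream().filter(c -> !c.isEmpty()).count();
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> items = Map.of(1, List.of(1, 3), 2, List.of(), 3, List.of(200));
        System.out.println(nonEmptyCount(items)); // prints 2
    }
}
```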



[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #800: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #800:
URL: https://github.com/apache/ignite-3/pull/800#discussion_r870138333


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java:
##########
@@ -17,24 +17,12 @@
 
 package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
-import org.jetbrains.annotations.Nullable;
-
 /**
  * Empty.
  */
-// TODO: IGNITE-16898 Continue porting the code
+// TODO: IGNITE-16935 Continue porting the code
 public abstract class Checkpointer {
-    /**
-     * Changes the information for a scheduled checkpoint if it was scheduled further than {@code delayFromNow}, or do nothing otherwise.
-     *
-     * @param delayFromNow Delay from now in milliseconds.
-     * @param reason Wakeup reason.
-     * @return Nearest scheduled checkpoint which is not started yet (dirty pages weren't collected yet).
-     */
-    public abstract CheckpointProgress scheduleCheckpoint(long delayFromNow, String reason);
+    public abstract Thread runner();

Review Comment:
   The ticket link is in the TODO comment above.



[GitHub] [ignite-3] ibessonov commented on a diff in pull request #800: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #800:
URL: https://github.com/apache/ignite-3/pull/800#discussion_r871053529


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorage.java:
##########
@@ -136,15 +128,13 @@ public void onCheckpointEnd(UUID checkpointId) throws IgniteInternalCheckedExcep
             throw new IgniteInternalCheckedException("Could not create end checkpoint marker: " + checkpointEndMarker, e);
         }
 
-        if (checkpointIds.size() >= earliestCheckpointChangesThreshold) {
-            for (Iterator<UUID> it = checkpointIds.iterator(); it.hasNext(); ) {
-                UUID id = it.next();
+        for (Iterator<UUID> it = checkpointIds.iterator(); it.hasNext(); ) {

Review Comment:
   You know, it's funny: if the node crashes during marker removal after deleting the START marker but before deleting the END marker, the node just won't start. Just sayin'. No need to fix it right now; as I mentioned, a lot of things will be revisited.
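One way to tolerate the crash described above, sketched under assumptions: keep the deletion order of the quoted `removeCheckpointMarkers` (START first, then END), so an interrupted removal can only leave an END without its START. Since an END marker is only ever written after its START, an END-only checkpoint on startup must be a leftover from cleanup and can be deleted instead of failing the node. A START without END still means an interrupted checkpoint and is reported. `MarkerCleanup`, `recover` and the other names are hypothetical; a durable version would also need to fsync the directory after deleting files.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch: marker removal plus a startup check that tolerates an interrupted removal. */
public class MarkerCleanup {
    private static final String START_SUFFIX = "-START.bin";
    private static final String END_SUFFIX = "-END.bin";

    public static Path start(Path dir, String id) {
        return dir.resolve(id + START_SUFFIX);
    }

    public static Path end(Path dir, String id) {
        return dir.resolve(id + END_SUFFIX);
    }

    /** Deletes START first, then END: a crash in between can only leave an END without its START. */
    public static void removeMarkers(Path dir, String id) throws IOException {
        Files.deleteIfExists(start(dir, id));
        Files.deleteIfExists(end(dir, id));
    }

    /**
     * Startup check. An END without START can only come from an interrupted removeMarkers(), so it is
     * deleted. A START without END is an interrupted checkpoint; those IDs are returned.
     */
    public static Set<String> recover(Path dir) throws IOException {
        Set<String> starts = new HashSet<>();
        Set<String> ends = new HashSet<>();

        try (DirectoryStream<Path> files = Files.newDirectoryStream(dir, "*.bin")) {
            for (Path file : files) {
                String name = file.getFileName().toString();
                if (name.endsWith(START_SUFFIX)) {
                    starts.add(name.substring(0, name.length() - START_SUFFIX.length()));
                } else if (name.endsWith(END_SUFFIX)) {
                    ends.add(name.substring(0, name.length() - END_SUFFIX.length()));
                }
            }
        }

        for (String id : ends) {
            if (!starts.contains(id)) {
                Files.deleteIfExists(end(dir, id)); // leftover of an interrupted removal
            }
        }

        Set<String> interrupted = new TreeSet<>(starts);
        interrupted.removeAll(ends);
        return interrupted;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("cp");
        Files.createFile(start(dir, "a"));
        Files.createFile(end(dir, "a"));
        Files.createFile(end(dir, "b"));   // START was already deleted when the node crashed
        Files.createFile(start(dir, "c")); // checkpoint "c" never finished

        System.out.println(recover(dir));                // prints [c]
        System.out.println(Files.exists(end(dir, "b"))); // prints false

        removeMarkers(dir, "a");
        System.out.println(Files.exists(end(dir, "a"))); // prints false
    }
}
```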



[GitHub] [ignite-3] ibessonov merged pull request #800: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2

Posted by GitBox <gi...@apache.org>.
ibessonov merged PR #800:
URL: https://github.com/apache/ignite-3/pull/800


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #800: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #800:
URL: https://github.com/apache/ignite-3/pull/800#discussion_r870128222


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/GridConcurrentMultiPairQueue.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+/**
+ * Concurrent queue that wraps collection of {@code Pair<K, V[]>}.
+ *
+ * <p>The only guarantee {@link #next} provides is that the values of each key's array are drained sequentially, i.e. input like:
+ *
+ * <br> p1 = new Pair<1, [1, 3, 5, 7]>
+ * <br> p2 = new Pair<2, [2, 3]>
+ * <br> p3 = new Pair<3, [200, 100]>
+ * <br> and further sequence of {@code poll} or {@code forEach} calls may produce output like:
+ * <br> [3, 200], [3, 100], [1, 1], [1, 3], [1, 5], [1, 7], [2, 2], [2, 3]

Review Comment:
   I think the order here is described slightly incorrectly; I will correct the description.
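To illustrate the ordering the Javadoc tries to describe, here is a minimal sketch of the idea behind the queue, not the class under review: all per-key values sit behind one `AtomicInteger` cursor, and prefix offsets map each claimed position back to a (key, value) pair. Each key's values come out in their collection order, but the order across keys follows the iteration order of the input map (a `LinkedHashMap` here; with `Map.of` or `HashMap` it is unspecified). The class name `MultiPairQueueSketch` and the list-based storage are assumptions; the quoted code uses `V[][]` arrays and `lenSeq` instead.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class MultiPairQueueSketch<K, V> {
    private final List<K> keys = new ArrayList<>();
    private final List<List<V>> vals = new ArrayList<>();
    /** ends[i] = total number of values in groups 0..i, i.e. the exclusive end offset of group i. */
    private final int[] ends;
    private final AtomicInteger pos = new AtomicInteger();

    public MultiPairQueueSketch(Map<K, ? extends Collection<V>> items) {
        items.forEach((k, v) -> {
            if (!v.isEmpty()) { // empty collections contribute nothing
                keys.add(k);
                vals.add(List.copyOf(v));
            }
        });
        ends = new int[keys.size()];
        int total = 0;
        for (int i = 0; i < ends.length; i++) {
            total += vals.get(i).size();
            ends[i] = total;
        }
    }

    /** Returns the next pair, or null when drained. getAndIncrement() gives every caller a distinct slot. */
    public Map.Entry<K, V> next() {
        int p = pos.getAndIncrement();
        if (ends.length == 0 || p >= ends[ends.length - 1]) {
            return null;
        }
        // First group whose exclusive end offset is greater than p.
        int idx = Arrays.binarySearch(ends, p + 1);
        int group = idx >= 0 ? idx : -idx - 1;
        int offset = p - (group == 0 ? 0 : ends[group - 1]);
        return new SimpleImmutableEntry<>(keys.get(group), vals.get(group).get(offset));
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> items = new LinkedHashMap<>();
        items.put(3, List.of(200, 100));
        items.put(1, List.of(1, 3, 5, 7));
        items.put(2, List.of(2, 3));

        MultiPairQueueSketch<Integer, Integer> queue = new MultiPairQueueSketch<>(items);
        StringBuilder out = new StringBuilder();
        Map.Entry<Integer, Integer> e;
        while ((e = queue.next()) != null) {
            out.append('[').append(e.getKey()).append(", ").append(e.getValue()).append("] ");
        }
        // prints [3, 200] [3, 100] [1, 1] [1, 3] [1, 5] [1, 7] [2, 2] [2, 3]
        System.out.println(out.toString().trim());
    }
}
```

With several threads calling `next()`, the global interleaving varies, but no pair is returned twice and every pair is returned exactly once.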



[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #800: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #800:
URL: https://github.com/apache/ignite-3/pull/800#discussion_r870111746


##########
modules/core/src/main/java/org/apache/ignite/internal/util/worker/IgniteWorkerListener.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.worker;
+
+import java.util.EventListener;
+
+/**
+ * This interface defines worker listener.
+ */
+public interface IgniteWorkerListener extends EventListener {

Review Comment:
   Removed it.



[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #800: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #800:
URL: https://github.com/apache/ignite-3/pull/800#discussion_r870109791


##########
modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java:
##########
@@ -31,10 +31,11 @@ public class FastTimestamps {
 
     private static void startUpdater() {
         Thread updater = new Thread("FastTimestamps updater") {
+            /** {@inheritDoc} */
             @Override
             public void run() {
                 while (true) {
-                    coarseCurrentTimeMillis = System.currentTimeMillis();
+                    coarseCurrentTimeMillis = Math.max(coarseCurrentTimeMillis, System.currentTimeMillis());

Review Comment:
   Rolled back.
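For reference, the rolled-back line made the coarse clock monotonic by never letting it move backwards. A minimal sketch of that clamping rule, with hypothetical names (`MonotonicCoarseClock`, `advance`) and an assumed 10 ms update interval; it is not the current `FastTimestamps` code. Note the trade-off of clamping: after a backwards wall-clock step, the coarse value stays frozen until the wall clock catches up.

```java
public class MonotonicCoarseClock {
    /** Written by a single updater thread, read by many threads. */
    private static volatile long coarseMillis = System.currentTimeMillis();

    /** The clamping rule from the rolled-back line: never move the coarse value backwards. */
    public static long advance(long previous, long wallClockMillis) {
        return Math.max(previous, wallClockMillis);
    }

    public static void startUpdater() {
        Thread updater = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                // Single writer, so a volatile read-then-write is sufficient here.
                coarseMillis = advance(coarseMillis, System.currentTimeMillis());
                try {
                    Thread.sleep(10); // assumed update interval
                } catch (InterruptedException e) {
                    return;
                }
            }
        }, "coarse clock updater");
        updater.setDaemon(true);
        updater.start();
    }

    public static long coarseCurrentTimeMillis() {
        return coarseMillis;
    }

    public static void main(String[] args) {
        System.out.println(advance(1_000, 900));   // prints 1000 (wall clock stepped back)
        System.out.println(advance(1_000, 1_010)); // prints 1010
    }
}
```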



[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #800: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #800:
URL: https://github.com/apache/ignite-3/pull/800#discussion_r871131495


##########
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImplTest.java:
##########
@@ -64,19 +66,22 @@ void testReason() {
 
     @Test
     void testNextCheckpointNanos() {
-        assertThat(new CheckpointProgressImpl(0).nextCheckpointNanos(), lessThanOrEqualTo(nanoTime()));
-
-        assertThat(new CheckpointProgressImpl(-1).nextCheckpointNanos(), lessThanOrEqualTo(nanoTime()));
-
-        assertThat(new CheckpointProgressImpl(Long.MIN_VALUE).nextCheckpointNanos(), lessThanOrEqualTo(nanoTime()));
+        assertThat(
+                new CheckpointProgressImpl(0).nextCheckpointNanos(),
+                allOf(greaterThan(0L), lessThanOrEqualTo(nanoTime()))

Review Comment:
   Tried to fix it.



[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #800: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #800:
URL: https://github.com/apache/ignite-3/pull/800#discussion_r870159481


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorage.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.nio.file.Files.createDirectories;
+import static java.nio.file.Files.createFile;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.isDirectory;
+import static java.nio.file.Files.list;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.partitioningBy;
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.IgniteSystemProperties.getInteger;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Abstraction responsible for managing checkpoint markers storage.
+ */
+// TODO: IGNITE-15818 At the moment, a simple implementation has been made, it may need to be redone, you need to check it
+public class CheckpointMarkersStorage {
+    /** System property name for the earliest checkpoint map changes threshold. */
+    public static final String IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD = "IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD";
+
+    /** Checkpoint start marker. */
+    private static final String CHECKPOINT_START_MARKER = "START";
+
+    /** Checkpoint end marker. */
+    private static final String CHECKPOINT_END_MARKER = "END";
+
+    /** Checkpoint marker file name pattern. */
+    private static final Pattern CHECKPOINT_MARKER_FILE_NAME_PATTERN = Pattern.compile("(.*)-(START|END)\\.bin");
+
+    /** Checkpoint metadata directory ("cp"), contains files with checkpoint start and end markers. */
+    private final Path checkpointDir;
+
+    /** Checkpoint IDs. */
+    private final Set<UUID> checkpointIds;
+
+    /** Earliest checkpoint map changes threshold. */
+    // TODO: IGNITE-16935 Move to config
+    private final int earliestCheckpointChangesThreshold = getInteger(IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD, 5);
+
+    /**
+     * Constructor.
+     *
+     * @param storagePath Storage path.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public CheckpointMarkersStorage(
+            Path storagePath
+    ) throws IgniteInternalCheckedException {
+        checkpointDir = storagePath.resolve("cp");
+
+        try {
+            createDirectories(checkpointDir);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not create directory for checkpoint metadata: " + checkpointDir, e);
+        }
+
+        checkCheckpointDir(checkpointDir);
+
+        try {
+            checkpointIds = list(checkpointDir)
+                    .map(CheckpointMarkersStorage::parseCheckpointIdFromMarkerFile)
+                    .collect(toCollection(ConcurrentHashMap::newKeySet));
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not read checkpoint markers: " + checkpointDir, e);
+        }
+    }
+
+    /**
+     * Callback at the start of the checkpoint.
+     *
+     * <p>Creates a start marker for a checkpoint.
+     *
+     * @param checkpointId Checkpoint id.
+     */
+    public void onCheckpointBegin(UUID checkpointId) throws IgniteInternalCheckedException {
+        assert !checkpointIds.contains(checkpointId) : checkpointId;
+
+        Path checkpointStartMarker = checkpointDir.resolve(checkpointMarkerFileName(checkpointId, CHECKPOINT_START_MARKER));
+
+        try {
+            createFile(checkpointStartMarker);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not create start checkpoint marker: " + checkpointStartMarker, e);
+        }
+
+        checkpointIds.add(checkpointId);
+    }
+
+    /**
+     * Callback at the end of the checkpoint.
+     *
+     * <p>Creates an end marker for a checkpoint.
+     *
+     * @param checkpointId Checkpoint id.
+     */
+    public void onCheckpointEnd(UUID checkpointId) throws IgniteInternalCheckedException {
+        assert checkpointIds.contains(checkpointId) : checkpointId;
+
+        Path checkpointEndMarker = checkpointDir.resolve(checkpointMarkerFileName(checkpointId, CHECKPOINT_END_MARKER));
+
+        try {
+            createFile(checkpointEndMarker);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not create end checkpoint marker: " + checkpointEndMarker, e);
+        }
+
+        if (checkpointIds.size() >= earliestCheckpointChangesThreshold) {
+            for (Iterator<UUID> it = checkpointIds.iterator(); it.hasNext(); ) {
+                UUID id = it.next();
+
+                if (!id.equals(checkpointId)) {
+                    removeCheckpointMarkers(id);
+
+                    it.remove();
+                }
+            }
+        }
+    }
+
+    private void removeCheckpointMarkers(UUID checkpointId) {
+        Path startMarker = checkpointDir.resolve(checkpointMarkerFileName(checkpointId, CHECKPOINT_START_MARKER));
+        Path endMarker = checkpointDir.resolve(checkpointMarkerFileName(checkpointId, CHECKPOINT_END_MARKER));
+
+        if (exists(startMarker)) {
+            startMarker.toFile().delete();
+        }
+
+        if (exists(endMarker)) {
+            endMarker.toFile().delete();
+        }
+    }
+
+    /**
+     * Checks that the directory contains only paired (start and end) checkpoint markers.
+     */
+    private static void checkCheckpointDir(Path checkpointDir) throws IgniteInternalCheckedException {
+        assert isDirectory(checkpointDir) : checkpointDir;
+
+        try {
+            Map<Boolean, List<Path>> files = list(checkpointDir)

Review Comment:
   No



[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #800: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #800:
URL: https://github.com/apache/ignite-3/pull/800#discussion_r870221308


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.SCHEDULED;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Data class representing the state of running/scheduled checkpoint.
+ */
+class CheckpointProgressImpl implements CheckpointProgress {
+    /** Checkpoint id. */
+    private final UUID id = UUID.randomUUID();
+
+    /** Scheduled time of checkpoint. */
+    private volatile long nextCheckpointNanos;
+
+    /** Current checkpoint state. */
+    private volatile AtomicReference<CheckpointState> state = new AtomicReference<>(SCHEDULED);
+
+    /** Futures that are completed when the corresponding state is set. */
+    private final Map<CheckpointState, CompletableFuture<Void>> stateFutures = new ConcurrentHashMap<>();
+
+    /** Wakeup reason. */
+    private volatile String reason;
+
+    /** Number of dirty pages in current checkpoint at the beginning of checkpoint. */
+    private volatile int currCheckpointPagesCnt;
+
+    /** Cause of fail, which has happened during the checkpoint or {@code null} if checkpoint was successful. */
+    @Nullable
+    private volatile Throwable failCause;
+
+    /** Counter for written checkpoint pages. Not {@code null} only while the checkpoint is running. */
+    @Nullable
+    private volatile AtomicInteger writtenPagesCntr;
+
+    /** Counter for fsynced checkpoint pages. Not {@code null} only while the checkpoint is running. */
+    @Nullable
+    private volatile AtomicInteger syncedPagesCntr;
+
+    /** Counter for evicted checkpoint pages. Not {@code null} only while the checkpoint is running. */
+    @Nullable
+    private volatile AtomicInteger evictedPagesCntr;
+
+    /**
+     * Constructor.
+     *
+     * @param nextCheckpointTimeout Timeout until next checkpoint in nanos.
+     */
+    CheckpointProgressImpl(long nextCheckpointTimeout) {
+        nextCheckpointNanos(nextCheckpointTimeout);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public UUID id() {
+        return id;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @Nullable String reason() {
+        return reason;
+    }
+
+    /**
+     * Sets the description of the reason for the current checkpoint.
+     *
+     * @param reason New wakeup reason.
+     */
+    public void reason(String reason) {
+        this.reason = reason;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean inProgress() {
+        return greaterOrEqualTo(LOCK_RELEASED) && !greaterOrEqualTo(FINISHED);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> futureFor(CheckpointState state) {
+        CompletableFuture<Void> stateFut = stateFutures.computeIfAbsent(state, (k) -> new CompletableFuture<>());
+
+        if (greaterOrEqualTo(state)) {
+            completeFuture(stateFut, failCause);
+        }
+
+        return stateFut;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int currentCheckpointPagesCount() {
+        return currCheckpointPagesCnt;
+    }
+
+    /**
+     * Sets the number of pages to store in the current checkpoint.
+     *
+     * @param num Number of pages to store.
+     */
+    public void currentCheckpointPagesCount(int num) {
+        currCheckpointPagesCnt = num;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @Nullable AtomicInteger writtenPagesCounter() {
+        return writtenPagesCntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @Nullable AtomicInteger syncedPagesCounter() {
+        return syncedPagesCntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @Nullable AtomicInteger evictedPagesCounter() {
+        return evictedPagesCntr;
+    }
+
+    /**
+     * Returns the scheduled time of the checkpoint.
+     */
+    public long nextCheckpointNanos() {
+        return nextCheckpointNanos;
+    }
+
+    /**
+     * Sets the new scheduled time of the checkpoint.
+     *
+     * @param nextCheckpointNanos New scheduled time of the checkpoint, in nanos.
+     */
+    public void nextCheckpointNanos(long nextCheckpointNanos) {

Review Comment:
   Fix it.
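
The per-state futures in CheckpointProgressImpl can be illustrated with a minimal sketch. It assumes a hypothetical, simplified `State` enum whose declaration order defines progression (the real `CheckpointState` may have more values), and a hypothetical `transitTo` method, since the diff above is cut off before any state-transition code. `futureFor` and `inProgress` mirror the diff. Because `transitTo` updates the state before it scans the map, and `futureFor` registers its future before it checks the state, a future requested concurrently with a transition is still completed by one side or the other.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

/** Sketch of per-state futures; not the actual CheckpointProgressImpl. */
class StateFuturesSketch {
    /** Hypothetical simplified states; declaration order defines the progression. */
    enum State { SCHEDULED, LOCK_TAKEN, LOCK_RELEASED, FINISHED }

    // The reference is never reassigned, so it can be final; only its value changes.
    private final AtomicReference<State> state = new AtomicReference<>(State.SCHEDULED);

    private final Map<State, CompletableFuture<Void>> stateFutures = new ConcurrentHashMap<>();

    private volatile Throwable failCause;

    boolean greaterOrEqualTo(State expected) {
        return state.get().compareTo(expected) >= 0;
    }

    /** Mirrors inProgress() from the diff. */
    boolean inProgress() {
        return greaterOrEqualTo(State.LOCK_RELEASED) && !greaterOrEqualTo(State.FINISHED);
    }

    CompletableFuture<Void> futureFor(State s) {
        CompletableFuture<Void> fut = stateFutures.computeIfAbsent(s, k -> new CompletableFuture<>());

        // The state may already have been reached before anyone asked for this future.
        if (greaterOrEqualTo(s)) {
            complete(fut, failCause);
        }

        return fut;
    }

    /** Hypothetical: advances to newState (never backwards) and completes futures of all reached states. */
    void transitTo(State newState) {
        State reached = state.updateAndGet(cur -> cur.compareTo(newState) < 0 ? newState : cur);

        for (State s : State.values()) {
            if (s.compareTo(reached) > 0) {
                break;
            }

            CompletableFuture<Void> fut = stateFutures.get(s);

            if (fut != null) {
                complete(fut, failCause);
            }
        }
    }

    /** Hypothetical: records the cause first so that futures completed afterwards fail with it. */
    void fail(Throwable cause, State newState) {
        failCause = cause;
        transitTo(newState);
    }

    private static void complete(CompletableFuture<Void> fut, Throwable cause) {
        if (cause == null) {
            fut.complete(null);
        } else {
            fut.completeExceptionally(cause);
        }
    }
}
```

Futures that were already completed normally are unaffected by a later failure, because `completeExceptionally` has no effect on a completed future.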





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #800: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #800:
URL: https://github.com/apache/ignite-3/pull/800#discussion_r871050449


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorage.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.nio.file.Files.createDirectories;
+import static java.nio.file.Files.createFile;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.isDirectory;
+import static java.nio.file.Files.list;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.partitioningBy;
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.IgniteSystemProperties.getInteger;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Abstraction responsible for managing checkpoint markers storage.
+ */
+// TODO: IGNITE-15818 This is a simple implementation for now; it may need to be reworked, so it should be checked.
+public class CheckpointMarkersStorage {
+    /** Name of the system property for the earliest checkpoint map changes threshold. */
+    public static final String IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD = "IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD";
+
+    /** Checkpoint start marker. */
+    private static final String CHECKPOINT_START_MARKER = "START";
+
+    /** Checkpoint end marker. */
+    private static final String CHECKPOINT_END_MARKER = "END";
+
+    /** Checkpoint marker file name pattern. */
+    private static final Pattern CHECKPOINT_MARKER_FILE_NAME_PATTERN = Pattern.compile("(.*)-(START|END)\\.bin");
+
+    /** Checkpoint metadata directory ("cp"), contains files with checkpoint start and end markers. */
+    private final Path checkpointDir;
+
+    /** Checkpoint IDs. */
+    private final Set<UUID> checkpointIds;
+
+    /** Earliest checkpoint map changes threshold. */
+    // TODO: IGNITE-16935 Move to config
+    private final int earliestCheckpointChangesThreshold = getInteger(IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD, 5);
+
+    /**
+     * Constructor.
+     *
+     * @param storagePath Storage path.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public CheckpointMarkersStorage(
+            Path storagePath
+    ) throws IgniteInternalCheckedException {
+        checkpointDir = storagePath.resolve("cp");
+
+        try {
+            createDirectories(checkpointDir);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not create directory for checkpoint metadata: " + checkpointDir, e);
+        }
+
+        checkCheckpointDir(checkpointDir);
+
+        try {
+            checkpointIds = list(checkpointDir)
+                    .map(CheckpointMarkersStorage::parseCheckpointIdFromMarkerFile)
+                    .collect(toCollection(ConcurrentHashMap::newKeySet));

Review Comment:
   Ok, this is fine, thank you!
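
Recovering checkpoint IDs from marker file names with `CHECKPOINT_MARKER_FILE_NAME_PATTERN` can be sketched as follows. The diff does not show `parseCheckpointIdFromMarkerFile`, so `parseCheckpointId`, `markerFileName` and `readCheckpointIds` are hypothetical helpers, and the `<uuid>-START.bin` / `<uuid>-END.bin` naming is an assumption based on the pattern and the marker constants. The greedy `(.*)` backtracks only to the last `-` before `START` or `END`, so the hyphens inside the UUID stay in group 1.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Sketch of marker file name handling; not the actual CheckpointMarkersStorage. */
class MarkerFilesSketch {
    /** Same pattern as in the diff. */
    static final Pattern CHECKPOINT_MARKER_FILE_NAME_PATTERN = Pattern.compile("(.*)-(START|END)\\.bin");

    /** Hypothetical: builds "<uuid>-START.bin" or "<uuid>-END.bin". */
    static String markerFileName(UUID id, boolean start) {
        return id + (start ? "-START" : "-END") + ".bin";
    }

    /** Hypothetical counterpart of parseCheckpointIdFromMarkerFile. */
    static UUID parseCheckpointId(Path file) {
        Matcher m = CHECKPOINT_MARKER_FILE_NAME_PATTERN.matcher(file.getFileName().toString());

        if (!m.matches()) {
            throw new IllegalArgumentException("Not a checkpoint marker file: " + file);
        }

        return UUID.fromString(m.group(1));
    }

    /** Collects the distinct checkpoint IDs into a concurrent set, as the constructor in the diff does. */
    static Set<UUID> readCheckpointIds(Path checkpointDir) throws IOException {
        // Files.list holds an open directory handle until the stream is closed.
        try (Stream<Path> files = Files.list(checkpointDir)) {
            return files.map(MarkerFilesSketch::parseCheckpointId)
                    .collect(Collectors.toCollection(ConcurrentHashMap::newKeySet));
        }
    }
}
```

A START and an END marker for the same checkpoint collapse into one set entry. The `Files.list` Javadoc recommends try-with-resources for the returned stream, which may be worth checking in the full version of the constructor, since the hunk above is cut off before the stream is consumed.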


