Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/05/26 14:32:16 UTC

[GitHub] [ignite-3] ibessonov commented on a diff in pull request #815: IGNITE-17011 [Native Persistence 3.0] Porting FilePageStoreManager from 2.0

ibessonov commented on code in PR #815:
URL: https://github.com/apache/ignite-3/pull/815#discussion_r882500298


##########
modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java:
##########
@@ -733,4 +734,27 @@ public static <T> T getUninterruptibly(CompletableFuture<T> future) throws Execu
             }
         }
     }
+
+    /**
+     * Stops workers from given collection and waits for their completion.
+     *
+     * @param workers Workers collection.
+     * @param cancel Whether it should cancel workers.
+     * @param log Logger.
+     */
+    public static void awaitForWorkersStop(Collection<IgniteWorker> workers, boolean cancel, @Nullable IgniteLogger log) {

Review Comment:
   Is this ported code or new code? I'd suggest cancelling all workers before joining any of them, because that could be much faster.
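
   A rough sketch of the ordering suggested above, not the code from the PR. `SketchWorker` and `WorkerStopSketch` are hypothetical stand-ins and do not reflect the real `IgniteWorker` API. The assumption is that each worker needs some time to wind down after it is cancelled; signalling all of them first lets those wind-downs overlap instead of running one after another.

```java
import java.util.List;

/** Hypothetical stand-in for IgniteWorker; the real class has a different API. */
class SketchWorker implements Runnable {
    private volatile boolean cancelled;

    private final Thread thread = new Thread(this);

    void start() {
        thread.start();
    }

    /** Requests a stop without waiting for it. */
    void cancel() {
        cancelled = true;
        thread.interrupt();
    }

    /** Blocks until the worker thread has finished. */
    void join() throws InterruptedException {
        thread.join();
    }

    boolean isAlive() {
        return thread.isAlive();
    }

    @Override
    public void run() {
        while (!cancelled) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                // Interrupted by cancel(); the loop condition re-checks the flag.
            }
        }
    }
}

/** Two-phase stop: signal every worker first, then wait for each one. */
class WorkerStopSketch {
    static void stopAll(List<SketchWorker> workers) {
        // Phase 1: no blocking here, every worker starts shutting down right away.
        for (SketchWorker worker : workers) {
            worker.cancel();
        }

        // Phase 2: wait for the workers, which are already stopping in parallel.
        for (SketchWorker worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```

   With a cancel-and-join per worker in a single loop, the caller would wait for each shutdown in turn; with two loops the total wait is closer to that of the slowest worker.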



##########
modules/core/src/main/java/org/apache/ignite/internal/thread/IgniteThread.java:
##########
@@ -19,24 +19,34 @@
 
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.worker.IgniteWorker;
 
 /**
  * This class adds some necessary plumbing on top of the {@link Thread} class. Specifically, it adds:
  * <ul>
  *      <li>Consistent naming of threads</li>;
- *      <li>Name of the grid this thread belongs to</li>.
+ *      <li>Name of the ignite this thread belongs to</li>.

Review Comment:
   "ignite node" maybe?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.store;
+
+import static java.nio.file.Files.createDirectories;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.MAX_PARTITION_ID;
+import static org.apache.ignite.internal.pagememory.persistence.store.PageStore.TYPE_DATA;
+import static org.apache.ignite.internal.pagememory.persistence.store.PageStore.TYPE_IDX;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.ignite.internal.fileio.FileIo;
+import org.apache.ignite.internal.fileio.FileIoFactory;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.pagememory.PageIdAllocator;
+import org.apache.ignite.internal.pagememory.persistence.PageReadWriteManager;
+import org.apache.ignite.internal.util.IgniteStripedLock;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * File page store manager.
+ */
+public class FilePageStoreManager implements IgniteComponent, PageReadWriteManager {
+    /** File suffix. */
+    public static final String FILE_SUFFIX = ".bin";
+
+    /** Partition file prefix. */
+    public static final String PART_FILE_PREFIX = "part-";
+
+    /** Index file prefix. */
+    public static final String INDEX_FILE_PREFIX = "index";
+
+    /** Index file name. */
+    public static final String INDEX_FILE_NAME = INDEX_FILE_PREFIX + FILE_SUFFIX;
+
+    /** Partition file template. */
+    public static final String PART_FILE_TEMPLATE = PART_FILE_PREFIX + "%d" + FILE_SUFFIX;
+
+    /** Group directory prefix. */
+    public static final String GROUP_DIR_PREFIX = "group-";
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Starting directory for all file page stores, for example: 'db/group-123/index.bin'. */

Review Comment:
   I'm not sure the directory format has been agreed upon yet, so I'd avoid mentioning it.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/GroupPageStoreHolder.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.store;
+
+import java.util.AbstractList;
+
+/**
+ * Holder of the group page stores (index and partitions).
+ *
+ * @param <T> Type of {@link PageStore}.
+ */
+class GroupPageStoreHolder<T extends PageStore> extends AbstractList<T> {

Review Comment:
   You should also implement RandomAccess, just in case.
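
   A minimal sketch of what the marker interface would look like on an array-backed list. `ArrayBackedHolder` is a hypothetical class; the field names echo `idxStore` and `partStores` from the PR, but the index layout shown here is an assumption, not the actual `GroupPageStoreHolder` implementation.

```java
import java.util.AbstractList;
import java.util.RandomAccess;

/** Hypothetical array-backed holder; the real GroupPageStoreHolder may be laid out differently. */
class ArrayBackedHolder<T> extends AbstractList<T> implements RandomAccess {
    private final T idxStore;

    private final T[] partStores;

    ArrayBackedHolder(T idxStore, T[] partStores) {
        this.idxStore = idxStore;
        this.partStores = partStores;
    }

    /** Assumed layout: index 0 is the index store, indexes 1..n are the partition stores. */
    @Override
    public T get(int index) {
        return index == 0 ? idxStore : partStores[index - 1];
    }

    @Override
    public int size() {
        return partStores.length + 1;
    }
}
```

   `RandomAccess` declares no methods. It only tells generic list algorithms that `get(int)` is cheap, so they can use indexed loops rather than iterators.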



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.store;
+
+import static java.nio.file.Files.createDirectories;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.MAX_PARTITION_ID;
+import static org.apache.ignite.internal.pagememory.persistence.store.PageStore.TYPE_DATA;
+import static org.apache.ignite.internal.pagememory.persistence.store.PageStore.TYPE_IDX;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.ignite.internal.fileio.FileIo;
+import org.apache.ignite.internal.fileio.FileIoFactory;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.pagememory.PageIdAllocator;
+import org.apache.ignite.internal.pagememory.persistence.PageReadWriteManager;
+import org.apache.ignite.internal.util.IgniteStripedLock;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * File page store manager.
+ */
+public class FilePageStoreManager implements IgniteComponent, PageReadWriteManager {
+    /** File suffix. */
+    public static final String FILE_SUFFIX = ".bin";
+
+    /** Partition file prefix. */
+    public static final String PART_FILE_PREFIX = "part-";
+
+    /** Index file prefix. */
+    public static final String INDEX_FILE_PREFIX = "index";
+
+    /** Index file name. */
+    public static final String INDEX_FILE_NAME = INDEX_FILE_PREFIX + FILE_SUFFIX;
+
+    /** Partition file template. */
+    public static final String PART_FILE_TEMPLATE = PART_FILE_PREFIX + "%d" + FILE_SUFFIX;
+
+    /** Group directory prefix. */
+    public static final String GROUP_DIR_PREFIX = "group-";
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Starting directory for all file page stores, for example: 'db/group-123/index.bin'. */
+    private final Path dbDir;
+
+    /** {@link FileIo} factory for file page store. */
+    private final FileIoFactory filePageStoreFileIoFactory;
+
+    /** Page size in bytes. */
+    private final int pageSize;
+
+    /** Page read write manager. */
+    private final PageReadWriteManagerImpl pageReadWriteManager = new PageReadWriteManagerImpl(this);
+
+    /**
+     * Executor to disallow running code that modifies data in {@link #groupPageStoreHolders} concurrently with cleanup of file page store.
+     */
+    private final LongOperationAsyncExecutor cleanupAsyncExecutor;
+
+    /** Mapping: group ID -> {@link GroupPageStoreHolder}. */
+    private final GroupPageStoreHolderMap<FilePageStore> groupPageStoreHolders;
+
+    /** Group directory initialization lock. */
+    private final IgniteStripedLock initGroupDirLock = new IgniteStripedLock(Math.max(Runtime.getRuntime().availableProcessors(), 8));
+
+    /**
+     * Constructor.
+     *
+     * @param log Logger.
+     * @param igniteInstanceName Name of the Ignite instance.
+     * @param storagePath Storage path.
+     * @param filePageStoreFileIoFactory {@link FileIo} factory for file page store.
+     * @param pageSize Page size in bytes.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public FilePageStoreManager(
+            IgniteLogger log,
+            String igniteInstanceName,
+            Path storagePath,
+            FileIoFactory filePageStoreFileIoFactory,
+            // TODO: IGNITE-17017 Move to common config
+            int pageSize
+    ) throws IgniteInternalCheckedException {
+        this.log = log;
+        this.filePageStoreFileIoFactory = filePageStoreFileIoFactory;
+        this.dbDir = storagePath.resolve("db");
+        this.pageSize = pageSize;
+
+        try {
+            createDirectories(dbDir);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not create work directory for page stores: " + dbDir, e);
+        }
+
+        cleanupAsyncExecutor = new LongOperationAsyncExecutor(igniteInstanceName, log);
+
+        groupPageStoreHolders = new GroupPageStoreHolderMap<>(cleanupAsyncExecutor);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void start() {
+        if (log.isWarnEnabled()) {
+            String tmpDir = System.getProperty("java.io.tmpdir");
+
+            if (tmpDir != null && this.dbDir.startsWith(tmpDir)) {
+                log.warn("Persistence store directory is in the temp directory and may be cleaned. "
+                        + "To avoid this change location of persistence directories. "
+                        + "Current persistence store directory is:" + this.dbDir);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop() throws Exception {
+        stopAllGroupFilePageStores(false);
+
+        cleanupAsyncExecutor.awaitAsyncTaskCompletion(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void read(int grpId, long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException {
+        pageReadWriteManager.read(grpId, pageId, pageBuf, keepCrc);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public FilePageStore write(
+            int grpId,
+            long pageId,
+            ByteBuffer pageBuf,
+            int tag,
+            boolean calculateCrc
+    ) throws IgniteInternalCheckedException {
+        return pageReadWriteManager.write(grpId, pageId, pageBuf, tag, calculateCrc);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long allocatePage(int grpId, int partId, byte flags) throws IgniteInternalCheckedException {
+        return pageReadWriteManager.allocatePage(grpId, partId, flags);
+    }
+
+    /**
+     * Initializing the file page stores for a group.
+     *
+     * @param grpName Group name.
+     * @param grpId Group ID.
+     * @param partitions Partition number, must be greater than {@code 0} and less {@link PageIdAllocator#MAX_PARTITION_ID} + 1.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public void initialize(String grpName, int grpId, int partitions) throws IgniteInternalCheckedException {
+        assert partitions > 0 && partitions < MAX_PARTITION_ID + 1 : partitions;
+
+        initGroupDirLock.lock(grpId);
+
+        try {
+            if (!groupPageStoreHolders.containsKey(grpId)) {
+                GroupPageStoreHolder<FilePageStore> holder = createGroupFilePageStoreHolder(grpName, partitions);
+
+                GroupPageStoreHolder<FilePageStore> old = groupPageStoreHolders.put(grpId, holder);
+
+                assert old == null : grpName;
+            }
+        } catch (IgniteInternalCheckedException e) {
+            // TODO: IGNITE-16899 By analogy with 2.0, fail a node
+
+            throw e;
+        } finally {
+            initGroupDirLock.unlock(grpId);
+        }
+    }
+
+    /**
+     * Returns collection of related file page stores for group.
+     *
+     * @param grpId Group ID.
+     */
+    public @Nullable Collection<FilePageStore> getStores(int grpId) {
+        return groupPageStoreHolders.get(grpId);
+    }
+
+    /**
+     * Returns file page store for the corresponding parameters.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID, either {@link PageIdAllocator#INDEX_PARTITION} or {@code 0} to {@link PageIdAllocator#MAX_PARTITION_ID}
+     *      (inclusive).
+     * @throws IgniteInternalCheckedException If group or partition with the given ID was not created.
+     */
+    public FilePageStore getStore(int grpId, int partId) throws IgniteInternalCheckedException {
+        assert partId >= 0 && (partId == INDEX_PARTITION || partId <= MAX_PARTITION_ID) : partId;
+
+        GroupPageStoreHolder<FilePageStore> holder = groupPageStoreHolders.get(grpId);
+
+        if (holder == null) {
+            throw new IgniteInternalCheckedException(
+                    "Failed to get file page store for the given group ID (group has not been started): " + grpId
+            );
+        }
+
+        if (partId == INDEX_PARTITION) {
+            return holder.idxStore;
+        }
+
+        if (partId >= holder.partStores.length) {
+            throw new IgniteInternalCheckedException(String.format(
+                    "Failed to get file page store for the given partition ID (partition has not been created) [grpId=%s, partId=%s]",
+                    grpId,
+                    partId
+            ));
+        }
+
+        return holder.partStores[partId];
+    }
+
+    /**
+     * Stops the all group file page stores.
+     *
+     * @param cleanFiles Delete files.
+     */
+    void stopAllGroupFilePageStores(boolean cleanFiles) {
+        List<GroupPageStoreHolder<FilePageStore>> holders = new ArrayList<>(groupPageStoreHolders.size());
+
+        for (Iterator<GroupPageStoreHolder<FilePageStore>> it = groupPageStoreHolders.values().iterator(); it.hasNext(); ) {
+            GroupPageStoreHolder<FilePageStore> holder = it.next();
+
+            it.remove();
+
+            holders.add(holder);
+        }
+
+        Runnable stopPageStores = () -> {
+            try {
+                stopGroupFilePageStores(holders, cleanFiles);
+
+                if (log.isInfoEnabled()) {
+                    log.info(String.format("Cleanup cache stores [total=%s, cleanFiles=%s]", holders.size(), cleanFiles));
+                }
+            } catch (Exception e) {
+                log.error("Failed to gracefully stop page store managers", e);
+            }
+        };
+
+        if (cleanFiles) {
+            cleanupAsyncExecutor.async(stopPageStores, "file-page-stores-cleanup");
+        } else {
+            stopPageStores.run();
+        }
+    }
+
+    private static void stopGroupFilePageStores(
+            Collection<GroupPageStoreHolder<FilePageStore>> groupFilePageStoreHolders,
+            boolean cleanFiles
+    ) throws IgniteInternalCheckedException {
+        try {
+            closeAll(groupFilePageStoreHolders.stream().flatMap(Collection::stream).map(pageStore -> () -> pageStore.stop(cleanFiles)));
+        } catch (IgniteInternalCheckedException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new IgniteInternalCheckedException(e);
+        }
+    }
+
+    private Path ensureGroupWorkDir(String grpName) throws IgniteInternalCheckedException {
+        Path groupWorkDir = dbDir.resolve(GROUP_DIR_PREFIX + grpName);
+
+        try {
+            Files.createDirectories(groupWorkDir);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Failed to initialize group working directory "
+                    + "(failed to create, make sure the work folder has correct permissions): "
+                    + groupWorkDir, e);
+        }
+
+        return groupWorkDir;
+    }
+
+    private GroupPageStoreHolder<FilePageStore> createGroupFilePageStoreHolder(
+            String grpName,
+            int partitions
+    ) throws IgniteInternalCheckedException {
+        Path groupWorkDir = ensureGroupWorkDir(grpName);
+
+        FilePageStoreFactory filePageStoreFactory = new FilePageStoreFactory(filePageStoreFileIoFactory, pageSize);
+
+        FilePageStore idxFilePageStore = filePageStoreFactory.createPageStore(TYPE_IDX, groupWorkDir.resolve(INDEX_FILE_NAME));
+
+        FilePageStore[] partitionFilePageStores = new FilePageStore[partitions];
+
+        for (int i = 0; i < partitions; i++) {
+            partitionFilePageStores[i] = filePageStoreFactory.createPageStore(

Review Comment:
   Is this the original code? I don't see affinity being used here. Is that intended?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.store;
+
+import static java.nio.file.Files.createDirectories;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.MAX_PARTITION_ID;
+import static org.apache.ignite.internal.pagememory.persistence.store.PageStore.TYPE_DATA;
+import static org.apache.ignite.internal.pagememory.persistence.store.PageStore.TYPE_IDX;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.ignite.internal.fileio.FileIo;
+import org.apache.ignite.internal.fileio.FileIoFactory;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.pagememory.PageIdAllocator;
+import org.apache.ignite.internal.pagememory.persistence.PageReadWriteManager;
+import org.apache.ignite.internal.util.IgniteStripedLock;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * File page store manager.
+ */
+public class FilePageStoreManager implements IgniteComponent, PageReadWriteManager {
+    /** File suffix. */
+    public static final String FILE_SUFFIX = ".bin";
+
+    /** Partition file prefix. */
+    public static final String PART_FILE_PREFIX = "part-";
+
+    /** Index file prefix. */
+    public static final String INDEX_FILE_PREFIX = "index";
+
+    /** Index file name. */
+    public static final String INDEX_FILE_NAME = INDEX_FILE_PREFIX + FILE_SUFFIX;
+
+    /** Partition file template. */
+    public static final String PART_FILE_TEMPLATE = PART_FILE_PREFIX + "%d" + FILE_SUFFIX;
+
+    /** Group directory prefix. */
+    public static final String GROUP_DIR_PREFIX = "group-";
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Starting directory for all file page stores, for example: 'db/group-123/index.bin'. */
+    private final Path dbDir;
+
+    /** {@link FileIo} factory for file page store. */
+    private final FileIoFactory filePageStoreFileIoFactory;
+
+    /** Page size in bytes. */
+    private final int pageSize;
+
+    /** Page read write manager. */
+    private final PageReadWriteManagerImpl pageReadWriteManager = new PageReadWriteManagerImpl(this);
+
+    /**
+     * Executor to disallow running code that modifies data in {@link #groupPageStoreHolders} concurrently with cleanup of file page store.
+     */
+    private final LongOperationAsyncExecutor cleanupAsyncExecutor;
+
+    /** Mapping: group ID -> {@link GroupPageStoreHolder}. */
+    private final GroupPageStoreHolderMap<FilePageStore> groupPageStoreHolders;
+
+    /** Group directory initialization lock. */
+    private final IgniteStripedLock initGroupDirLock = new IgniteStripedLock(Math.max(Runtime.getRuntime().availableProcessors(), 8));
+
+    /**
+     * Constructor.
+     *
+     * @param log Logger.
+     * @param igniteInstanceName Name of the Ignite instance.
+     * @param storagePath Storage path.
+     * @param filePageStoreFileIoFactory {@link FileIo} factory for file page store.
+     * @param pageSize Page size in bytes.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public FilePageStoreManager(
+            IgniteLogger log,
+            String igniteInstanceName,
+            Path storagePath,
+            FileIoFactory filePageStoreFileIoFactory,
+            // TODO: IGNITE-17017 Move to common config
+            int pageSize
+    ) throws IgniteInternalCheckedException {
+        this.log = log;
+        this.filePageStoreFileIoFactory = filePageStoreFileIoFactory;
+        this.dbDir = storagePath.resolve("db");

Review Comment:
   We need a common folder structure shared by all engines. Code like resolve("db") shouldn't be here. What do you think?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java:
##########
@@ -766,4 +766,18 @@ private void checkHeader(ByteBuffer headerBuffer) throws IOException {
                     + ", filePageSize=" + pageSize + "]");
         }
     }
+
+    /**
+     * Returns data type, can be {@link PageStore#TYPE_IDX} or {@link PageStore#TYPE_DATA}.
+     */
+    byte type() {
+        return type;
+    }
+
+    /**
+     * Returns file page store path.
+     */
+    Path filePath() {

Review Comment:
   Why is this necessary?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.store;
+
+import static java.nio.file.Files.createDirectories;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.MAX_PARTITION_ID;
+import static org.apache.ignite.internal.pagememory.persistence.store.PageStore.TYPE_DATA;
+import static org.apache.ignite.internal.pagememory.persistence.store.PageStore.TYPE_IDX;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.ignite.internal.fileio.FileIo;
+import org.apache.ignite.internal.fileio.FileIoFactory;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.pagememory.PageIdAllocator;
+import org.apache.ignite.internal.pagememory.persistence.PageReadWriteManager;
+import org.apache.ignite.internal.util.IgniteStripedLock;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * File page store manager.
+ */
+public class FilePageStoreManager implements IgniteComponent, PageReadWriteManager {
+    /** File suffix. */
+    public static final String FILE_SUFFIX = ".bin";
+
+    /** Partition file prefix. */
+    public static final String PART_FILE_PREFIX = "part-";
+
+    /** Index file prefix. */
+    public static final String INDEX_FILE_PREFIX = "index";
+
+    /** Index file name. */
+    public static final String INDEX_FILE_NAME = INDEX_FILE_PREFIX + FILE_SUFFIX;
+
+    /** Partition file template. */
+    public static final String PART_FILE_TEMPLATE = PART_FILE_PREFIX + "%d" + FILE_SUFFIX;
+
+    /** Group directory prefix. */
+    public static final String GROUP_DIR_PREFIX = "group-";
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Starting directory for all file page stores, for example: 'db/group-123/index.bin'. */
+    private final Path dbDir;
+
+    /** {@link FileIo} factory for file page store. */
+    private final FileIoFactory filePageStoreFileIoFactory;
+
+    /** Page size in bytes. */
+    private final int pageSize;
+
+    /** Page read write manager. */
+    private final PageReadWriteManagerImpl pageReadWriteManager = new PageReadWriteManagerImpl(this);

Review Comment:
   I believe this field can use an interface as its type.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.store;
+
+import static java.nio.file.Files.createDirectories;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.MAX_PARTITION_ID;
+import static org.apache.ignite.internal.pagememory.persistence.store.PageStore.TYPE_DATA;
+import static org.apache.ignite.internal.pagememory.persistence.store.PageStore.TYPE_IDX;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.ignite.internal.fileio.FileIo;
+import org.apache.ignite.internal.fileio.FileIoFactory;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.pagememory.PageIdAllocator;
+import org.apache.ignite.internal.pagememory.persistence.PageReadWriteManager;
+import org.apache.ignite.internal.util.IgniteStripedLock;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * File page store manager.
+ */
+public class FilePageStoreManager implements IgniteComponent, PageReadWriteManager {
+    /** File suffix. */
+    public static final String FILE_SUFFIX = ".bin";
+
+    /** Partition file prefix. */
+    public static final String PART_FILE_PREFIX = "part-";
+
+    /** Index file prefix. */
+    public static final String INDEX_FILE_PREFIX = "index";
+
+    /** Index file name. */
+    public static final String INDEX_FILE_NAME = INDEX_FILE_PREFIX + FILE_SUFFIX;
+
+    /** Partition file template. */
+    public static final String PART_FILE_TEMPLATE = PART_FILE_PREFIX + "%d" + FILE_SUFFIX;
+
+    /** Group directory prefix. */
+    public static final String GROUP_DIR_PREFIX = "group-";
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Starting directory for all file page stores, for example: 'db/group-123/index.bin'. */
+    private final Path dbDir;
+
+    /** {@link FileIo} factory for file page store. */
+    private final FileIoFactory filePageStoreFileIoFactory;
+
+    /** Page size in bytes. */
+    private final int pageSize;
+
+    /** Page read write manager. */
+    private final PageReadWriteManagerImpl pageReadWriteManager = new PageReadWriteManagerImpl(this);
+
+    /**
+     * Executor to disallow running code that modifies data in {@link #groupPageStoreHolders} concurrently with cleanup of file page store.
+     */
+    private final LongOperationAsyncExecutor cleanupAsyncExecutor;
+
+    /** Mapping: group ID -> {@link GroupPageStoreHolder}. */
+    private final GroupPageStoreHolderMap<FilePageStore> groupPageStoreHolders;
+
+    /** Group directory initialization lock. */
+    private final IgniteStripedLock initGroupDirLock = new IgniteStripedLock(Math.max(Runtime.getRuntime().availableProcessors(), 8));
+
+    /**
+     * Constructor.
+     *
+     * @param log Logger.
+     * @param igniteInstanceName Name of the Ignite instance.
+     * @param storagePath Storage path.
+     * @param filePageStoreFileIoFactory {@link FileIo} factory for file page store.
+     * @param pageSize Page size in bytes.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public FilePageStoreManager(
+            IgniteLogger log,
+            String igniteInstanceName,
+            Path storagePath,
+            FileIoFactory filePageStoreFileIoFactory,
+            // TODO: IGNITE-17017 Move to common config
+            int pageSize
+    ) throws IgniteInternalCheckedException {
+        this.log = log;
+        this.filePageStoreFileIoFactory = filePageStoreFileIoFactory;
+        this.dbDir = storagePath.resolve("db");
+        this.pageSize = pageSize;
+
+        try {
+            createDirectories(dbDir);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not create work directory for page stores: " + dbDir, e);
+        }
+
+        cleanupAsyncExecutor = new LongOperationAsyncExecutor(igniteInstanceName, log);
+
+        groupPageStoreHolders = new GroupPageStoreHolderMap<>(cleanupAsyncExecutor);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void start() {
+        if (log.isWarnEnabled()) {
+            String tmpDir = System.getProperty("java.io.tmpdir");
+
+            if (tmpDir != null && this.dbDir.startsWith(tmpDir)) {
+                log.warn("Persistence store directory is in the temp directory and may be cleaned. "
+                        + "To avoid this change location of persistence directories. "
+                        + "Current persistence store directory is:" + this.dbDir);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop() throws Exception {
+        stopAllGroupFilePageStores(false);
+
+        cleanupAsyncExecutor.awaitAsyncTaskCompletion(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void read(int grpId, long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException {
+        pageReadWriteManager.read(grpId, pageId, pageBuf, keepCrc);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public FilePageStore write(
+            int grpId,
+            long pageId,
+            ByteBuffer pageBuf,
+            int tag,
+            boolean calculateCrc
+    ) throws IgniteInternalCheckedException {
+        return pageReadWriteManager.write(grpId, pageId, pageBuf, tag, calculateCrc);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long allocatePage(int grpId, int partId, byte flags) throws IgniteInternalCheckedException {
+        return pageReadWriteManager.allocatePage(grpId, partId, flags);
+    }
+
+    /**
+     * Initializing the file page stores for a group.
+     *
+     * @param grpName Group name.
+     * @param grpId Group ID.
+     * @param partitions Partition number, must be greater than {@code 0} and less {@link PageIdAllocator#MAX_PARTITION_ID} + 1.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public void initialize(String grpName, int grpId, int partitions) throws IgniteInternalCheckedException {
+        assert partitions > 0 && partitions < MAX_PARTITION_ID + 1 : partitions;
+
+        initGroupDirLock.lock(grpId);
+
+        try {
+            if (!groupPageStoreHolders.containsKey(grpId)) {
+                GroupPageStoreHolder<FilePageStore> holder = createGroupFilePageStoreHolder(grpName, partitions);
+
+                GroupPageStoreHolder<FilePageStore> old = groupPageStoreHolders.put(grpId, holder);
+
+                assert old == null : grpName;
+            }
+        } catch (IgniteInternalCheckedException e) {
+            // TODO: IGNITE-16899 By analogy with 2.0, fail a node
+
+            throw e;
+        } finally {
+            initGroupDirLock.unlock(grpId);
+        }
+    }
+
+    /**
+     * Returns collection of related file page stores for group.
+     *
+     * @param grpId Group ID.
+     */
+    public @Nullable Collection<FilePageStore> getStores(int grpId) {
+        return groupPageStoreHolders.get(grpId);
+    }
+
+    /**
+     * Returns file page store for the corresponding parameters.
+     *
+     * @param grpId Group ID.
+     * @param partId Partition ID, either {@link PageIdAllocator#INDEX_PARTITION} or {@code 0} to {@link PageIdAllocator#MAX_PARTITION_ID}
+     *      (inclusive).
+     * @throws IgniteInternalCheckedException If group or partition with the given ID was not created.
+     */
+    public FilePageStore getStore(int grpId, int partId) throws IgniteInternalCheckedException {
+        assert partId >= 0 && (partId == INDEX_PARTITION || partId <= MAX_PARTITION_ID) : partId;
+
+        GroupPageStoreHolder<FilePageStore> holder = groupPageStoreHolders.get(grpId);
+
+        if (holder == null) {
+            throw new IgniteInternalCheckedException(
+                    "Failed to get file page store for the given group ID (group has not been started): " + grpId
+            );
+        }
+
+        if (partId == INDEX_PARTITION) {
+            return holder.idxStore;
+        }
+
+        if (partId >= holder.partStores.length) {
+            throw new IgniteInternalCheckedException(String.format(
+                    "Failed to get file page store for the given partition ID (partition has not been created) [grpId=%s, partId=%s]",
+                    grpId,
+                    partId
+            ));
+        }
+
+        return holder.partStores[partId];
+    }
+
+    /**
+     * Stops the all group file page stores.
+     *
+     * @param cleanFiles Delete files.
+     */
+    void stopAllGroupFilePageStores(boolean cleanFiles) {
+        List<GroupPageStoreHolder<FilePageStore>> holders = new ArrayList<>(groupPageStoreHolders.size());
+
+        for (Iterator<GroupPageStoreHolder<FilePageStore>> it = groupPageStoreHolders.values().iterator(); it.hasNext(); ) {
+            GroupPageStoreHolder<FilePageStore> holder = it.next();
+
+            it.remove();
+
+            holders.add(holder);
+        }
+
+        Runnable stopPageStores = () -> {
+            try {
+                stopGroupFilePageStores(holders, cleanFiles);
+
+                if (log.isInfoEnabled()) {
+                    log.info(String.format("Cleanup cache stores [total=%s, cleanFiles=%s]", holders.size(), cleanFiles));
+                }
+            } catch (Exception e) {
+                log.error("Failed to gracefully stop page store managers", e);
+            }
+        };
+
+        if (cleanFiles) {
+            cleanupAsyncExecutor.async(stopPageStores, "file-page-stores-cleanup");
+        } else {
+            stopPageStores.run();
+        }
+    }
+
+    private static void stopGroupFilePageStores(
+            Collection<GroupPageStoreHolder<FilePageStore>> groupFilePageStoreHolders,
+            boolean cleanFiles
+    ) throws IgniteInternalCheckedException {
+        try {
+            closeAll(groupFilePageStoreHolders.stream().flatMap(Collection::stream).map(pageStore -> () -> pageStore.stop(cleanFiles)));

Review Comment:
   Can you at least split it into several lines of code? It's hard to read
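
For illustration, one possible way to split the pipeline is to name each step with a local variable. This is only a sketch: `PageStoreStub`, `closeAll` and `stopGroupStores` below are hypothetical stand-ins written for this example, not the Ignite classes, and the real `IgniteUtils.closeAll` signature is assumed rather than checked.

```java
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

/** Sketch only: every type here is a hypothetical stand-in, not an Ignite class. */
class SplitPipelineSketch {
    /** Hypothetical stand-in for a file page store. */
    static class PageStoreStub {
        boolean stopped;
        boolean cleaned;

        void stop(boolean cleanFiles) {
            stopped = true;
            cleaned = cleanFiles;
        }
    }

    /** Hypothetical stand-in for closeAll: closes every resource, then rethrows the first failure. */
    static void closeAll(Stream<? extends AutoCloseable> closeables) throws Exception {
        Exception failure = null;

        for (Iterator<? extends AutoCloseable> it = closeables.iterator(); it.hasNext(); ) {
            try {
                it.next().close();
            } catch (Exception e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }

        if (failure != null) {
            throw failure;
        }
    }

    /** Same pipeline as the one-liner, with one named step per line. */
    static void stopGroupStores(List<List<PageStoreStub>> holders, boolean cleanFiles) {
        // Step 1: flatten the per-group holders into a single stream of stores.
        Stream<PageStoreStub> stores = holders.stream().flatMap(Collection::stream);

        // Step 2: wrap each store into a close action that stops it.
        Stream<AutoCloseable> stopTasks = stores.map(store -> () -> store.stop(cleanFiles));

        // Step 3: run all close actions; the unchecked wrapper is a choice made for this sketch only.
        try {
            closeAll(stopTasks);
        } catch (Exception e) {
            throw new IllegalStateException("Failed to stop page stores", e);
        }
    }
}
```

Typing the intermediate stream as `Stream<AutoCloseable>` also gives the inner lambda an explicit target type, which may be easier to follow than the nested `pageStore -> () -> ...` form passed inline.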



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/LongOperationAsyncExecutor.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.store;
+
+import static org.apache.ignite.internal.util.IgniteUtils.awaitForWorkersStop;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.thread.IgniteThread;
+import org.apache.ignite.internal.util.worker.IgniteWorker;
+import org.apache.ignite.lang.IgniteLogger;
+
+/**
+ * Synchronization wrapper for long operations that should be executed asynchronously and operations that can not be executed in parallel
+ * with long operation.
+ *
+ * <p>Uses {@link ReadWriteLock} to provide such synchronization scenario.
+ */
+class LongOperationAsyncExecutor {
+    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
+    private final String igniteInstanceName;
+
+    private final IgniteLogger log;
+
+    private final Set<IgniteWorker> workers = ConcurrentHashMap.newKeySet();
+
+    private static final AtomicLong WORKER_COUNTER = new AtomicLong(0);
+
+    /**
+     * Constructor.
+     *
+     * @param igniteInstanceName Name of the Ignite instance this runnable is used in.
+     * @param log Logger.
+     */
+    public LongOperationAsyncExecutor(String igniteInstanceName, IgniteLogger log) {
+        this.igniteInstanceName = igniteInstanceName;
+
+        this.log = log;
+    }
+
+    /**
+     * Executes long operation in dedicated thread.
+     *
+     * <p>Uses write lock as such operations can't run simultaneously.
+     *
+     * @param operation Long operation.
+     * @param name name of the operation, used as part of the thread name.
+     */
+    public void async(Runnable operation, String name) {
+        String workerName = "async-" + name + "-task-" + WORKER_COUNTER.getAndIncrement();
+
+        IgniteWorker worker = new IgniteWorker(log, igniteInstanceName, workerName, null) {
+            /** {@inheritDoc} */
+            @Override
+            protected void body() {
+                readWriteLock.writeLock().lock();
+
+                try {
+                    operation.run();
+                } finally {
+                    readWriteLock.writeLock().unlock();
+
+                    workers.remove(this);
+                }
+            }
+        };
+
+        workers.add(worker);
+
+        new IgniteThread(worker).start();

Review Comment:
   Ok, creating a new thread for every operation - what is this? We should take a closer look at it afterwards.
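
As a possible starting point for that follow-up, the sketch below reuses one worker thread for all long operations instead of starting a thread per call. It is not what the PR does: `PooledLongOperationExecutorSketch` and its `async`, `withReadLock` and `stop` methods are hypothetical names, and plain `java.util.concurrent` classes stand in for `IgniteWorker` and `IgniteThread`. Since the write lock already serializes long operations, a single-threaded executor should not reduce concurrency here.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

/** Sketch only: a hypothetical alternative that reuses a single thread for long operations. */
class PooledLongOperationExecutorSketch {
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    /** One reusable worker thread instead of a new thread per operation. */
    private final ExecutorService executor;

    PooledLongOperationExecutorSketch(String threadName) {
        executor = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, threadName);

            thread.setDaemon(true);

            return thread;
        });
    }

    /** Queues a long operation; it runs under the write lock on the shared worker thread. */
    Future<?> async(Runnable operation) {
        return executor.submit(() -> {
            readWriteLock.writeLock().lock();

            try {
                operation.run();
            } finally {
                readWriteLock.writeLock().unlock();
            }
        });
    }

    /** Runs a short operation under the read lock, so it cannot overlap with a running long operation. */
    <T> T withReadLock(Supplier<T> supplier) {
        readWriteLock.readLock().lock();

        try {
            return supplier.get();
        } finally {
            readWriteLock.readLock().unlock();
        }
    }

    /** Stops accepting new operations and waits for queued ones; returns false on timeout or interrupt. */
    boolean stop(long timeoutMillis) {
        executor.shutdown();

        try {
            return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();

            return false;
        }
    }
}
```

With this shape, stopping reduces to `shutdown()` followed by `awaitTermination(...)`, so there is no separate set of workers to track and join.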



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org