You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/05/20 19:06:41 UTC

[GitHub] [ignite-3] tkalkirill opened a new pull request, #818: IGNITE-17014 [Native Persistence 3.0] Porting FilePageStoreFactory and FilePageStore from 2.0

tkalkirill opened a new pull request, #818:
URL: https://github.com/apache/ignite-3/pull/818

   https://issues.apache.org/jira/browse/IGNITE-17014


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] ibessonov commented on a diff in pull request #818: IGNITE-17014 [Native Persistence 3.0] Porting FilePageStoreFactory and FilePageStore from 2.0

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #818:
URL: https://github.com/apache/ignite-3/pull/818#discussion_r880416717


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/io/PageIo.java:
##########
@@ -198,7 +198,7 @@ public static int getVersion(long pageAddr) {
      * @param pageAddr Page address.
      * @param ver Version.
      */
-    protected static void setVersion(long pageAddr, int ver) {
+    public static void setVersion(long pageAddr, int ver) {

Review Comment:
   I'm sorry, why does it need to be public?



##########
modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java:
##########
@@ -272,6 +273,22 @@ public static String toHexString(long addr, int len) {
         return sb.toString();
     }
 
+    /**
+     * Returns hex representation of memory region.
+     *
+     * @param buf Buffer which content should be converted to string.
+     */
+    public static String toHexString(ByteBuffer buf) {
+        StringBuilder sb = new StringBuilder(buf.capacity() * 2);
+
+        for (int i = 0; i < buf.capacity(); i++) {
+            // Can not use getLong because on little-endian it produces bs.

Review Comment:
   I don't think that we should use word "bs" in our comments ;)



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java:
##########
@@ -0,0 +1,777 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.store;
+
+import static java.nio.ByteOrder.nativeOrder;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.ignite.internal.util.IgniteUtils.hexInt;
+import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
+import static org.apache.ignite.internal.util.IgniteUtils.toHexString;
+import static org.apache.ignite.lang.IgniteSystemProperties.getBoolean;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.internal.fileio.FileIo;
+import org.apache.ignite.internal.fileio.FileIoFactory;
+import org.apache.ignite.internal.pagememory.io.PageIo;
+import org.apache.ignite.internal.pagememory.persistence.FastCrc;
+import org.apache.ignite.internal.pagememory.persistence.IgniteInternalDataIntegrityViolationException;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * FilePageStore is a {@link PageStore} implementation that uses regular files to store pages.
+ *
+ * <p>Actual read and write operations are performed with {@link FileIo} abstract interface, list of its implementations is a good source
+ * of information about functionality in Ignite Native Persistence.
+ *
+ * <p>On a physical level each instance of FilePageStore corresponds to a partition file assigned to the local node or to index file.
+ *
+ * <p>Consists of:
+ * <ul>
+ *     <li>Header - {@link #SIGNATURE signature} (8 byte) + {@link #version version} (4 byte) + {@link #type type} (1 byte) +
+ *     {@link #pageSize pageSize} (4 bytes) + version-specific information, total length {@link #headerSize}. </li>
+ *     <li>Body - data pages are multiples of {@link #pageSize pageSize}.</li>
+ * </ul>
+ */
+public class FilePageStore implements PageStore {
+    /** Page store file signature. */
+    private static final long SIGNATURE = 0xF19AC4FE60C530B8L;
+
+    /** File version. */
+    static final int VERSION = 1;
+
+    /** Size of the common file page store header for all versions, in bytes. */
+    static final int COMMON_HEADER_SIZE = 8/*SIGNATURE*/ + 4/*VERSION*/ + 1/*type*/ + 4/*page size*/;
+
+    /** Skip CRC calculation flag. */
+    // TODO: IGNITE-17011 Move to config
+    private final boolean skipCrc = getBoolean("IGNITE_PDS_SKIP_CRC");
+
+    /** Data type, can be {@link PageStore#TYPE_IDX} or {@link PageStore#TYPE_DATA}. */
+    private final byte type;
+
+    /** File page store path. */
+    private final Path filePath;
+
+    /** {@link FileIo} factory. */
+    private final FileIoFactory ioFactory;
+
+    /** Page size in bytes. */
+    private final int pageSize;
+
+    /** Number of allocated bytes. */
+    private final AtomicLong allocatedBytes = new AtomicLong();
+
+    /** List of listeners for current page store to handle. */
+    private final List<PageWriteListener> listeners = new CopyOnWriteArrayList<>();
+
+    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
+    /** Caches the existence state of storage file. After it is initialized, it will be not {@code null} during lifecycle. */
+    private volatile Boolean fileExists;
+
+    /** {@link FileIo} for read/write operations with file. */
+    private volatile FileIo fileIo;
+
+    /** Initialized file page store. */
+    private volatile boolean initialized;
+
+    /**
+     * Constructor.
+     *
+     * @param type Data type, can be {@link PageStore#TYPE_IDX} or {@link PageStore#TYPE_DATA}.
+     * @param filePath File page store path.
+     * @param ioFactory {@link FileIo} factory.
+     * @param pageSize Page size in bytes.
+     */
+    public FilePageStore(
+            byte type,
+            Path filePath,
+            FileIoFactory ioFactory,
+            int pageSize
+    ) {
+        assert type == PageStore.TYPE_DATA || type == PageStore.TYPE_IDX : type;
+
+        this.type = type;
+        this.filePath = filePath;
+        this.ioFactory = ioFactory;
+        this.pageSize = pageSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void addWriteListener(PageWriteListener listener) {
+        listeners.add(listener);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void removeWriteListener(PageWriteListener listener) {
+        listeners.remove(listener);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop(boolean clean) throws IgniteInternalCheckedException {
+        try {
+            stop0(clean);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException(
+                    "Failed to stop serving partition file [file=" + filePath + ", delete=" + clean + "]",
+                    e
+            );
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long allocatePage() throws IgniteInternalCheckedException {
+        init();
+
+        return allocatedBytes.getAndAdd(pageSize) / pageSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long pages() {
+        if (!initialized) {
+            return 0;
+        }
+
+        return allocatedBytes.get() / pageSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException {
+        return read(pageId, pageBuf, !skipCrc, keepCrc);
+    }
+
+    /**
+     * Reads a page from the page store.
+     *
+     * @param pageId Page ID.
+     * @param pageBuf Page buffer to read into.
+     * @param checkCrc Check CRC on page.
+     * @param keepCrc By default reading zeroes CRC which was on file, but you can keep it in pageBuf if set keepCrc.
+     * @return {@code true} if page has been read successfully, {@code false} if page hasn't been written yet.
+     * @throws IgniteInternalCheckedException If reading failed (IO error occurred).
+     */
+    private boolean read(long pageId, ByteBuffer pageBuf, boolean checkCrc, boolean keepCrc) throws IgniteInternalCheckedException {
+        init();
+
+        try {
+            long off = pageOffset(pageId);
+
+            assert pageBuf.capacity() == pageSize;
+            assert pageBuf.remaining() == pageSize;
+            assert pageBuf.position() == 0;
+            assert pageBuf.order() == nativeOrder();
+            assert off <= allocatedBytes.get() : "calculatedOffset=" + off
+                    + ", allocated=" + allocatedBytes.get() + ", headerSize=" + headerSize() + ", filePath=" + filePath;
+
+            int n = readWithFailover(pageBuf, off);
+
+            // If page was not written yet, nothing to read.
+            if (n < 0) {
+                pageBuf.put(new byte[pageBuf.remaining()]);

Review Comment:
   This is interesting, how can that happen? And why do we have to write zeroes



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java:
##########
@@ -0,0 +1,777 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.store;
+
+import static java.nio.ByteOrder.nativeOrder;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.ignite.internal.util.IgniteUtils.hexInt;
+import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
+import static org.apache.ignite.internal.util.IgniteUtils.toHexString;
+import static org.apache.ignite.lang.IgniteSystemProperties.getBoolean;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.internal.fileio.FileIo;
+import org.apache.ignite.internal.fileio.FileIoFactory;
+import org.apache.ignite.internal.pagememory.io.PageIo;
+import org.apache.ignite.internal.pagememory.persistence.FastCrc;
+import org.apache.ignite.internal.pagememory.persistence.IgniteInternalDataIntegrityViolationException;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * FilePageStore is a {@link PageStore} implementation that uses regular files to store pages.
+ *
+ * <p>Actual read and write operations are performed with {@link FileIo} abstract interface, list of its implementations is a good source
+ * of information about functionality in Ignite Native Persistence.
+ *
+ * <p>On a physical level each instance of FilePageStore corresponds to a partition file assigned to the local node or to index file.
+ *
+ * <p>Consists of:
+ * <ul>
+ *     <li>Header - {@link #SIGNATURE signature} (8 byte) + {@link #version version} (4 byte) + {@link #type type} (1 byte) +
+ *     {@link #pageSize pageSize} (4 bytes) + version-specific information, total length {@link #headerSize}. </li>
+ *     <li>Body - data pages are multiples of {@link #pageSize pageSize}.</li>
+ * </ul>
+ */
+public class FilePageStore implements PageStore {
+    /** Page store file signature. */
+    private static final long SIGNATURE = 0xF19AC4FE60C530B8L;
+
+    /** File version. */
+    static final int VERSION = 1;
+
+    /** Size of the common file page store header for all versions, in bytes. */
+    static final int COMMON_HEADER_SIZE = 8/*SIGNATURE*/ + 4/*VERSION*/ + 1/*type*/ + 4/*page size*/;
+
+    /** Skip CRC calculation flag. */
+    // TODO: IGNITE-17011 Move to config
+    private final boolean skipCrc = getBoolean("IGNITE_PDS_SKIP_CRC");
+
+    /** Data type, can be {@link PageStore#TYPE_IDX} or {@link PageStore#TYPE_DATA}. */
+    private final byte type;
+
+    /** File page store path. */
+    private final Path filePath;
+
+    /** {@link FileIo} factory. */
+    private final FileIoFactory ioFactory;
+
+    /** Page size in bytes. */
+    private final int pageSize;
+
+    /** Number of allocated bytes. */
+    private final AtomicLong allocatedBytes = new AtomicLong();
+
+    /** List of listeners for current page store to handle. */
+    private final List<PageWriteListener> listeners = new CopyOnWriteArrayList<>();
+
+    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
+    /** Caches the existence state of storage file. After it is initialized, it will be not {@code null} during lifecycle. */
+    private volatile Boolean fileExists;
+
+    /** {@link FileIo} for read/write operations with file. */
+    private volatile FileIo fileIo;
+
+    /** Initialized file page store. */
+    private volatile boolean initialized;
+
+    /**
+     * Constructor.
+     *
+     * @param type Data type, can be {@link PageStore#TYPE_IDX} or {@link PageStore#TYPE_DATA}.
+     * @param filePath File page store path.
+     * @param ioFactory {@link FileIo} factory.
+     * @param pageSize Page size in bytes.
+     */
+    public FilePageStore(
+            byte type,
+            Path filePath,
+            FileIoFactory ioFactory,
+            int pageSize
+    ) {
+        assert type == PageStore.TYPE_DATA || type == PageStore.TYPE_IDX : type;
+
+        this.type = type;
+        this.filePath = filePath;
+        this.ioFactory = ioFactory;
+        this.pageSize = pageSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void addWriteListener(PageWriteListener listener) {
+        listeners.add(listener);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void removeWriteListener(PageWriteListener listener) {
+        listeners.remove(listener);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop(boolean clean) throws IgniteInternalCheckedException {
+        try {
+            stop0(clean);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException(
+                    "Failed to stop serving partition file [file=" + filePath + ", delete=" + clean + "]",
+                    e
+            );
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long allocatePage() throws IgniteInternalCheckedException {
+        init();
+
+        return allocatedBytes.getAndAdd(pageSize) / pageSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long pages() {
+        if (!initialized) {
+            return 0;
+        }
+
+        return allocatedBytes.get() / pageSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException {
+        return read(pageId, pageBuf, !skipCrc, keepCrc);
+    }
+
+    /**
+     * Reads a page from the page store.
+     *
+     * @param pageId Page ID.
+     * @param pageBuf Page buffer to read into.
+     * @param checkCrc Check CRC on page.
+     * @param keepCrc By default reading zeroes CRC which was on file, but you can keep it in pageBuf if set keepCrc.
+     * @return {@code true} if page has been read successfully, {@code false} if page hasn't been written yet.
+     * @throws IgniteInternalCheckedException If reading failed (IO error occurred).
+     */
+    private boolean read(long pageId, ByteBuffer pageBuf, boolean checkCrc, boolean keepCrc) throws IgniteInternalCheckedException {
+        init();
+
+        try {
+            long off = pageOffset(pageId);
+
+            assert pageBuf.capacity() == pageSize;
+            assert pageBuf.remaining() == pageSize;
+            assert pageBuf.position() == 0;
+            assert pageBuf.order() == nativeOrder();
+            assert off <= allocatedBytes.get() : "calculatedOffset=" + off
+                    + ", allocated=" + allocatedBytes.get() + ", headerSize=" + headerSize() + ", filePath=" + filePath;
+
+            int n = readWithFailover(pageBuf, off);
+
+            // If page was not written yet, nothing to read.
+            if (n < 0) {
+                pageBuf.put(new byte[pageBuf.remaining()]);
+
+                return false;
+            }
+
+            int savedCrc32 = PageIo.getCrc(pageBuf);
+
+            PageIo.setCrc(pageBuf, 0);
+
+            pageBuf.position(0);
+
+            if (checkCrc) {
+                int curCrc32 = FastCrc.calcCrc(pageBuf, pageSize);
+
+                if ((savedCrc32 ^ curCrc32) != 0) {
+                    throw new IgniteInternalDataIntegrityViolationException("Failed to read page (CRC validation failed) "
+                            + "[id=" + hexLong(pageId) + ", off=" + (off - pageSize)
+                            + ", filePath=" + filePath + ", fileSize=" + fileIo.size()
+                            + ", savedCrc=" + hexInt(savedCrc32) + ", curCrc=" + hexInt(curCrc32)
+                            + ", page=" + toHexString(pageBuf) + "]");
+                }
+            }
+
+            assert PageIo.getCrc(pageBuf) == 0;
+
+            if (keepCrc) {
+                PageIo.setCrc(pageBuf, savedCrc32);
+            }
+
+            return true;
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Failed to read page [file=" + filePath + ", pageId=" + pageId + "]", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void write(long pageId, ByteBuffer pageBuf, int tag, boolean calculateCrc) throws IgniteInternalCheckedException {
+        init();
+
+        boolean interrupted = false;
+
+        while (true) {
+            FileIo fileIo = this.fileIo;
+
+            try {
+                readWriteLock.readLock().lock();
+
+                try {
+                    long off = pageOffset(pageId);
+
+                    assert (off >= 0 && off <= allocatedBytes.get()) : "off=" + hexLong(off) + ", allocated="
+                            + hexLong(allocatedBytes.get()) + ", pageId=" + hexLong(pageId) + ", filePath=" + filePath;
+
+                    assert pageBuf.position() == 0;
+                    assert pageBuf.order() == nativeOrder() : "Page buffer order " + pageBuf.order()
+                            + " should be same with " + nativeOrder();
+                    assert PageIo.getType(pageBuf) != 0 : "Invalid state. Type is 0! pageId = " + hexLong(pageId);
+                    assert PageIo.getVersion(pageBuf) != 0 : "Invalid state. Version is 0! pageId = " + hexLong(pageId);
+
+                    if (calculateCrc && !skipCrc) {
+                        assert PageIo.getCrc(pageBuf) == 0 : hexLong(pageId);
+
+                        PageIo.setCrc(pageBuf, calcCrc32(pageBuf, pageSize));
+                    }
+
+                    // Check whether crc was calculated somewhere above the stack if it is forcibly skipped.
+                    assert skipCrc || PageIo.getCrc(pageBuf) != 0
+                            || calcCrc32(pageBuf, pageSize) == 0 : "CRC hasn't been calculated, crc=0";
+
+                    assert pageBuf.position() == 0 : pageBuf.position();
+
+                    for (PageWriteListener listener : listeners) {
+                        listener.accept(pageId, pageBuf);
+
+                        pageBuf.rewind();
+                    }
+
+                    fileIo.writeFully(pageBuf, off);
+
+                    PageIo.setCrc(pageBuf, 0);
+
+                    if (interrupted) {
+                        Thread.currentThread().interrupt();
+                    }
+
+                    return;
+                } finally {
+                    readWriteLock.readLock().unlock();
+                }
+            } catch (IOException e) {
+                if (e instanceof ClosedChannelException) {
+                    try {
+                        if (e instanceof ClosedByInterruptException) {
+                            interrupted = true;
+
+                            Thread.interrupted();
+                        }
+
+                        reinit(fileIo);
+
+                        pageBuf.position(0);
+
+                        PageIo.setCrc(pageBuf, 0);
+
+                        continue;
+                    } catch (IOException e0) {
+                        e0.addSuppressed(e);
+
+                        e = e0;
+                    }
+                }
+
+                throw new IgniteInternalCheckedException(
+                        "Failed to write page [filePath=" + filePath + ", pageId=" + pageId + ", tag=" + tag + "]",
+                        e
+                );
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void sync() throws IgniteInternalCheckedException {
+        readWriteLock.writeLock().lock();
+
+        try {
+            init();
+
+            FileIo fileIo = this.fileIo;
+
+            if (fileIo != null) {
+                fileIo.force();
+            }
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Failed to fsync partition file [filePath=" + filePath + ']', e);
+        } finally {
+            readWriteLock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean exists() {
+        if (fileExists == null) {
+            readWriteLock.writeLock().lock();
+
+            try {
+                if (fileExists == null) {
+                    fileExists = Files.exists(filePath) && filePath.toFile().length() >= headerSize();
+                }
+            } finally {
+                readWriteLock.writeLock().unlock();
+            }
+        }
+
+        return fileExists;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void ensure() throws IgniteInternalCheckedException {
+        init();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void close() throws IOException {
+        stop0(false);
+    }
+
+    /**
+     * Returns size of the page store in bytes.
+     *
+     * <p>May differ from {@link #pages} * {@link #pageSize} due to delayed writes or due to other implementation specific details.
+     *
+     * @throws IgniteInternalCheckedException If an I/O error occurs.
+     */
+    public long size() throws IgniteInternalCheckedException {
+        try {
+            FileIo io = fileIo;
+
+            return io == null ? 0 : io.size();
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException(e);
+        }
+    }
+
+    /**
+     * Size of file page store header in bytes.
+     */
+    public int headerSize() {
+        return pageSize;
+    }
+
+    /**
+     * File page store version.
+     */
+    public int version() {
+        return VERSION;
+    }
+
+    /**
+     * Reads a page store header with its creation if missing (on store initialization).
+     *
+     * <p>Structure: signature (8 byte) + version (4 byte) + type (1 byte) + pageSize (4 byte).
+     *
+     * @param bufferToWrite Buffer to write to.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    void readHeader(ByteBuffer bufferToWrite) throws IgniteInternalCheckedException {
+        init();
+
+        try {
+            assert bufferToWrite.remaining() == headerSize();
+
+            readWithFailover(bufferToWrite, 0);
+
+            checkHeader(bufferToWrite);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Failed to read header [filePath=" + filePath + "]", e);
+        }
+    }
+
+    /**
+     * Stops file page store.
+     *
+     * @param delete {@code True} to delete file.
+     * @throws IOException If fails.
+     */
+    private void stop0(boolean delete) throws IOException {
+        readWriteLock.writeLock().lock();
+
+        try {
+            if (!initialized) {
+                // Ensure the file is closed even if not initialized yet.
+                if (fileIo != null) {
+                    fileIo.close();
+                }
+
+                if (delete && exists()) {
+                    Files.delete(filePath);
+                }
+
+                return;
+            }
+
+            fileIo.force();
+
+            fileIo.close();
+
+            fileIo = null;
+
+            if (delete) {
+                Files.delete(filePath);
+
+                fileExists = false;
+            }
+        } finally {
+            initialized = false;
+
+            readWriteLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Initializes the page store file.
+     *
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    private void init() throws IgniteInternalCheckedException {
+        if (!initialized) {
+            readWriteLock.writeLock().lock();
+
+            try {
+                if (!initialized) {
+                    FileIo fileIo = null;
+
+                    IgniteInternalCheckedException err = null;
+
+                    long newSize;
+
+                    try {
+                        boolean interrupted = false;
+
+                        while (true) {
+                            try {
+                                this.fileIo = fileIo = ioFactory.create(filePath, CREATE, READ, WRITE);
+
+                                fileExists = true;
+
+                                newSize = (filePath.toFile().length() == 0 ? initFile(fileIo) : checkFile(fileIo, filePath));
+
+                                if (interrupted) {
+                                    Thread.currentThread().interrupt();
+                                }
+
+                                break;
+                            } catch (ClosedByInterruptException e) {
+                                interrupted = true;
+
+                                Thread.interrupted();
+                            }
+                        }
+
+                        assert allocatedBytes.get() == 0;
+
+                        allocatedBytes.set(newSize);
+
+                        initialized = true;
+                    } catch (IOException e) {
+                        err = new IgniteInternalCheckedException("Failed to initialize partition file: " + filePath, e);
+
+                        throw err;
+                    } finally {
+                        if (err != null && fileIo != null) {
+                            try {
+                                fileIo.close();
+                            } catch (IOException e) {
+                                err.addSuppressed(e);
+                            }
+                        }
+                    }
+                }
+            } finally {
+                readWriteLock.writeLock().unlock();
+            }
+        }
+    }
+
+    /**
+     * Initializes header and writes it into the file store.
+     *
+     * @return Next available position in the file to store a data.
+     * @throws IOException If initialization is failed.
+     */
+    private long initFile(FileIo fileIo) throws IOException {
+        try {
+            ByteBuffer hdr = header(type, pageSize);
+
+            fileIo.writeFully(hdr);
+
+            // There is 'super' page in every file.
+            return 0;
+        } catch (ClosedByInterruptException e) {
+            // If thread was interrupted written header can be inconsistent.
+            readWriteLock.writeLock().lock();
+
+            try {
+                Files.delete(filePath);
+
+                fileExists = false;
+            } finally {
+                readWriteLock.writeLock().unlock();
+            }
+
+            throw e;
+        }
+    }
+
+    /**
+     * Creates header for current version file store.
+     *
+     * <p>Doesn't init the store.
+     *
+     * @param type Data type, can be {@link PageStore#TYPE_IDX} or {@link PageStore#TYPE_DATA}.
+     * @param pageSize Page size in bytes.
+     * @return Byte buffer instance.
+     */
+    private ByteBuffer header(byte type, int pageSize) {
+        ByteBuffer hdr = ByteBuffer.allocate(headerSize()).order(nativeOrder());
+
+        hdr.putLong(SIGNATURE);
+
+        hdr.putInt(version());
+
+        hdr.put(type);
+
+        hdr.putInt(pageSize);
+
+        hdr.rewind();
+
+        return hdr;
+    }
+
+    /**
+     * Checks that file store has correct header and size.
+     *
+     * @return Next available position in the file to store a data.
+     * @throws IOException If check has failed.
+     */
+    private long checkFile(FileIo fileIo, Path filePath) throws IOException {
+        ByteBuffer headerBuffer = ByteBuffer.allocate(headerSize()).order(nativeOrder());
+
+        fileIo.readFully(headerBuffer);
+
+        checkHeader(headerBuffer);
+
+        long fileSize = filePath.toFile().length();
+
+        // Every file has a special meta page.
+        if (fileSize == headerSize()) {

Review Comment:
   This whole "if" can just be replaces with "fileSize -= headerSize();"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on pull request #818: IGNITE-17014 [Native Persistence 3.0] Porting FilePageStoreFactory and FilePageStore from 2.0

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on PR #818:
URL: https://github.com/apache/ignite-3/pull/818#issuecomment-1137109796

   @ibessonov Please make code review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #818: IGNITE-17014 [Native Persistence 3.0] Porting FilePageStoreFactory and FilePageStore from 2.0

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #818:
URL: https://github.com/apache/ignite-3/pull/818#discussion_r880591790


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/io/PageIo.java:
##########
@@ -198,7 +198,7 @@ public static int getVersion(long pageAddr) {
      * @param pageAddr Page address.
      * @param ver Version.
      */
-    protected static void setVersion(long pageAddr, int ver) {
+    public static void setVersion(long pageAddr, int ver) {

Review Comment:
   I'll try to return the visibility to the methods.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on pull request #818: IGNITE-17014 [Native Persistence 3.0] Porting FilePageStoreFactory and FilePageStore from 2.0

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on PR #818:
URL: https://github.com/apache/ignite-3/pull/818#issuecomment-1136064982

   Please make code review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #818: IGNITE-17014 [Native Persistence 3.0] Porting FilePageStoreFactory and FilePageStore from 2.0

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #818:
URL: https://github.com/apache/ignite-3/pull/818#discussion_r881440472


##########
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreTest.java:
##########
@@ -498,8 +498,7 @@ private static ByteBuffer copyOf(ByteBuffer buffer) {
     private static ByteBuffer createPageByteBuffer() {
         ByteBuffer buffer = ByteBuffer.allocateDirect(PAGE_SIZE).order(nativeOrder());
 
-        PageIo.setType(bufferAddress(buffer), MAX_IO_TYPE);
-        PageIo.setVersion(bufferAddress(buffer), 1);
+        new TestPageIo().initNewPage(GridUnsafe.bufferAddress(buffer), 0, PAGE_SIZE);

Review Comment:
   Reuse **org.apache.ignite.internal.pagememory.TestPageIoModule.TestPageIo**



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #818: IGNITE-17014 [Native Persistence 3.0] Porting FilePageStoreFactory and FilePageStore from 2.0

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #818:
URL: https://github.com/apache/ignite-3/pull/818#discussion_r880591035


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java:
##########
@@ -0,0 +1,777 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.store;
+
+import static java.nio.ByteOrder.nativeOrder;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.ignite.internal.util.IgniteUtils.hexInt;
+import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
+import static org.apache.ignite.internal.util.IgniteUtils.toHexString;
+import static org.apache.ignite.lang.IgniteSystemProperties.getBoolean;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.internal.fileio.FileIo;
+import org.apache.ignite.internal.fileio.FileIoFactory;
+import org.apache.ignite.internal.pagememory.io.PageIo;
+import org.apache.ignite.internal.pagememory.persistence.FastCrc;
+import org.apache.ignite.internal.pagememory.persistence.IgniteInternalDataIntegrityViolationException;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * FilePageStore is a {@link PageStore} implementation that uses regular files to store pages.
+ *
+ * <p>Actual read and write operations are performed with {@link FileIo} abstract interface, list of its implementations is a good source
+ * of information about functionality in Ignite Native Persistence.
+ *
+ * <p>On a physical level each instance of FilePageStore corresponds to a partition file assigned to the local node or to index file.
+ *
+ * <p>Consists of:
+ * <ul>
+ *     <li>Header - {@link #SIGNATURE signature} (8 byte) + {@link #version version} (4 byte) + {@link #type type} (1 byte) +
+ *     {@link #pageSize pageSize} (4 bytes) + version-specific information, total length {@link #headerSize}. </li>
+ *     <li>Body - data pages are multiples of {@link #pageSize pageSize}.</li>
+ * </ul>
+ */
+public class FilePageStore implements PageStore {
+    /** Page store file signature. */
+    private static final long SIGNATURE = 0xF19AC4FE60C530B8L;
+
+    /** File version. */
+    static final int VERSION = 1;
+
+    /** Size of the common file page store header for all versions, in bytes. */
+    static final int COMMON_HEADER_SIZE = 8/*SIGNATURE*/ + 4/*VERSION*/ + 1/*type*/ + 4/*page size*/;
+
+    /** Skip CRC calculation flag. */
+    // TODO: IGNITE-17011 Move to config
+    private final boolean skipCrc = getBoolean("IGNITE_PDS_SKIP_CRC");
+
+    /** Data type, can be {@link PageStore#TYPE_IDX} or {@link PageStore#TYPE_DATA}. */
+    private final byte type;
+
+    /** File page store path. */
+    private final Path filePath;
+
+    /** {@link FileIo} factory. */
+    private final FileIoFactory ioFactory;
+
+    /** Page size in bytes. */
+    private final int pageSize;
+
+    /** Number of allocated bytes. */
+    private final AtomicLong allocatedBytes = new AtomicLong();
+
+    /** List of listeners for current page store to handle. */
+    private final List<PageWriteListener> listeners = new CopyOnWriteArrayList<>();
+
+    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
+    /** Caches the existence state of storage file. After it is initialized, it will be not {@code null} during lifecycle. */
+    private volatile Boolean fileExists;
+
+    /** {@link FileIo} for read/write operations with file. */
+    private volatile FileIo fileIo;
+
+    /** Initialized file page store. */
+    private volatile boolean initialized;
+
+    /**
+     * Constructor.
+     *
+     * @param type Data type, can be {@link PageStore#TYPE_IDX} or {@link PageStore#TYPE_DATA}.
+     * @param filePath File page store path.
+     * @param ioFactory {@link FileIo} factory.
+     * @param pageSize Page size in bytes.
+     */
+    public FilePageStore(
+            byte type,
+            Path filePath,
+            FileIoFactory ioFactory,
+            int pageSize
+    ) {
+        assert type == PageStore.TYPE_DATA || type == PageStore.TYPE_IDX : type;
+
+        this.type = type;
+        this.filePath = filePath;
+        this.ioFactory = ioFactory;
+        this.pageSize = pageSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void addWriteListener(PageWriteListener listener) {
+        listeners.add(listener);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void removeWriteListener(PageWriteListener listener) {
+        listeners.remove(listener);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop(boolean clean) throws IgniteInternalCheckedException {
+        try {
+            stop0(clean);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException(
+                    "Failed to stop serving partition file [file=" + filePath + ", delete=" + clean + "]",
+                    e
+            );
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long allocatePage() throws IgniteInternalCheckedException {
+        init();
+
+        return allocatedBytes.getAndAdd(pageSize) / pageSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long pages() {
+        if (!initialized) {
+            return 0;
+        }
+
+        return allocatedBytes.get() / pageSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException {
+        return read(pageId, pageBuf, !skipCrc, keepCrc);
+    }
+
+    /**
+     * Reads a page from the page store.
+     *
+     * @param pageId Page ID.
+     * @param pageBuf Page buffer to read into.
+     * @param checkCrc Check CRC on page.
+     * @param keepCrc By default reading zeroes CRC which was on file, but you can keep it in pageBuf if set keepCrc.
+     * @return {@code true} if page has been read successfully, {@code false} if page hasn't been written yet.
+     * @throws IgniteInternalCheckedException If reading failed (IO error occurred).
+     */
+    private boolean read(long pageId, ByteBuffer pageBuf, boolean checkCrc, boolean keepCrc) throws IgniteInternalCheckedException {
+        init();
+
+        try {
+            long off = pageOffset(pageId);
+
+            assert pageBuf.capacity() == pageSize;
+            assert pageBuf.remaining() == pageSize;
+            assert pageBuf.position() == 0;
+            assert pageBuf.order() == nativeOrder();
+            assert off <= allocatedBytes.get() : "calculatedOffset=" + off
+                    + ", allocated=" + allocatedBytes.get() + ", headerSize=" + headerSize() + ", filePath=" + filePath;
+
+            int n = readWithFailover(pageBuf, off);
+
+            // If page was not written yet, nothing to read.
+            if (n < 0) {
+                pageBuf.put(new byte[pageBuf.remaining()]);

Review Comment:
   In personal correspondence, we decided to leave it as it is, added some documentation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #818: IGNITE-17014 [Native Persistence 3.0] Porting FilePageStoreFactory and FilePageStore from 2.0

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #818:
URL: https://github.com/apache/ignite-3/pull/818#discussion_r880560180


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java:
##########
@@ -0,0 +1,777 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.store;
+
+import static java.nio.ByteOrder.nativeOrder;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.ignite.internal.util.IgniteUtils.hexInt;
+import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
+import static org.apache.ignite.internal.util.IgniteUtils.toHexString;
+import static org.apache.ignite.lang.IgniteSystemProperties.getBoolean;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.internal.fileio.FileIo;
+import org.apache.ignite.internal.fileio.FileIoFactory;
+import org.apache.ignite.internal.pagememory.io.PageIo;
+import org.apache.ignite.internal.pagememory.persistence.FastCrc;
+import org.apache.ignite.internal.pagememory.persistence.IgniteInternalDataIntegrityViolationException;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * FilePageStore is a {@link PageStore} implementation that uses regular files to store pages.
+ *
+ * <p>Actual read and write operations are performed with {@link FileIo} abstract interface, list of its implementations is a good source
+ * of information about functionality in Ignite Native Persistence.
+ *
+ * <p>On a physical level each instance of FilePageStore corresponds to a partition file assigned to the local node or to index file.
+ *
+ * <p>Consists of:
+ * <ul>
+ *     <li>Header - {@link #SIGNATURE signature} (8 byte) + {@link #version version} (4 byte) + {@link #type type} (1 byte) +
+ *     {@link #pageSize pageSize} (4 bytes) + version-specific information, total length {@link #headerSize}. </li>
+ *     <li>Body - data pages are multiples of {@link #pageSize pageSize}.</li>
+ * </ul>
+ */
+public class FilePageStore implements PageStore {
+    /** Page store file signature. */
+    private static final long SIGNATURE = 0xF19AC4FE60C530B8L;
+
+    /** File version. */
+    static final int VERSION = 1;
+
+    /** Size of the common file page store header for all versions, in bytes. */
+    static final int COMMON_HEADER_SIZE = 8/*SIGNATURE*/ + 4/*VERSION*/ + 1/*type*/ + 4/*page size*/;
+
+    /** Skip CRC calculation flag. */
+    // TODO: IGNITE-17011 Move to config
+    private final boolean skipCrc = getBoolean("IGNITE_PDS_SKIP_CRC");
+
+    /** Data type, can be {@link PageStore#TYPE_IDX} or {@link PageStore#TYPE_DATA}. */
+    private final byte type;
+
+    /** File page store path. */
+    private final Path filePath;
+
+    /** {@link FileIo} factory. */
+    private final FileIoFactory ioFactory;
+
+    /** Page size in bytes. */
+    private final int pageSize;
+
+    /** Number of allocated bytes. */
+    private final AtomicLong allocatedBytes = new AtomicLong();
+
+    /** List of listeners for current page store to handle. */
+    private final List<PageWriteListener> listeners = new CopyOnWriteArrayList<>();
+
+    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
+    /** Caches the existence state of storage file. After it is initialized, it will be not {@code null} during lifecycle. */
+    private volatile Boolean fileExists;
+
+    /** {@link FileIo} for read/write operations with file. */
+    private volatile FileIo fileIo;
+
+    /** Initialized file page store. */
+    private volatile boolean initialized;
+
+    /**
+     * Constructor.
+     *
+     * @param type Data type, can be {@link PageStore#TYPE_IDX} or {@link PageStore#TYPE_DATA}.
+     * @param filePath File page store path.
+     * @param ioFactory {@link FileIo} factory.
+     * @param pageSize Page size in bytes.
+     */
+    public FilePageStore(
+            byte type,
+            Path filePath,
+            FileIoFactory ioFactory,
+            int pageSize
+    ) {
+        assert type == PageStore.TYPE_DATA || type == PageStore.TYPE_IDX : type;
+
+        this.type = type;
+        this.filePath = filePath;
+        this.ioFactory = ioFactory;
+        this.pageSize = pageSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void addWriteListener(PageWriteListener listener) {
+        listeners.add(listener);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void removeWriteListener(PageWriteListener listener) {
+        listeners.remove(listener);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop(boolean clean) throws IgniteInternalCheckedException {
+        try {
+            stop0(clean);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException(
+                    "Failed to stop serving partition file [file=" + filePath + ", delete=" + clean + "]",
+                    e
+            );
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long allocatePage() throws IgniteInternalCheckedException {
+        init();
+
+        return allocatedBytes.getAndAdd(pageSize) / pageSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long pages() {
+        if (!initialized) {
+            return 0;
+        }
+
+        return allocatedBytes.get() / pageSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException {
+        return read(pageId, pageBuf, !skipCrc, keepCrc);
+    }
+
+    /**
+     * Reads a page from the page store.
+     *
+     * @param pageId Page ID.
+     * @param pageBuf Page buffer to read into.
+     * @param checkCrc Check CRC on page.
+     * @param keepCrc By default reading zeroes CRC which was on file, but you can keep it in pageBuf if set keepCrc.
+     * @return {@code true} if page has been read successfully, {@code false} if page hasn't been written yet.
+     * @throws IgniteInternalCheckedException If reading failed (IO error occurred).
+     */
+    private boolean read(long pageId, ByteBuffer pageBuf, boolean checkCrc, boolean keepCrc) throws IgniteInternalCheckedException {
+        init();
+
+        try {
+            long off = pageOffset(pageId);
+
+            assert pageBuf.capacity() == pageSize;
+            assert pageBuf.remaining() == pageSize;
+            assert pageBuf.position() == 0;
+            assert pageBuf.order() == nativeOrder();
+            assert off <= allocatedBytes.get() : "calculatedOffset=" + off
+                    + ", allocated=" + allocatedBytes.get() + ", headerSize=" + headerSize() + ", filePath=" + filePath;
+
+            int n = readWithFailover(pageBuf, off);
+
+            // If page was not written yet, nothing to read.
+            if (n < 0) {
+                pageBuf.put(new byte[pageBuf.remaining()]);
+
+                return false;
+            }
+
+            int savedCrc32 = PageIo.getCrc(pageBuf);
+
+            PageIo.setCrc(pageBuf, 0);
+
+            pageBuf.position(0);
+
+            if (checkCrc) {
+                int curCrc32 = FastCrc.calcCrc(pageBuf, pageSize);
+
+                if ((savedCrc32 ^ curCrc32) != 0) {
+                    throw new IgniteInternalDataIntegrityViolationException("Failed to read page (CRC validation failed) "
+                            + "[id=" + hexLong(pageId) + ", off=" + (off - pageSize)
+                            + ", filePath=" + filePath + ", fileSize=" + fileIo.size()
+                            + ", savedCrc=" + hexInt(savedCrc32) + ", curCrc=" + hexInt(curCrc32)
+                            + ", page=" + toHexString(pageBuf) + "]");
+                }
+            }
+
+            assert PageIo.getCrc(pageBuf) == 0;
+
+            if (keepCrc) {
+                PageIo.setCrc(pageBuf, savedCrc32);
+            }
+
+            return true;
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Failed to read page [file=" + filePath + ", pageId=" + pageId + "]", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void write(long pageId, ByteBuffer pageBuf, int tag, boolean calculateCrc) throws IgniteInternalCheckedException {
+        init();
+
+        boolean interrupted = false;
+
+        while (true) {
+            FileIo fileIo = this.fileIo;
+
+            try {
+                readWriteLock.readLock().lock();
+
+                try {
+                    long off = pageOffset(pageId);
+
+                    assert (off >= 0 && off <= allocatedBytes.get()) : "off=" + hexLong(off) + ", allocated="
+                            + hexLong(allocatedBytes.get()) + ", pageId=" + hexLong(pageId) + ", filePath=" + filePath;
+
+                    assert pageBuf.position() == 0;
+                    assert pageBuf.order() == nativeOrder() : "Page buffer order " + pageBuf.order()
+                            + " should be same with " + nativeOrder();
+                    assert PageIo.getType(pageBuf) != 0 : "Invalid state. Type is 0! pageId = " + hexLong(pageId);
+                    assert PageIo.getVersion(pageBuf) != 0 : "Invalid state. Version is 0! pageId = " + hexLong(pageId);
+
+                    if (calculateCrc && !skipCrc) {
+                        assert PageIo.getCrc(pageBuf) == 0 : hexLong(pageId);
+
+                        PageIo.setCrc(pageBuf, calcCrc32(pageBuf, pageSize));
+                    }
+
+                    // Check whether crc was calculated somewhere above the stack if it is forcibly skipped.
+                    assert skipCrc || PageIo.getCrc(pageBuf) != 0
+                            || calcCrc32(pageBuf, pageSize) == 0 : "CRC hasn't been calculated, crc=0";
+
+                    assert pageBuf.position() == 0 : pageBuf.position();
+
+                    for (PageWriteListener listener : listeners) {
+                        listener.accept(pageId, pageBuf);
+
+                        pageBuf.rewind();
+                    }
+
+                    fileIo.writeFully(pageBuf, off);
+
+                    PageIo.setCrc(pageBuf, 0);
+
+                    if (interrupted) {
+                        Thread.currentThread().interrupt();
+                    }
+
+                    return;
+                } finally {
+                    readWriteLock.readLock().unlock();
+                }
+            } catch (IOException e) {
+                if (e instanceof ClosedChannelException) {
+                    try {
+                        if (e instanceof ClosedByInterruptException) {
+                            interrupted = true;
+
+                            Thread.interrupted();
+                        }
+
+                        reinit(fileIo);
+
+                        pageBuf.position(0);
+
+                        PageIo.setCrc(pageBuf, 0);
+
+                        continue;
+                    } catch (IOException e0) {
+                        e0.addSuppressed(e);
+
+                        e = e0;
+                    }
+                }
+
+                throw new IgniteInternalCheckedException(
+                        "Failed to write page [filePath=" + filePath + ", pageId=" + pageId + ", tag=" + tag + "]",
+                        e
+                );
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void sync() throws IgniteInternalCheckedException {
+        readWriteLock.writeLock().lock();
+
+        try {
+            init();
+
+            FileIo fileIo = this.fileIo;
+
+            if (fileIo != null) {
+                fileIo.force();
+            }
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Failed to fsync partition file [filePath=" + filePath + ']', e);
+        } finally {
+            readWriteLock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean exists() {
+        if (fileExists == null) {
+            readWriteLock.writeLock().lock();
+
+            try {
+                if (fileExists == null) {
+                    fileExists = Files.exists(filePath) && filePath.toFile().length() >= headerSize();
+                }
+            } finally {
+                readWriteLock.writeLock().unlock();
+            }
+        }
+
+        return fileExists;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void ensure() throws IgniteInternalCheckedException {
+        init();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void close() throws IOException {
+        stop0(false);
+    }
+
+    /**
+     * Returns size of the page store in bytes.
+     *
+     * <p>May differ from {@link #pages} * {@link #pageSize} due to delayed writes or due to other implementation specific details.
+     *
+     * @throws IgniteInternalCheckedException If an I/O error occurs.
+     */
+    public long size() throws IgniteInternalCheckedException {
+        try {
+            FileIo io = fileIo;
+
+            return io == null ? 0 : io.size();
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException(e);
+        }
+    }
+
+    /**
+     * Size of file page store header in bytes.
+     */
+    public int headerSize() {
+        return pageSize;
+    }
+
+    /**
+     * File page store version.
+     */
+    public int version() {
+        return VERSION;
+    }
+
+    /**
+     * Reads a page store header with its creation if missing (on store initialization).
+     *
+     * <p>Structure: signature (8 byte) + version (4 byte) + type (1 byte) + pageSize (4 byte).
+     *
+     * @param bufferToWrite Buffer to write to.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    void readHeader(ByteBuffer bufferToWrite) throws IgniteInternalCheckedException {
+        init();
+
+        try {
+            assert bufferToWrite.remaining() == headerSize();
+
+            readWithFailover(bufferToWrite, 0);
+
+            checkHeader(bufferToWrite);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Failed to read header [filePath=" + filePath + "]", e);
+        }
+    }
+
+    /**
+     * Stops file page store.
+     *
+     * @param delete {@code True} to delete file.
+     * @throws IOException If fails.
+     */
+    private void stop0(boolean delete) throws IOException {
+        readWriteLock.writeLock().lock();
+
+        try {
+            if (!initialized) {
+                // Ensure the file is closed even if not initialized yet.
+                if (fileIo != null) {
+                    fileIo.close();
+                }
+
+                if (delete && exists()) {
+                    Files.delete(filePath);
+                }
+
+                return;
+            }
+
+            fileIo.force();
+
+            fileIo.close();
+
+            fileIo = null;
+
+            if (delete) {
+                Files.delete(filePath);
+
+                fileExists = false;
+            }
+        } finally {
+            initialized = false;
+
+            readWriteLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Initializes the page store file.
+     *
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    private void init() throws IgniteInternalCheckedException {
+        if (!initialized) {
+            readWriteLock.writeLock().lock();
+
+            try {
+                if (!initialized) {
+                    FileIo fileIo = null;
+
+                    IgniteInternalCheckedException err = null;
+
+                    long newSize;
+
+                    try {
+                        boolean interrupted = false;
+
+                        while (true) {
+                            try {
+                                this.fileIo = fileIo = ioFactory.create(filePath, CREATE, READ, WRITE);
+
+                                fileExists = true;
+
+                                newSize = (filePath.toFile().length() == 0 ? initFile(fileIo) : checkFile(fileIo, filePath));
+
+                                if (interrupted) {
+                                    Thread.currentThread().interrupt();
+                                }
+
+                                break;
+                            } catch (ClosedByInterruptException e) {
+                                interrupted = true;
+
+                                Thread.interrupted();
+                            }
+                        }
+
+                        assert allocatedBytes.get() == 0;
+
+                        allocatedBytes.set(newSize);
+
+                        initialized = true;
+                    } catch (IOException e) {
+                        err = new IgniteInternalCheckedException("Failed to initialize partition file: " + filePath, e);
+
+                        throw err;
+                    } finally {
+                        if (err != null && fileIo != null) {
+                            try {
+                                fileIo.close();
+                            } catch (IOException e) {
+                                err.addSuppressed(e);
+                            }
+                        }
+                    }
+                }
+            } finally {
+                readWriteLock.writeLock().unlock();
+            }
+        }
+    }
+
+    /**
+     * Initializes header and writes it into the file store.
+     *
+     * @return Next available position in the file to store a data.
+     * @throws IOException If initialization is failed.
+     */
+    private long initFile(FileIo fileIo) throws IOException {
+        try {
+            ByteBuffer hdr = header(type, pageSize);
+
+            fileIo.writeFully(hdr);
+
+            // There is 'super' page in every file.
+            return 0;
+        } catch (ClosedByInterruptException e) {
+            // If thread was interrupted written header can be inconsistent.
+            readWriteLock.writeLock().lock();
+
+            try {
+                Files.delete(filePath);
+
+                fileExists = false;
+            } finally {
+                readWriteLock.writeLock().unlock();
+            }
+
+            throw e;
+        }
+    }
+
+    /**
+     * Creates header for current version file store.
+     *
+     * <p>Doesn't init the store.
+     *
+     * @param type Data type, can be {@link PageStore#TYPE_IDX} or {@link PageStore#TYPE_DATA}.
+     * @param pageSize Page size in bytes.
+     * @return Byte buffer instance.
+     */
+    private ByteBuffer header(byte type, int pageSize) {
+        ByteBuffer hdr = ByteBuffer.allocate(headerSize()).order(nativeOrder());
+
+        hdr.putLong(SIGNATURE);
+
+        hdr.putInt(version());
+
+        hdr.put(type);
+
+        hdr.putInt(pageSize);
+
+        hdr.rewind();
+
+        return hdr;
+    }
+
+    /**
+     * Checks that file store has correct header and size.
+     *
+     * @return Next available position in the file to store a data.
+     * @throws IOException If check has failed.
+     */
+    private long checkFile(FileIo fileIo, Path filePath) throws IOException {
+        ByteBuffer headerBuffer = ByteBuffer.allocate(headerSize()).order(nativeOrder());
+
+        fileIo.readFully(headerBuffer);
+
+        checkHeader(headerBuffer);
+
+        long fileSize = filePath.toFile().length();
+
+        // Every file has a special meta page.
+        if (fileSize == headerSize()) {

Review Comment:
   Fix it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #818: IGNITE-17014 [Native Persistence 3.0] Porting FilePageStoreFactory and FilePageStore from 2.0

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #818:
URL: https://github.com/apache/ignite-3/pull/818#discussion_r881453707


##########
modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java:
##########
@@ -265,13 +266,29 @@ public static String toHexString(long addr, int len) {
         StringBuilder sb = new StringBuilder(len * 2);
 
         for (int i = 0; i < len; i++) {
-            // Can not use getLong because on little-endian it produces bs.
+            // Can not use getLong because on little-endian it produces wrong result.
             addByteAsHex(sb, GridUnsafe.getByte(addr + i));
         }
 
         return sb.toString();
     }
 
+    /**
+     * Returns hex representation of memory region.
+     *
+     * @param buf Buffer which content should be converted to string.
+     */
+    public static String toHexString(ByteBuffer buf) {
+        StringBuilder sb = new StringBuilder(buf.capacity() * 2);
+
+        for (int i = 0; i < buf.capacity(); i++) {

Review Comment:
   You're right, corrected and expanded the tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] ibessonov commented on a diff in pull request #818: IGNITE-17014 [Native Persistence 3.0] Porting FilePageStoreFactory and FilePageStore from 2.0

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #818:
URL: https://github.com/apache/ignite-3/pull/818#discussion_r881421854


##########
modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java:
##########
@@ -265,13 +266,29 @@ public static String toHexString(long addr, int len) {
         StringBuilder sb = new StringBuilder(len * 2);
 
         for (int i = 0; i < len; i++) {
-            // Can not use getLong because on little-endian it produces bs.
+            // Can not use getLong because on little-endian it produces wrong result.
             addByteAsHex(sb, GridUnsafe.getByte(addr + i));
         }
 
         return sb.toString();
     }
 
+    /**
+     * Returns hex representation of memory region.
+     *
+     * @param buf Buffer which content should be converted to string.
+     */
+    public static String toHexString(ByteBuffer buf) {
+        StringBuilder sb = new StringBuilder(buf.capacity() * 2);
+
+        for (int i = 0; i < buf.capacity(); i++) {

Review Comment:
   Should't you start with position and end with limit? Please reflect in javadoc that position and limit are ignored and the whole data chunk is converted to string



##########
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreTest.java:
##########
@@ -498,8 +498,7 @@ private static ByteBuffer copyOf(ByteBuffer buffer) {
     private static ByteBuffer createPageByteBuffer() {
         ByteBuffer buffer = ByteBuffer.allocateDirect(PAGE_SIZE).order(nativeOrder());
 
-        PageIo.setType(bufferAddress(buffer), MAX_IO_TYPE);
-        PageIo.setVersion(bufferAddress(buffer), 1);
+        new TestPageIo().initNewPage(GridUnsafe.bufferAddress(buffer), 0, PAGE_SIZE);

Review Comment:
   I bet we already have a test io somewhere nearby, can we reuse it? Or they're all private?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #818: IGNITE-17014 [Native Persistence 3.0] Porting FilePageStoreFactory and FilePageStore from 2.0

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #818:
URL: https://github.com/apache/ignite-3/pull/818#discussion_r880571609


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/io/PageIo.java:
##########
@@ -198,7 +198,7 @@ public static int getVersion(long pageAddr) {
      * @param pageAddr Page address.
      * @param ver Version.
      */
-    protected static void setVersion(long pageAddr, int ver) {
+    public static void setVersion(long pageAddr, int ver) {

Review Comment:
   For **FilePageStoreTest#createPageByteBuffer**



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #818: IGNITE-17014 [Native Persistence 3.0] Porting FilePageStoreFactory and FilePageStore from 2.0

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #818:
URL: https://github.com/apache/ignite-3/pull/818#discussion_r880567413


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java:
##########
@@ -0,0 +1,777 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.store;
+
+import static java.nio.ByteOrder.nativeOrder;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.ignite.internal.util.IgniteUtils.hexInt;
+import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
+import static org.apache.ignite.internal.util.IgniteUtils.toHexString;
+import static org.apache.ignite.lang.IgniteSystemProperties.getBoolean;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.internal.fileio.FileIo;
+import org.apache.ignite.internal.fileio.FileIoFactory;
+import org.apache.ignite.internal.pagememory.io.PageIo;
+import org.apache.ignite.internal.pagememory.persistence.FastCrc;
+import org.apache.ignite.internal.pagememory.persistence.IgniteInternalDataIntegrityViolationException;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * FilePageStore is a {@link PageStore} implementation that uses regular files to store pages.
+ *
+ * <p>Actual read and write operations are performed with {@link FileIo} abstract interface, list of its implementations is a good source
+ * of information about functionality in Ignite Native Persistence.
+ *
+ * <p>On a physical level each instance of FilePageStore corresponds to a partition file assigned to the local node or to index file.
+ *
+ * <p>Consists of:
+ * <ul>
+ *     <li>Header - {@link #SIGNATURE signature} (8 byte) + {@link #version version} (4 byte) + {@link #type type} (1 byte) +
+ *     {@link #pageSize pageSize} (4 bytes) + version-specific information, total length {@link #headerSize}. </li>
+ *     <li>Body - data pages are multiples of {@link #pageSize pageSize}.</li>
+ * </ul>
+ */
+public class FilePageStore implements PageStore {
+    /** Page store file signature. */
+    private static final long SIGNATURE = 0xF19AC4FE60C530B8L;
+
+    /** File version. */
+    static final int VERSION = 1;
+
+    /** Size of the common file page store header for all versions, in bytes. */
+    static final int COMMON_HEADER_SIZE = 8/*SIGNATURE*/ + 4/*VERSION*/ + 1/*type*/ + 4/*page size*/;
+
+    /** Skip CRC calculation flag. */
+    // TODO: IGNITE-17011 Move to config
+    private final boolean skipCrc = getBoolean("IGNITE_PDS_SKIP_CRC");
+
+    /** Data type, can be {@link PageStore#TYPE_IDX} or {@link PageStore#TYPE_DATA}. */
+    private final byte type;
+
+    /** File page store path. */
+    private final Path filePath;
+
+    /** {@link FileIo} factory. */
+    private final FileIoFactory ioFactory;
+
+    /** Page size in bytes. */
+    private final int pageSize;
+
+    /** Number of allocated bytes. */
+    private final AtomicLong allocatedBytes = new AtomicLong();
+
+    /** List of listeners for current page store to handle. */
+    private final List<PageWriteListener> listeners = new CopyOnWriteArrayList<>();
+
+    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
+    /** Caches the existence state of storage file. After it is initialized, it will be not {@code null} during lifecycle. */
+    private volatile Boolean fileExists;
+
+    /** {@link FileIo} for read/write operations with file. */
+    private volatile FileIo fileIo;
+
+    /** Initialized file page store. */
+    private volatile boolean initialized;
+
+    /**
+     * Constructor.
+     *
+     * @param type Data type, can be {@link PageStore#TYPE_IDX} or {@link PageStore#TYPE_DATA}.
+     * @param filePath File page store path.
+     * @param ioFactory {@link FileIo} factory.
+     * @param pageSize Page size in bytes.
+     */
+    public FilePageStore(
+            byte type,
+            Path filePath,
+            FileIoFactory ioFactory,
+            int pageSize
+    ) {
+        assert type == PageStore.TYPE_DATA || type == PageStore.TYPE_IDX : type;
+
+        this.type = type;
+        this.filePath = filePath;
+        this.ioFactory = ioFactory;
+        this.pageSize = pageSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void addWriteListener(PageWriteListener listener) {
+        listeners.add(listener);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void removeWriteListener(PageWriteListener listener) {
+        listeners.remove(listener);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop(boolean clean) throws IgniteInternalCheckedException {
+        try {
+            stop0(clean);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException(
+                    "Failed to stop serving partition file [file=" + filePath + ", delete=" + clean + "]",
+                    e
+            );
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long allocatePage() throws IgniteInternalCheckedException {
+        init();
+
+        return allocatedBytes.getAndAdd(pageSize) / pageSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long pages() {
+        if (!initialized) {
+            return 0;
+        }
+
+        return allocatedBytes.get() / pageSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException {
+        return read(pageId, pageBuf, !skipCrc, keepCrc);
+    }
+
+    /**
+     * Reads a page from the page store.
+     *
+     * @param pageId Page ID.
+     * @param pageBuf Page buffer to read into.
+     * @param checkCrc Check CRC on page.
+     * @param keepCrc By default reading zeroes CRC which was on file, but you can keep it in pageBuf if set keepCrc.
+     * @return {@code true} if page has been read successfully, {@code false} if page hasn't been written yet.
+     * @throws IgniteInternalCheckedException If reading failed (IO error occurred).
+     */
+    private boolean read(long pageId, ByteBuffer pageBuf, boolean checkCrc, boolean keepCrc) throws IgniteInternalCheckedException {
+        init();
+
+        try {
+            long off = pageOffset(pageId);
+
+            assert pageBuf.capacity() == pageSize;
+            assert pageBuf.remaining() == pageSize;
+            assert pageBuf.position() == 0;
+            assert pageBuf.order() == nativeOrder();
+            assert off <= allocatedBytes.get() : "calculatedOffset=" + off
+                    + ", allocated=" + allocatedBytes.get() + ", headerSize=" + headerSize() + ", filePath=" + filePath;
+
+            int n = readWithFailover(pageBuf, off);
+
+            // If page was not written yet, nothing to read.
+            if (n < 0) {
+                pageBuf.put(new byte[pageBuf.remaining()]);

Review Comment:
   -1 if the position is greater than or equal to the file's current size.
   Removed nulling.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #818: IGNITE-17014 [Native Persistence 3.0] Porting FilePageStoreFactory and FilePageStore from 2.0

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #818:
URL: https://github.com/apache/ignite-3/pull/818#discussion_r880557778


##########
modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java:
##########
@@ -272,6 +273,22 @@ public static String toHexString(long addr, int len) {
         return sb.toString();
     }
 
+    /**
+     * Returns hex representation of memory region.
+     *
+     * @param buf Buffer which content should be converted to string.
+     */
+    public static String toHexString(ByteBuffer buf) {
+        StringBuilder sb = new StringBuilder(buf.capacity() * 2);
+
+        for (int i = 0; i < buf.capacity(); i++) {
+            // Can not use getLong because on little-endian it produces bs.

Review Comment:
   Fix it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] ibessonov merged pull request #818: IGNITE-17014 [Native Persistence 3.0] Porting FilePageStoreFactory and FilePageStore from 2.0

Posted by GitBox <gi...@apache.org>.
ibessonov merged PR #818:
URL: https://github.com/apache/ignite-3/pull/818


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org