You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2018/11/26 12:38:28 UTC

[4/6] ignite git commit: IGNITE-10330: Disk page compression. - Fixes #5200.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/licenses/lz4-LICENSE.txt
----------------------------------------------------------------------
diff --git a/modules/compress/licenses/lz4-LICENSE.txt b/modules/compress/licenses/lz4-LICENSE.txt
new file mode 100644
index 0000000..c221aeb
--- /dev/null
+++ b/modules/compress/licenses/lz4-LICENSE.txt
@@ -0,0 +1,11 @@
+This repository uses 2 different licenses :
+- all files in the `lib` directory use a BSD 2-Clause license
+- all other files use a GPLv2 license, unless explicitly stated otherwise
+
+Relevant license is reminded at the top of each source file,
+and with presence of COPYING or LICENSE file in associated directories.
+
+This model is selected to emphasize that
+files in the `lib` directory are designed to be included into 3rd party applications,
+while all other files, in `programs`, `tests` or `examples`,
+receive more limited attention and support for such scenario.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/licenses/lz4-java-LICENSE.txt
----------------------------------------------------------------------
diff --git a/modules/compress/licenses/lz4-java-LICENSE.txt b/modules/compress/licenses/lz4-java-LICENSE.txt
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/modules/compress/licenses/lz4-java-LICENSE.txt
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/pom.xml
----------------------------------------------------------------------
diff --git a/modules/compress/pom.xml b/modules/compress/pom.xml
new file mode 100644
index 0000000..876a121
--- /dev/null
+++ b/modules/compress/pom.xml
@@ -0,0 +1,113 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+    POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.ignite</groupId>
+        <artifactId>ignite-parent</artifactId>
+        <version>1</version>
+        <relativePath>../../parent</relativePath>
+    </parent>
+
+    <artifactId>ignite-compress</artifactId>
+    <version>2.7.0-SNAPSHOT</version>
+    <url>http://ignite.apache.org</url>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.github.jnr</groupId>
+            <artifactId>jnr-posix</artifactId>
+            <version>${jnr.posix.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.github.luben</groupId>
+            <artifactId>zstd-jni</artifactId>
+            <version>${zstd.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.lz4</groupId>
+            <artifactId>lz4-java</artifactId>
+            <version>${lz4.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+            <version>${snappy.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-context</artifactId>
+            <version>${spring.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.thoughtworks.xstream</groupId>
+            <artifactId>xstream</artifactId>
+            <version>1.4.8</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java
new file mode 100644
index 0000000..2553371
--- /dev/null
+++ b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compress;
+
+import com.github.luben.zstd.Zstd;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.DiskPageCompression;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.CompactablePageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.xerial.snappy.Snappy;
+
+import static org.apache.ignite.configuration.DataStorageConfiguration.MAX_PAGE_SIZE;
+import static org.apache.ignite.configuration.DiskPageCompression.SKIP_GARBAGE;
+import static org.apache.ignite.internal.util.GridUnsafe.NATIVE_BYTE_ORDER;
+
+/**
+ * Compression processor.
+ */
+public class CompressionProcessorImpl extends CompressionProcessor {
+    /** Thread-local buffer that receives the compacted page in {@link #compressPage}, max page size. */
+    private final ThreadLocalByteBuffer compactBuf = new ThreadLocalByteBuffer(MAX_PAGE_SIZE);
+
+    /** Thread-local buffer for compression and decompression output, a bit more than max page size. */
+    private final ThreadLocalByteBuffer compressBuf = new ThreadLocalByteBuffer(MAX_PAGE_SIZE + 1024);
+
+    /**
+     * @param ctx Kernal context, handed over to the {@link CompressionProcessor} constructor.
+     */
+    @SuppressWarnings("WeakerAccess")
+    public CompressionProcessorImpl(GridKernalContext ctx) {
+        super(ctx);
+    }
+
+    /**
+     * @param cap Capacity of the buffer to allocate, in bytes.
+     * @return Newly allocated direct byte buffer of the given capacity, switched to {@code NATIVE_BYTE_ORDER}.
+     */
+    static ByteBuffer allocateDirectBuffer(int cap) {
+        return ByteBuffer.allocateDirect(cap).order(NATIVE_BYTE_ORDER);
+    }
+
+    /** {@inheritDoc} Requires Linux and a power-of-2 file system block size of at most half the page size. */
+    @Override public void checkPageCompressionSupported(Path storagePath, int pageSize) throws IgniteCheckedException {
+        if (!U.isLinux())
+            throw new IgniteCheckedException("Currently page compression is supported only for Linux.");
+
+        FileSystemUtils.checkSupported();
+
+        int fsBlockSize = FileSystemUtils.getFileSystemBlockSize(storagePath);
+
+        if (fsBlockSize <= 0)
+            throw new IgniteCheckedException("Failed to get file system block size: " + storagePath);
+
+        if (!U.isPow2(fsBlockSize))
+            throw new IgniteCheckedException("Storage block size must be power of 2: " + fsBlockSize);
+
+        if (pageSize < fsBlockSize * 2) {
+            throw new IgniteCheckedException("Page size (now configured to " + pageSize + " bytes) " +
+                "must be at least 2 times larger than the underlying storage block size (detected to be " + fsBlockSize +
+                " bytes at '" + storagePath + "') for page compression.");
+        }
+    }
+
+    /** {@inheritDoc} Returns the given page itself or a thread-local buffer with its compacted/compressed form. */
+    @Override public ByteBuffer compressPage(
+        ByteBuffer page,
+        int pageSize,
+        int blockSize,
+        DiskPageCompression compression,
+        int compressLevel
+    ) throws IgniteCheckedException {
+        assert compression != null;
+        assert U.isPow2(pageSize): pageSize;
+        assert U.isPow2(blockSize): blockSize;
+        assert page.position() == 0 && page.limit() == pageSize;
+
+        PageIO io = PageIO.getPageIO(page);
+
+        if (!(io instanceof CompactablePageIO))
+            return page;
+
+        ByteBuffer compactPage = compactBuf.get();
+
+        // Copy the page into the compact buffer without garbage, then reset position and limit of the source page.
+        ((CompactablePageIO)io).compactPage(page, compactPage, pageSize);
+        page.clear();
+
+        int compactSize = compactPage.limit();
+
+        assert compactSize <= pageSize: compactSize;
+
+        // Compacted page is already smaller than a single block or only garbage skipping is configured.
+        if (compactSize < blockSize || compression == SKIP_GARBAGE)
+            return setCompactionInfo(compactPage, compactSize);
+
+        ByteBuffer compressedPage = doCompressPage(compression, compactPage, compactSize, compressLevel);
+
+        assert compressedPage.position() == 0;
+        int compressedSize = compressedPage.limit();
+
+        int freeCompactBlocks = (pageSize - compactSize) / blockSize;
+        int freeCompressedBlocks = (pageSize - compressedSize) / blockSize;
+
+        if (freeCompactBlocks >= freeCompressedBlocks) {
+            if (freeCompactBlocks == 0)
+                return page; // Neither form frees a whole block: keep the original page.
+
+            return setCompactionInfo(compactPage, compactSize);
+        }
+
+        return setCompressionInfo(compressedPage, compression, compressedSize, compactSize);
+    }
+
+    /**
+     * @param page Compacted page to be marked with the {@code SKIP_GARBAGE} compression.
+     * @param compactSize Compacted page size, written as both the compressed and the compacted size.
+     * @return The given page.
+     */
+    private static ByteBuffer setCompactionInfo(ByteBuffer page, int compactSize) {
+        return setCompressionInfo(page, SKIP_GARBAGE, compactSize, compactSize);
+    }
+
+    /**
+     * @param page Page to write the compression type and both sizes to through the {@link PageIO} setters.
+     * @param compression Compression algorithm, {@code null} is written as {@code UNCOMPRESSED_PAGE}.
+     * @param compressedSize Compressed size, asserted to be within {@code [0, Short.MAX_VALUE]}.
+     * @param compactedSize Compacted size, asserted to be within {@code [0, Short.MAX_VALUE]}.
+     * @return The given page.
+     */
+    private static ByteBuffer setCompressionInfo(ByteBuffer page, DiskPageCompression compression, int compressedSize, int compactedSize) {
+        assert compressedSize >= 0 && compressedSize <= Short.MAX_VALUE: compressedSize;
+        assert compactedSize >= 0 && compactedSize <= Short.MAX_VALUE: compactedSize;
+
+        PageIO.setCompressionType(page, getCompressionType(compression));
+        PageIO.setCompressedSize(page, (short)compressedSize);
+        PageIO.setCompactedSize(page, (short)compactedSize);
+
+        return page;
+    }
+
+    /**
+     * @param compression Compression algorithm, anything but {@code ZSTD}, {@code LZ4} or {@code SNAPPY} is rejected.
+     * @param compactPage Compacted page.
+     * @param compactSize Compacted page size.
+     * @param compressLevel Compression level, not used for {@code SNAPPY}.
+     * @return Compressed page produced by the method dedicated to the given algorithm.
+     */
+    private ByteBuffer doCompressPage(DiskPageCompression compression, ByteBuffer compactPage, int compactSize, int compressLevel) {
+        switch (compression) {
+            case ZSTD:
+                return compressPageZstd(compactPage, compactSize, compressLevel);
+
+            case LZ4:
+                return compressPageLz4(compactPage, compactSize, compressLevel);
+
+            case SNAPPY:
+                return compressPageSnappy(compactPage, compactSize);
+        }
+        throw new IllegalStateException("Unsupported compression: " + compression);
+    }
+
+    /**
+     * @param compactPage Compacted page, flipped before returning.
+     * @param compactSize Compacted page size, restored as the limit of {@code compactPage} after the header copy.
+     * @param compressLevel Compression level used to pick the compressor in {@code Lz4.getCompressor}.
+     * @return Flipped thread-local buffer: bytes copied by {@code copyPageHeader} followed by the LZ4 output.
+     */
+    private ByteBuffer compressPageLz4(ByteBuffer compactPage, int compactSize, int compressLevel) {
+        LZ4Compressor compressor = Lz4.getCompressor(compressLevel);
+
+        ByteBuffer compressedPage = compressBuf.get();
+
+        copyPageHeader(compactPage, compressedPage, compactSize);
+        compressor.compress(compactPage, compressedPage);
+
+        compactPage.flip();
+        compressedPage.flip();
+
+        return compressedPage;
+    }
+
+    /**
+     * @param compactPage Compacted page, flipped before returning.
+     * @param compactSize Compacted page size, restored as the limit of {@code compactPage} after the header copy.
+     * @param compressLevel Compression level passed to {@code Zstd.compress}.
+     * @return Flipped thread-local buffer: bytes copied by {@code copyPageHeader} followed by the Zstd output.
+     */
+    private ByteBuffer compressPageZstd(ByteBuffer compactPage, int compactSize, int compressLevel) {
+        ByteBuffer compressedPage = compressBuf.get();
+
+        copyPageHeader(compactPage, compressedPage, compactSize);
+        Zstd.compress(compressedPage, compactPage, compressLevel);
+
+        compactPage.flip();
+        compressedPage.flip();
+
+        return compressedPage;
+    }
+
+    /**
+     * @param compactPage Compacted page, its position is reset to 0 before returning.
+     * @param compactSize Compacted page size, restored as the limit of {@code compactPage} after the header copy.
+     * @return Thread-local buffer at position 0; an {@link IgniteException} is thrown if Snappy fails.
+     */
+    private ByteBuffer compressPageSnappy(ByteBuffer compactPage, int compactSize) {
+        ByteBuffer compressedPage = compressBuf.get();
+
+        copyPageHeader(compactPage, compressedPage, compactSize);
+
+        try {
+            int compressedSize = Snappy.compress(compactPage, compressedPage);
+            assert compressedPage.limit() == PageIO.COMMON_HEADER_END + compressedSize;
+        }
+        catch (IOException e) {
+            throw new IgniteException("Failed to compress page with Snappy.", e);
+        }
+
+        compactPage.position(0);
+        compressedPage.position(0);
+
+        return compressedPage;
+    }
+
+    /**
+     * @param compactPage Compacted page, bytes from its position up to {@code PageIO.COMMON_HEADER_END} are copied.
+     * @param compressedPage Compressed page that receives the copied bytes at its current position.
+     * @param compactSize Compacted page size, set as the limit of {@code compactPage} once the copy is done.
+     */
+    private static void copyPageHeader(ByteBuffer compactPage, ByteBuffer compressedPage, int compactSize) {
+        compactPage.limit(PageIO.COMMON_HEADER_END);
+        compressedPage.put(compactPage);
+        compactPage.limit(compactSize);
+    }
+
+    /**
+     * @param compression Compression algorithm, {@code null} maps to {@code UNCOMPRESSED_PAGE}.
+     * @return Page compression type constant matching the given algorithm.
+     */
+    private static byte getCompressionType(DiskPageCompression compression) {
+        if (compression == null)
+            return UNCOMPRESSED_PAGE;
+
+        switch (compression) {
+            case ZSTD:
+                return ZSTD_COMPRESSED_PAGE;
+
+            case LZ4:
+                return LZ4_COMPRESSED_PAGE;
+
+            case SNAPPY:
+                return SNAPPY_COMPRESSED_PAGE;
+
+            case SKIP_GARBAGE:
+                return COMPACTED_PAGE;
+        }
+        throw new IllegalStateException("Unexpected compression: " + compression);
+    }
+
+    /** {@inheritDoc} Works in place: restores the page through its page IO and resets the compression info. */
+    @Override public void decompressPage(ByteBuffer page, int pageSize) throws IgniteCheckedException {
+        assert page.capacity() == pageSize;
+
+        byte compressType = PageIO.getCompressionType(page);
+
+        if (compressType == UNCOMPRESSED_PAGE)
+            return; // Page is marked as uncompressed, nothing to do.
+
+        short compressedSize = PageIO.getCompressedSize(page);
+        short compactSize = PageIO.getCompactedSize(page);
+
+        assert compactSize <= pageSize && compactSize >= compressedSize;
+
+        if (compressType == COMPACTED_PAGE) {
+            // Page is only compacted, no decompression needed: just set up bounds before restoring the page.
+            page.position(0).limit(compactSize);
+        }
+        else {
+            ByteBuffer dst = compressBuf.get();
+
+            // Select the part that needs to be decompressed: from COMMON_HEADER_END up to the compressed size.
+            page.limit(compressedSize)
+                .position(PageIO.COMMON_HEADER_END);
+
+            // Size of the data expected after decompression; LZ4 needs this limit to be exact.
+            dst.limit(compactSize - PageIO.COMMON_HEADER_END);
+
+            switch (compressType) {
+                case ZSTD_COMPRESSED_PAGE:
+                    Zstd.decompress(dst, page);
+                    dst.flip();
+
+                    break;
+
+                case LZ4_COMPRESSED_PAGE:
+                    Lz4.decompress(page, dst);
+                    dst.flip();
+
+                    break;
+
+                case SNAPPY_COMPRESSED_PAGE:
+                    try {
+                        Snappy.uncompress(page, dst);
+                    }
+                    catch (IOException e) {
+                        throw new IgniteException(e);
+                    }
+                    break;
+
+                default:
+                    throw new IgniteException("Unknown compression: " + compressType);
+            }
+
+            page.position(PageIO.COMMON_HEADER_END).limit(compactSize);
+            page.put(dst).flip();
+            assert page.limit() == compactSize;
+        }
+
+        CompactablePageIO io = PageIO.getPageIO(page);
+
+        io.restorePage(page, pageSize);
+
+        setCompressionInfo(page, null, 0, 0);
+    }
+
+    /** Holder of the LZ4 factory together with the decompressor and the fast compressor created from it. */
+    static class Lz4 {
+        /** LZ4 factory obtained from {@code LZ4Factory.fastestInstance()}. */
+        static final LZ4Factory factory = LZ4Factory.fastestInstance();
+
+        /** Fast decompressor used by {@link #decompress(ByteBuffer, ByteBuffer)}. */
+        static final LZ4FastDecompressor decompressor = factory.fastDecompressor();
+
+        /** Fast compressor returned by {@link #getCompressor(int)} for level 0. */
+        static final LZ4Compressor fastCompressor = factory.fastCompressor();
+
+        /**
+         * @param level Compression level, asserted to be within {@code [0, 17]}.
+         * @return Fast compressor for level 0, otherwise the factory's high compressor for the given level.
+         */
+        static LZ4Compressor getCompressor(int level) {
+            assert level >= 0 && level <= 17: level;
+            return level == 0 ? fastCompressor : factory.highCompressor(level);
+        }
+
+        /**
+         * @param page Source buffer with the compressed data.
+         * @param dst Destination buffer for the decompressed data.
+         */
+        static void decompress(ByteBuffer page, ByteBuffer dst) {
+            decompressor.decompress(page, dst);
+        }
+    }
+
+    /**
+     */
+    static final class ThreadLocalByteBuffer extends ThreadLocal<ByteBuffer> {
+        /** Capacity of the buffer allocated in {@link #initialValue()}. */
+        final int size;
+
+        /**
+         * @param size Capacity of the direct buffer to allocate as the initial value.
+         */
+        ThreadLocalByteBuffer(int size) {
+            this.size = size;
+        }
+
+        /** {@inheritDoc} Allocates a direct buffer of {@link #size} bytes via {@code allocateDirectBuffer}. */
+        @Override protected ByteBuffer initialValue() {
+            return allocateDirectBuffer(size);
+        }
+
+        /** {@inheritDoc} The buffer is cleared before it is returned, so callers always start at position 0. */
+        @Override public ByteBuffer get() {
+            ByteBuffer buf = super.get();
+            buf.clear();
+            return buf;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystemLinux.java
----------------------------------------------------------------------
diff --git a/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystemLinux.java b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystemLinux.java
new file mode 100644
index 0000000..22d4926
--- /dev/null
+++ b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystemLinux.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compress;
+
+import jnr.ffi.LibraryLoader;
+import org.apache.ignite.IgniteException;
+
+/**
+ * Linux native file system API.
+ */
+public final class NativeFileSystemLinux extends NativeFileSystemPosix {
+    /**
+     * Keeps the file size unchanged (the default is to extend the size).
+     */
+    public static final int FALLOC_FL_KEEP_SIZE = 0x01;
+
+    /**
+     * De-allocates range.
+     */
+    public static final int FALLOC_FL_PUNCH_HOLE = 0x02;
+
+    /**
+     * Reserved codepoint.
+     */
+    public static final int FALLOC_FL_NO_HIDE_STALE = 0x04;
+
+    /**
+     * FALLOC_FL_COLLAPSE_RANGE is used to remove a range of a file
+     * without leaving a hole in the file. The contents of the file beyond
+     * the range being removed are appended to the start offset of the range
+     * being removed (i.e. the hole that was punched is "collapsed"),
+     * resulting in a file layout that looks like the range that was
+     * removed never existed. As such collapsing a range of a file changes
+     * the size of the file, reducing it by the same length of the range
+     * that has been removed by the operation.
+     *
+     * Different filesystems may implement different limitations on the
+     * granularity of the operation. Most will limit operations to
+     * filesystem block size boundaries, but this boundary may be larger or
+     * smaller depending on the filesystem and/or the configuration of the
+     * filesystem or file.
+     *
+     * Attempting to collapse a range that crosses the end of the file is
+     * considered an illegal operation - just use ftruncate(2) if you need
+     * to collapse a range that crosses EOF.
+     */
+    public static final int FALLOC_FL_COLLAPSE_RANGE = 0x08;
+
+    /**
+     * FALLOC_FL_ZERO_RANGE is used to convert a range of file to zeros preferably
+     * without issuing data IO. Blocks should be preallocated for the regions that
+     * span holes in the file, and the entire range is preferably converted to
+     * unwritten extents - even though file system may choose to zero out the
+     * extent or do whatever which will result in reading zeros from the range
+     * while the range remains allocated for the file.
+     *
+     * This can be also used to preallocate blocks past EOF in the same way as
+     * with fallocate. Flag FALLOC_FL_KEEP_SIZE should cause the inode
+     * size to remain the same.
+     */
+    public static final int FALLOC_FL_ZERO_RANGE = 0x10;
+
+    /**
+     * FALLOC_FL_INSERT_RANGE is used to insert space within the file size without
+     * overwriting any existing data. The contents of the file beyond offset are
+     * shifted towards right by len bytes to create a hole.  As such, this
+     * operation will increase the size of the file by len bytes.
+     *
+     * Different filesystems may implement different limitations on the granularity
+     * of the operation. Most will limit operations to filesystem block size
+     * boundaries, but this boundary may be larger or smaller depending on
+     * the filesystem and/or the configuration of the filesystem or file.
+     *
+     * Attempting to insert space using this flag at OR beyond the end of
+     * the file is considered an illegal operation - just use ftruncate(2) or
+     * fallocate(2) with mode 0 for such type of operations.
+     */
+    public static final int FALLOC_FL_INSERT_RANGE = 0x20;
+
+    /**
+     * FALLOC_FL_UNSHARE_RANGE is used to unshare shared blocks within the
+     * file size without overwriting any existing data. The purpose of this
+     * call is to preemptively reallocate any blocks that are subject to
+     * copy-on-write.
+     *
+     * Different filesystems may implement different limitations on the
+     * granularity of the operation. Most will limit operations to filesystem
+     * block size boundaries, but this boundary may be larger or smaller
+     * depending on the filesystem and/or the configuration of the filesystem
+     * or file.
+     *
+     * This flag can only be used with allocate-mode fallocate, which is
+     * to say that it cannot be used with the punch, zero, collapse, or
+     * insert range modes.
+     */
+    public static final int FALLOC_FL_UNSHARE_RANGE = 0x40;
+
+    /** Binding to the native "c" library; a failure to load it is reported immediately. */
+    private static final LinuxNativeLibC libc = LibraryLoader.create(LinuxNativeLibC.class)
+        .failImmediately().load("c");
+
+    /** {@inheritDoc} */
+    @Override public void punchHole(int fd, long off, long len) {
+        int res = libc.fallocate(fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, off, len);
+        // fallocate(2) reports failure as -1; the real error code has to be read from errno.
+        if (res != 0)
+            throw new IgniteException("errno: " + jnr.ffi.Runtime.getRuntime(libc).getLastError());
+    }
+
+    /**
+     * Native libc functions bound through {@link LibraryLoader}.
+     */
+    public interface LinuxNativeLibC {
+        /**
+         * Allows the caller to directly manipulate the allocated
+         * disk space for the file referred to by fd for the byte range starting
+         * at {@code off} offset and continuing for {@code len} bytes.
+         *
+         * @param fd   file descriptor.
+         * @param mode determines the operation to be performed on the given range.
+         * @param off  required position offset.
+         * @param len  required length.
+         * @return Zero on success. On error, -1 is returned and {@code errno} is set to indicate the error.
+         */
+        int fallocate(int fd, int mode, long off, long len);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystemPosix.java
----------------------------------------------------------------------
diff --git a/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystemPosix.java b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystemPosix.java
new file mode 100644
index 0000000..fcf485f
--- /dev/null
+++ b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystemPosix.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compress;
+
+import java.nio.file.Path;
+import jnr.posix.FileStat;
+import jnr.posix.POSIX;
+import jnr.posix.POSIXFactory;
+
+/**
+ * Posix file system API backed by {@code stat}/{@code fstat}; hole punching is not supported here.
+ */
+public class NativeFileSystemPosix implements NativeFileSystem {
+    /** Shared POSIX binding used for the {@code stat}/{@code fstat} calls; never reassigned. */
+    private static final POSIX posix = POSIXFactory.getPOSIX();
+
+    /** {@inheritDoc} */
+    @Override public int getFileSystemBlockSize(Path path) {
+        FileStat stat = posix.stat(path.toString());
+        return Math.toIntExact(stat.blockSize());
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getFileSystemBlockSize(int fd) {
+        FileStat stat = posix.fstat(fd);
+        return Math.toIntExact(stat.blockSize());
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getSparseFileSize(int fd) {
+        FileStat stat = posix.fstat(fd);
+        return stat.blocks() * 512; // The allocated block count is in 512-byte units (see stat(2)).
+    }
+
+    /** {@inheritDoc} */
+    @Override public void punchHole(int fd, long off, long len) {
+        throw new UnsupportedOperationException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/CompressionProcessorTest.java
----------------------------------------------------------------------
diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/CompressionProcessorTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/CompressionProcessorTest.java
new file mode 100644
index 0000000..f660426
--- /dev/null
+++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/CompressionProcessorTest.java
@@ -0,0 +1,1021 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compress;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.DiskPageCompression;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInnerIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeafIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPagePayload;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.SimpleDataPageIO;
+import org.apache.ignite.internal.util.GridIntList;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.configuration.DiskPageCompression.LZ4;
+import static org.apache.ignite.configuration.DiskPageCompression.SKIP_GARBAGE;
+import static org.apache.ignite.configuration.DiskPageCompression.SNAPPY;
+import static org.apache.ignite.configuration.DiskPageCompression.ZSTD;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_MAX_LEVEL;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_MIN_LEVEL;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessor.UNCOMPRESSED_PAGE;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessor.ZSTD_MAX_LEVEL;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessorImpl.allocateDirectBuffer;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessorTest.TestInnerIO.INNER_IO;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessorTest.TestLeafIO.LEAF_IO;
+import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
+
+/**
+ */
+public class CompressionProcessorTest extends GridCommonAbstractTest {
+    /** */
+    private static final int ITEM_SIZE = 6; // To fill the whole page.
+
+    /** */
+    private int blockSize = 16;
+
+    /** */
+    private int pageSize = 4 * 1024;
+
+    /** */
+    private DiskPageCompression compression;
+
+    /** */
+    private int compressLevel;
+
+    /** */
+    private CompressionProcessor p;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+        // Register the test inner and leaf IOs with PageIO.
+        PageIO.registerTest(INNER_IO, LEAF_IO);
+    }
+
+    /** {@inheritDoc} Creates a new {@link CompressionProcessorImpl} on a test kernal context. */
+    @Override protected void beforeTest() {
+        p = new CompressionProcessorImpl(new GridTestKernalContext(log));
+    }
+
+    /**
+     * Data page scenario: {@code SKIP_GARBAGE}, block size 16.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testDataPageCompact16() throws IgniteCheckedException {
+        compression = SKIP_GARBAGE;
+        blockSize = 16;
+        doTestDataPage();
+    }
+
+    /**
+     * Data page scenario: {@code SKIP_GARBAGE}, block size 128.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testDataPageCompact128() throws IgniteCheckedException {
+        compression = SKIP_GARBAGE;
+        blockSize = 128;
+        doTestDataPage();
+    }
+
+    /**
+     * Data page scenario: {@code SKIP_GARBAGE}, block size 1024.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testDataPageCompact1k() throws IgniteCheckedException {
+        compression = SKIP_GARBAGE;
+        blockSize = 1024;
+        doTestDataPage();
+    }
+
+    /**
+     * Data page scenario: {@code SKIP_GARBAGE}, block size 2048.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testDataPageCompact2k() throws IgniteCheckedException {
+        compression = SKIP_GARBAGE;
+        blockSize = 2 * 1024;
+        doTestDataPage();
+    }
+
+    /**
+     * Data page scenario: {@code ZSTD} at {@code ZSTD_MAX_LEVEL}, block size 16.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testDataPageZstd16() throws IgniteCheckedException {
+        compression = ZSTD;
+        compressLevel = ZSTD_MAX_LEVEL;
+        blockSize = 16;
+        doTestDataPage();
+    }
+
+    /**
+     * Data page scenario: {@code ZSTD} at {@code ZSTD_MAX_LEVEL}, block size 128.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testDataPageZstd128() throws IgniteCheckedException {
+        compression = ZSTD;
+        compressLevel = ZSTD_MAX_LEVEL;
+        blockSize = 128;
+        doTestDataPage();
+    }
+
+    /**
+     * Data page scenario: {@code ZSTD} at {@code ZSTD_MAX_LEVEL}, block size 1024.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testDataPageZstd1k() throws IgniteCheckedException {
+        compression = ZSTD;
+        compressLevel = ZSTD_MAX_LEVEL;
+        blockSize = 1024;
+        doTestDataPage();
+    }
+
+    /**
+     * Data page scenario: {@code ZSTD} at {@code ZSTD_MAX_LEVEL}, block size 2048.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testDataPageZstd2k() throws IgniteCheckedException {
+        compression = ZSTD;
+        compressLevel = ZSTD_MAX_LEVEL;
+        blockSize = 2 * 1024;
+        doTestDataPage();
+    }
+
+    /**
+     * Data page scenario: {@code SNAPPY}, block size 16.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testDataPageSnappy16() throws IgniteCheckedException {
+        compression = SNAPPY;
+        blockSize = 16;
+        doTestDataPage();
+    }
+
+    /**
+     * Data page scenario: {@code SNAPPY}, block size 128.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testDataPageSnappy128() throws IgniteCheckedException {
+        compression = SNAPPY;
+        blockSize = 128;
+        doTestDataPage();
+    }
+
+    /**
+     * Data page scenario: {@code SNAPPY}, block size 1024.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testDataPageSnappy1k() throws IgniteCheckedException {
+        compression = SNAPPY;
+        blockSize = 1024;
+        doTestDataPage();
+    }
+
+    /**
+     * Data page scenario: {@code SNAPPY}, block size 2048.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testDataPageSnappy2k() throws IgniteCheckedException {
+        compression = SNAPPY;
+        blockSize = 2 * 1024;
+        doTestDataPage();
+    }
+
+
+    /**
+     * Data page scenario: {@code LZ4} at {@code LZ4_MIN_LEVEL}, block size 16.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testDataPageLz4Fast16() throws IgniteCheckedException {
+        compression = LZ4;
+        compressLevel = LZ4_MIN_LEVEL;
+        blockSize = 16;
+        doTestDataPage();
+    }
+
+    /**
+     * Data page scenario: {@code LZ4} at {@code LZ4_MIN_LEVEL}, block size 128.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testDataPageLz4Fast128() throws IgniteCheckedException {
+        compression = LZ4;
+        compressLevel = LZ4_MIN_LEVEL;
+        blockSize = 128;
+        doTestDataPage();
+    }
+
+    /**
+     * Data page scenario: {@code LZ4} at {@code LZ4_MIN_LEVEL}, block size 1024.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testDataPageLz4Fast1k() throws IgniteCheckedException {
+        compression = LZ4;
+        compressLevel = LZ4_MIN_LEVEL;
+        blockSize = 1024;
+        doTestDataPage();
+    }
+
+    /**
+     * Data page scenario: {@code LZ4} at {@code LZ4_MIN_LEVEL}, block size 2048.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testDataPageLz4Fast2k() throws IgniteCheckedException {
+        compression = LZ4;
+        compressLevel = LZ4_MIN_LEVEL;
+        blockSize = 2 * 1024;
+        doTestDataPage();
+    }
+
+    /**
+     * Data page scenario: {@code LZ4} at {@code LZ4_MAX_LEVEL}, block size 16.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testDataPageLz4Slow16() throws IgniteCheckedException {
+        compression = LZ4;
+        compressLevel = LZ4_MAX_LEVEL;
+        blockSize = 16;
+        doTestDataPage();
+    }
+
+    /**
+     * Data page scenario: {@code LZ4} at {@code LZ4_MAX_LEVEL}, block size 128.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testDataPageLz4Slow128() throws IgniteCheckedException {
+        compression = LZ4;
+        compressLevel = LZ4_MAX_LEVEL;
+        blockSize = 128;
+        doTestDataPage();
+    }
+
+    /**
+     * Data page scenario: {@code LZ4} at {@code LZ4_MAX_LEVEL}, block size 1024.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testDataPageLz4Slow1k() throws IgniteCheckedException {
+        compression = LZ4;
+        compressLevel = LZ4_MAX_LEVEL;
+        blockSize = 1024;
+        doTestDataPage();
+    }
+
+    /**
+     * Data page scenario: {@code LZ4} at {@code LZ4_MAX_LEVEL}, block size 2048.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testDataPageLz4Slow2k() throws IgniteCheckedException {
+        compression = LZ4;
+        compressLevel = LZ4_MAX_LEVEL;
+        blockSize = 2 * 1024;
+        doTestDataPage();
+    }
+
+    /**
+     * Inner B+Tree page scenario: {@code SKIP_GARBAGE}, block size 16.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testInnerPageCompact16() throws IgniteCheckedException {
+        compression = SKIP_GARBAGE;
+        blockSize = 16;
+        doTestBTreePage(INNER_IO);
+    }
+
+    /**
+     * Leaf B+Tree page scenario: {@code SKIP_GARBAGE}, block size 16.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testLeafPageCompact16() throws IgniteCheckedException {
+        compression = SKIP_GARBAGE;
+        blockSize = 16;
+        doTestBTreePage(LEAF_IO);
+    }
+
+    /**
+     * Inner B+Tree page scenario: {@code ZSTD} at {@code ZSTD_MAX_LEVEL}, block size 16.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testInnerPageZstd16() throws IgniteCheckedException {
+        compression = ZSTD;
+        compressLevel = ZSTD_MAX_LEVEL;
+        blockSize = 16;
+        doTestBTreePage(INNER_IO);
+    }
+
+    /**
+     * Leaf B+Tree page scenario: {@code ZSTD} at {@code ZSTD_MAX_LEVEL}, block size 16.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testLeafPageZstd16() throws IgniteCheckedException {
+        compression = ZSTD;
+        compressLevel = ZSTD_MAX_LEVEL;
+        blockSize = 16;
+        doTestBTreePage(LEAF_IO);
+    }
+
+
+    /**
+     * Inner B+Tree page scenario: {@code LZ4} at {@code LZ4_MIN_LEVEL}, block size 16.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testInnerPageLz4Fast16() throws IgniteCheckedException {
+        compression = LZ4;
+        compressLevel = LZ4_MIN_LEVEL;
+        blockSize = 16;
+        doTestBTreePage(INNER_IO);
+    }
+
+    /**
+     * Leaf B+Tree page scenario: {@code LZ4} at {@code LZ4_MIN_LEVEL}, block size 16.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testLeafPageLz4Fast16() throws IgniteCheckedException {
+        compression = LZ4;
+        compressLevel = LZ4_MIN_LEVEL;
+        blockSize = 16;
+        doTestBTreePage(LEAF_IO);
+    }
+
+    /**
+     * Inner B+Tree page scenario: {@code LZ4} at {@code LZ4_MAX_LEVEL}, block size 16.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testInnerPageLz4Slow16() throws IgniteCheckedException {
+        compression = LZ4;
+        compressLevel = LZ4_MAX_LEVEL;
+        blockSize = 16;
+        doTestBTreePage(INNER_IO);
+    }
+
+    /**
+     * Leaf B+Tree page scenario: {@code LZ4} at {@code LZ4_MAX_LEVEL}, block size 16.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testLeafPageLz4Slow16() throws IgniteCheckedException {
+        compression = LZ4;
+        compressLevel = LZ4_MAX_LEVEL;
+        blockSize = 16;
+        doTestBTreePage(LEAF_IO);
+    }
+
+    /**
+     * Inner B+Tree page scenario: {@code SNAPPY}, block size 16.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testInnerPageSnappy16() throws IgniteCheckedException {
+        compression = SNAPPY;
+        blockSize = 16;
+        doTestBTreePage(INNER_IO);
+    }
+
+    /**
+     * Leaf B+Tree page scenario: {@code SNAPPY}, block size 16.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testLeafPageSnappy16() throws IgniteCheckedException {
+        compression = SNAPPY;
+        blockSize = 16;
+        doTestBTreePage(LEAF_IO);
+    }
+
+    /**
+     * Inner B+Tree page scenario: {@code SKIP_GARBAGE}, block size 128.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testInnerPageCompact128() throws IgniteCheckedException {
+        compression = SKIP_GARBAGE;
+        blockSize = 128;
+        doTestBTreePage(INNER_IO);
+    }
+
+    /**
+     * Leaf B+Tree page scenario: {@code SKIP_GARBAGE}, block size 128.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testLeafPageCompact128() throws IgniteCheckedException {
+        compression = SKIP_GARBAGE;
+        blockSize = 128;
+        doTestBTreePage(LEAF_IO);
+    }
+
+    /**
+     * Inner B+Tree page scenario: {@code ZSTD} at {@code ZSTD_MAX_LEVEL}, block size 128.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testInnerPageZstd128() throws IgniteCheckedException {
+        compression = ZSTD;
+        compressLevel = ZSTD_MAX_LEVEL;
+        blockSize = 128;
+        doTestBTreePage(INNER_IO);
+    }
+
+    /**
+     * Leaf B+Tree page scenario: {@code ZSTD} at {@code ZSTD_MAX_LEVEL}, block size 128.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testLeafPageZstd128() throws IgniteCheckedException {
+        compression = ZSTD;
+        compressLevel = ZSTD_MAX_LEVEL;
+        blockSize = 128;
+        doTestBTreePage(LEAF_IO);
+    }
+
+    /**
+     * Inner B+Tree page scenario: {@code LZ4} at {@code LZ4_MIN_LEVEL}, block size 128.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testInnerPageLz4Fast128() throws IgniteCheckedException {
+        compression = LZ4;
+        compressLevel = LZ4_MIN_LEVEL;
+        blockSize = 128;
+        doTestBTreePage(INNER_IO);
+    }
+
+    /**
+     * Leaf B+Tree page scenario: {@code LZ4} at {@code LZ4_MIN_LEVEL}, block size 128.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testLeafPageLz4Fast128() throws IgniteCheckedException {
+        compression = LZ4;
+        compressLevel = LZ4_MIN_LEVEL;
+        blockSize = 128;
+        doTestBTreePage(LEAF_IO);
+    }
+
+    /**
+     * Inner B+Tree page scenario: {@code LZ4} at {@code LZ4_MAX_LEVEL}, block size 128.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testInnerPageLz4Slow128() throws IgniteCheckedException {
+        compression = LZ4;
+        compressLevel = LZ4_MAX_LEVEL;
+        blockSize = 128;
+        doTestBTreePage(INNER_IO);
+    }
+
+    /**
+     * Leaf B+Tree page scenario: {@code LZ4} at {@code LZ4_MAX_LEVEL}, block size 128.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testLeafPageLz4Slow128() throws IgniteCheckedException {
+        compression = LZ4;
+        compressLevel = LZ4_MAX_LEVEL;
+        blockSize = 128;
+        doTestBTreePage(LEAF_IO);
+    }
+
+    /**
+     * Inner B+Tree page scenario: {@code SNAPPY}, block size 128.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testInnerPageSnappy128() throws IgniteCheckedException {
+        compression = SNAPPY;
+        blockSize = 128;
+        doTestBTreePage(INNER_IO);
+    }
+
+    /**
+     * Leaf B+Tree page scenario: {@code SNAPPY}, block size 128.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testLeafPageSnappy128() throws IgniteCheckedException {
+        compression = SNAPPY;
+        blockSize = 128;
+        doTestBTreePage(LEAF_IO);
+    }
+
+    /**
+     * Inner B+Tree page scenario: {@code SKIP_GARBAGE}, block size 1024.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testInnerPageCompact1k() throws IgniteCheckedException {
+        compression = SKIP_GARBAGE;
+        blockSize = 1024;
+        doTestBTreePage(INNER_IO);
+    }
+
+    /**
+     * Leaf B+Tree page scenario: {@code SKIP_GARBAGE}, block size 1024.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testLeafPageCompact1k() throws IgniteCheckedException {
+        compression = SKIP_GARBAGE;
+        blockSize = 1024;
+        doTestBTreePage(LEAF_IO);
+    }
+
+    /**
+     * Inner B+Tree page scenario: {@code ZSTD} at {@code ZSTD_MAX_LEVEL}, block size 1024.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testInnerPageZstd1k() throws IgniteCheckedException {
+        compression = ZSTD;
+        compressLevel = ZSTD_MAX_LEVEL;
+        blockSize = 1024;
+        doTestBTreePage(INNER_IO);
+    }
+
+    /**
+     * Leaf B+Tree page scenario: {@code ZSTD} at {@code ZSTD_MAX_LEVEL}, block size 1024.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testLeafPageZstd1k() throws IgniteCheckedException {
+        compression = ZSTD;
+        compressLevel = ZSTD_MAX_LEVEL;
+        blockSize = 1024;
+        doTestBTreePage(LEAF_IO);
+    }
+
+    /**
+     * Inner B+Tree page scenario: {@code LZ4} at {@code LZ4_MIN_LEVEL}, block size 1024.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testInnerPageLz4Fast1k() throws IgniteCheckedException {
+        compression = LZ4;
+        compressLevel = LZ4_MIN_LEVEL;
+        blockSize = 1024;
+        doTestBTreePage(INNER_IO);
+    }
+
+    /**
+     * Leaf B+Tree page scenario: {@code LZ4} at {@code LZ4_MIN_LEVEL}, block size 1024.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testLeafPageLz4Fast1k() throws IgniteCheckedException {
+        compression = LZ4;
+        compressLevel = LZ4_MIN_LEVEL;
+        blockSize = 1024;
+        doTestBTreePage(LEAF_IO);
+    }
+
+    /**
+     * Inner B+Tree page scenario: {@code LZ4} at {@code LZ4_MAX_LEVEL}, block size 1024.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testInnerPageLz4Slow1k() throws IgniteCheckedException {
+        compression = LZ4;
+        compressLevel = LZ4_MAX_LEVEL;
+        blockSize = 1024;
+        doTestBTreePage(INNER_IO);
+    }
+
+    /**
+     * Leaf B+Tree page scenario: {@code LZ4} at {@code LZ4_MAX_LEVEL}, block size 1024.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testLeafPageLz4Slow1k() throws IgniteCheckedException {
+        compression = LZ4;
+        compressLevel = LZ4_MAX_LEVEL;
+        blockSize = 1024;
+        doTestBTreePage(LEAF_IO);
+    }
+
+    /**
+     * Inner B+Tree page scenario: {@code SNAPPY}, block size 1024.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testInnerPageSnappy1k() throws IgniteCheckedException {
+        compression = SNAPPY;
+        blockSize = 1024;
+        doTestBTreePage(INNER_IO);
+    }
+
+    /**
+     * Leaf B+Tree page scenario: {@code SNAPPY}, block size 1024.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testLeafPageSnappy1k() throws IgniteCheckedException {
+        compression = SNAPPY;
+        blockSize = 1024;
+        doTestBTreePage(LEAF_IO);
+    }
+
+    /**
+     * Inner B+Tree page scenario: {@code SKIP_GARBAGE}, block size 2048.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testInnerPageCompact2k() throws IgniteCheckedException {
+        compression = SKIP_GARBAGE;
+        blockSize = 2 * 1024;
+        doTestBTreePage(INNER_IO);
+    }
+
+    /**
+     * Leaf B+Tree page scenario: {@code SKIP_GARBAGE}, block size 2048.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testLeafPageCompact2k() throws IgniteCheckedException {
+        compression = SKIP_GARBAGE;
+        blockSize = 2 * 1024;
+        doTestBTreePage(LEAF_IO);
+    }
+
+    /**
+     * Inner B+Tree page scenario: {@code ZSTD} at {@code ZSTD_MAX_LEVEL}, block size 2048.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testInnerPageZstd2k() throws IgniteCheckedException {
+        compression = ZSTD;
+        compressLevel = ZSTD_MAX_LEVEL;
+        blockSize = 2 * 1024;
+        doTestBTreePage(INNER_IO);
+    }
+
+    /**
+     * Leaf B+Tree page scenario: {@code ZSTD} at {@code ZSTD_MAX_LEVEL}, block size 2048.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testLeafPageZstd2k() throws IgniteCheckedException {
+        compression = ZSTD;
+        compressLevel = ZSTD_MAX_LEVEL;
+        blockSize = 2 * 1024;
+        doTestBTreePage(LEAF_IO);
+    }
+
+    /**
+     * Inner B+Tree page scenario: {@code LZ4} at {@code LZ4_MIN_LEVEL}, block size 2048.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testInnerPageLz4Fast2k() throws IgniteCheckedException {
+        compression = LZ4;
+        compressLevel = LZ4_MIN_LEVEL;
+        blockSize = 2 * 1024;
+        doTestBTreePage(INNER_IO);
+    }
+
+    /**
+     * Leaf B+Tree page scenario: {@code LZ4} at {@code LZ4_MIN_LEVEL}, block size 2048.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testLeafPageLz4Fast2k() throws IgniteCheckedException {
+        compression = LZ4;
+        compressLevel = LZ4_MIN_LEVEL;
+        blockSize = 2 * 1024;
+        doTestBTreePage(LEAF_IO);
+    }
+
+    /**
+     * Inner B+Tree page scenario: {@code LZ4} at {@code LZ4_MAX_LEVEL}, block size 2048.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testInnerPageLz4Slow2k() throws IgniteCheckedException {
+        compression = LZ4;
+        compressLevel = LZ4_MAX_LEVEL;
+        blockSize = 2 * 1024;
+        doTestBTreePage(INNER_IO);
+    }
+
+    /**
+     * Leaf B+Tree page scenario: {@code LZ4} at {@code LZ4_MAX_LEVEL}, block size 2048.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testLeafPageLz4Slow2k() throws IgniteCheckedException {
+        compression = LZ4;
+        compressLevel = LZ4_MAX_LEVEL;
+        blockSize = 2 * 1024;
+        doTestBTreePage(LEAF_IO);
+    }
+
+    /**
+     * Inner B+Tree page scenario: {@code SNAPPY}, block size 2048.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testInnerPageSnappy2k() throws IgniteCheckedException {
+        compression = SNAPPY;
+        blockSize = 2 * 1024;
+        doTestBTreePage(INNER_IO);
+    }
+
+    /**
+     * Leaf B+Tree page scenario: {@code SNAPPY}, block size 2048.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testLeafPageSnappy2k() throws IgniteCheckedException {
+        compression = SNAPPY;
+        blockSize = 2 * 1024;
+        doTestBTreePage(LEAF_IO);
+    }
+
+    /**
+     * Checks compression of an empty, a full and a half-full B+Tree page built with the given IO.
+     * @param io Page IO.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void doTestBTreePage(BPlusIO<byte[]> io) throws IgniteCheckedException {
+        Random rnd = ThreadLocalRandom.current();
+
+        final byte[][] rows = new byte[3][io.getItemSize()];
+
+        for (byte[] row : rows)
+            rnd.nextBytes(row);
+
+        ByteBuffer page = allocateDirectBuffer(pageSize);
+        long pageAddr = bufferAddress(page);
+
+        long pageId = PageIdUtils.pageId(PageIdAllocator.INDEX_PARTITION, PageIdAllocator.FLAG_IDX, 171717);
+        io.initNewPage(pageAddr, pageId, pageSize);
+
+        checkIo(io, page);
+
+        // Flattens a page into [left value,] row [, right value] entries, in item order.
+        Function<ByteBuffer, List<?>> getContents = (buf) -> {
+            long addr = bufferAddress(buf);
+            int itemsCnt = io.getCount(addr);
+
+            List<Object> items = new ArrayList<>(itemsCnt);
+
+            for (int idx = 0; idx < itemsCnt; idx++) {
+                if (!io.isLeaf())
+                    items.add(((BPlusInnerIO)io).getLeft(addr, idx));
+
+                try {
+                    items.add(new Bytes(io.getLookupRow(null, addr, idx)));
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IllegalStateException(e);
+                }
+
+                if (!io.isLeaf())
+                    items.add(((BPlusInnerIO)io).getRight(addr, idx));
+            }
+
+            return items;
+        };
+
+        // Empty page.
+        checkCompressDecompress(page, getContents, false);
+
+        int maxCnt = io.getMaxCount(pageAddr, pageSize);
+
+        for (int idx = 0; idx < maxCnt; idx++) {
+            byte[] row = rows[rnd.nextInt(rows.length)];
+            io.insert(pageAddr, idx, row, row, 777_000 + idx, false);
+        }
+
+        if (io.isLeaf())
+            assertEquals(pageSize, io.getItemsEnd(pageAddr)); // Page must be full.
+
+        // Full page.
+        checkCompressDecompress(page, getContents, io.isLeaf());
+
+        io.setCount(pageAddr, maxCnt / 2);
+
+        // Half page.
+        checkCompressDecompress(page, getContents, false);
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    private void doTestDataPage() throws IgniteCheckedException {
+        Random rnd = ThreadLocalRandom.current();
+
+        final byte[][] rows = new byte[][]{ // Row templates of three different lengths.
+            new byte[17], new byte[37], new byte[71]
+        };
+
+        for (int i = 0; i < rows.length; i++)
+            rnd.nextBytes(rows[i]);
+
+        ByteBuffer page = allocateDirectBuffer(pageSize);
+        long pageAddr = bufferAddress(page);
+
+        SimpleDataPageIO io = SimpleDataPageIO.VERSIONS.latest();
+
+        long pageId = PageIdUtils.pageId(PageIdAllocator.MAX_PARTITION_ID, PageIdAllocator.FLAG_DATA, 171717);
+
+        io.initNewPage(pageAddr, pageId, pageSize);
+
+        checkIo(io, page);
+
+        Function<ByteBuffer,List<Bytes>> getContents = (buf) -> { // Payload bytes of every item on a page.
+            try {
+                long addr = bufferAddress(buf);
+
+                return io.forAllItems(addr, (link) -> {
+                    DataPagePayload payload = io.readPayload(addr, PageIdUtils.itemId(link), pageSize);
+
+                    return new Bytes(payload.getBytes(addr));
+                });
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        };
+
+        // Empty data page.
+        checkCompressDecompress(page, getContents, false);
+
+        GridIntList itemIds = new GridIntList();
+
+        for (;;) { // Add randomly picked rows until the next one does not fit into the free space.
+            byte[] row = rows[rnd.nextInt(rows.length)];
+
+            if (io.getFreeSpace(pageAddr) < row.length)
+                break;
+
+            itemIds.add(io.addRow(pageAddr, row, pageSize));
+        }
+
+        int freeSpace = io.getFreeSpace(pageAddr);
+
+        if (freeSpace != 0) { // Use up the remaining space with a row fragment of exactly that size.
+            byte[] lastRow = new byte[freeSpace];
+            rnd.nextBytes(lastRow);
+
+            io.addRowFragment(pageId, pageAddr, lastRow, 777L, pageSize);
+
+            assertEquals(0, io.getRealFreeSpace(pageAddr));
+        }
+
+        // Full data page.
+        checkCompressDecompress(page, getContents, io.getRealFreeSpace(pageAddr) == 0);
+
+        for (int i = 0; i < itemIds.size(); i += 2) // Remove every other row added in the loop above.
+            io.removeRow(pageAddr, itemIds.get(i), pageSize);
+
+        // Half-filled data page.
+        checkCompressDecompress(page, getContents, false);
+    }
+
+    private void checkIo(PageIO io, ByteBuffer page) throws IgniteCheckedException {
+        assertSame(io, PageIO.getPageIO(bufferAddress(page))); // IO resolved through the page address.
+        assertSame(io, PageIO.getPageIO(page)); // IO resolved through the buffer itself.
+    }
+
+    /** Compresses the page, decompresses it back and checks that the header and the contents are preserved. */
+    private void checkCompressDecompress(ByteBuffer page, Function<ByteBuffer, ?> getPageContents, boolean fullPage)
+        throws IgniteCheckedException {
+        PageIO.setCrc(page, 0xABCDEF13);
+
+        long pageId = PageIO.getPageId(page);
+        PageIO io = PageIO.getPageIO(page);
+
+        ByteBuffer compressed = p.compressPage(page, pageSize, blockSize, compression, compressLevel);
+
+        int compressedSize = PageIO.getCompressedSize(compressed);
+
+        assertNotSame(page, compressed); // This is generally possible but not interesting in this test.
+
+        assertTrue(compressedSize > 0);
+        assertTrue(compressedSize <= pageSize);
+        assertEquals(compressedSize, compressed.limit());
+
+        // Only a full page under SKIP_GARBAGE is allowed to keep its original size.
+        if (!fullPage || compression != SKIP_GARBAGE)
+            assertTrue(pageSize > compressedSize);
+
+        assertEquals(0, compressed.position());
+
+        checkIo(io, compressed);
+        assertEquals(0, page.position());
+        assertEquals(pageSize, page.limit());
+
+        info(io.getClass().getSimpleName() + " " + compression + " " + compressLevel + ": " + compressedSize + "/" + pageSize);
+
+        assertEquals(pageId, PageIO.getPageId(compressed));
+
+        // Decompression is done in place, so the compressed bytes go into a page-sized buffer first.
+        ByteBuffer decompress = allocateDirectBuffer(pageSize);
+        decompress.put(compressed).clear();
+
+        p.decompressPage(decompress, pageSize);
+
+        assertEquals(0, decompress.position());
+        assertEquals(pageSize, decompress.limit());
+
+        checkIo(io, decompress);
+        assertEquals(UNCOMPRESSED_PAGE, PageIO.getCompressionType(page)); // Source page has no compression marks.
+        assertEquals(0, PageIO.getCompressedSize(page));
+        assertEquals(0, PageIO.getCompactedSize(page));
+
+        assertTrue(Arrays.equals(getPageCommonHeader(page), getPageCommonHeader(decompress)));
+        assertEquals(getPageContents.apply(page), getPageContents.apply(decompress));
+    }
+
+    /**
+     * @param page Page buffer; it is read through its address.
+     * @return {@code PageIO.COMMON_HEADER_END} bytes read from the page address, starting at offset 0.
+     */
+    private static byte[] getPageCommonHeader(ByteBuffer page) {
+        return PageUtils.getBytes(GridUnsafe.bufferAddress(page), 0, PageIO.COMMON_HEADER_END);
+    }
+
+    /**
+     * Holder of a byte array whose equality and hash code are defined by the array contents. */
+    private static class Bytes {
+        /** Wrapped array, never {@code null} when assertions are enabled. */
+        private final byte[] bytes;
+
+        /**
+         * @param bytes Bytes to wrap; the array is kept by reference.
+         */
+        private Bytes(byte[] bytes) {
+            assert bytes != null;
+            this.bytes = bytes;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            boolean sameType = o != null && getClass() == o.getClass();
+
+            return sameType && Arrays.equals(bytes, ((Bytes)o).bytes);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return Arrays.hashCode(bytes);
+        }
+    }
+
+    /**
+     * Leaf page IO used by this test: items are raw {@code byte[]} rows read back as {@code itemSize} bytes. */
+    static class TestLeafIO extends BPlusLeafIO<byte[]> {
+        /** Shared instance. */
+        static final TestLeafIO LEAF_IO = new TestLeafIO();
+
+        /**
+         * NOTE(review): the arguments are presumably page type, version and item size - confirm in BPlusLeafIO. */
+        TestLeafIO() {
+            super(29_501, 1, ITEM_SIZE);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void storeByOffset(long pageAddr, int off, byte[] row) {
+            PageUtils.putBytes(pageAddr, off, row); // The row bytes are written as is at the given offset.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<byte[]> srcIo, long srcPageAddr,
+            int srcIdx) throws IgniteCheckedException {
+            storeByOffset(dstPageAddr, offset(dstIdx), srcIo.getLookupRow(null, srcPageAddr, srcIdx));
+        }
+
+        /** {@inheritDoc} */
+        @Override public byte[] getLookupRow(BPlusTree<byte[],?> tree, long pageAddr, int idx) {
+            return PageUtils.getBytes(pageAddr, offset(idx), itemSize); // The tree argument is not used.
+        }
+    }
+
+    /**
+     * Inner page IO used by this test: items are raw {@code byte[]} rows read back as {@code itemSize} bytes. */
+    static class TestInnerIO extends BPlusInnerIO<byte[]> {
+        /** Shared instance; final like {@code TestLeafIO.LEAF_IO}, it is never meant to be reassigned. */
+        static final TestInnerIO INNER_IO = new TestInnerIO();
+
+        /**
+         * NOTE(review): the arguments are presumably type, version, a row flag and item size - confirm in BPlusInnerIO. */
+        TestInnerIO() {
+            super(29_502, 1, true, ITEM_SIZE);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void storeByOffset(long pageAddr, int off, byte[] row) {
+            PageUtils.putBytes(pageAddr, off, row); // The row bytes are written as is at the given offset.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<byte[]> srcIo, long srcPageAddr,
+            int srcIdx) throws IgniteCheckedException {
+            storeByOffset(dstPageAddr, offset(dstIdx), srcIo.getLookupRow(null, srcPageAddr, srcIdx));
+        }
+
+        /** {@inheritDoc} */
+        @Override public byte[] getLookupRow(BPlusTree<byte[],?> tree, long pageAddr, int idx) {
+            return PageUtils.getBytes(pageAddr, offset(idx), itemSize); // The tree argument is not used.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationAsyncTest.java
----------------------------------------------------------------------
diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationAsyncTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationAsyncTest.java
new file mode 100644
index 0000000..658a5d2
--- /dev/null
+++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationAsyncTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compress;
+
+import org.apache.ignite.internal.processors.cache.persistence.file.AsyncFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+
+/**
+ * Runs the tests of {@link DiskPageCompressionIntegrationTest} with {@link AsyncFileIOFactory} as the file IO factory. */
+public class DiskPageCompressionIntegrationAsyncTest extends DiskPageCompressionIntegrationTest {
+    /** {@inheritDoc} */
+    @Override protected FileIOFactory getFileIOFactory() {
+        return new AsyncFileIOFactory();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java
new file mode 100644
index 0000000..ca7f4ea
--- /dev/null
+++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compress;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.file.OpenOption;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.DiskPageCompression;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.store.PageStore;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.mxbean.CacheGroupMetricsMXBean;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.configuration.DataStorageConfiguration.MAX_PAGE_SIZE;
+import static org.apache.ignite.configuration.DiskPageCompression.LZ4;
+import static org.apache.ignite.configuration.DiskPageCompression.SKIP_GARBAGE;
+import static org.apache.ignite.configuration.DiskPageCompression.SNAPPY;
+import static org.apache.ignite.configuration.DiskPageCompression.ZSTD;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_DEFAULT_LEVEL;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_MAX_LEVEL;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_MIN_LEVEL;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessor.ZSTD_MAX_LEVEL;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessor.ZSTD_MIN_LEVEL;
+
+/**
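+ * Integration test: checks via storage size metrics that a cache with disk page compression takes less space.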
+ */
+public class DiskPageCompressionIntegrationTest extends GridCommonAbstractTest {
+    /** Compression mode for the test cache; reset to {@code null} before each test. */
+    private DiskPageCompression compression;
+
+    /** Compression level for the test cache; {@code null} unless a test sets it. */
+    private Integer compressionLevel;
+
+    /** File IO factory returned by {@link #getFileIOFactory()} when the configuration was last built. */
+    private FileIOFactory factory;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        compression = null; // Reset the per-test settings: each test assigns only what it needs.
+        compressionLevel = null;
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(true); // NOTE(review): the flag presumably cancels running jobs - confirm.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteName) throws Exception {
+        DataRegionConfiguration drCfg = new DataRegionConfiguration()
+            .setPersistenceEnabled(true);
+
+        factory = getFileIOFactory(); // Remembered so that checkFileIOFactory() can compare by identity.
+
+        DataStorageConfiguration dsCfg = new DataStorageConfiguration()
+            .setMetricsEnabled(true)
+            .setPageSize(MAX_PAGE_SIZE)
+            .setDefaultDataRegionConfiguration(drCfg)
+            .setFileIOFactory(U.isLinux() ? factory : new PunchFileIOFactory(factory)); // Emulate holes off Linux.
+
+        return super.getConfiguration(igniteName).setDataStorageConfiguration(dsCfg);
+    }
+
+    /**
+     * @return File IO factory to run the tests with; subclasses override this to test another factory.
+     */
+    protected FileIOFactory getFileIOFactory() {
+        return new RandomAccessFileIOFactory();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPageCompression_Zstd_Max() throws Exception {
+        compression = ZSTD;
+        compressionLevel = ZSTD_MAX_LEVEL; // Presumably the highest supported ZSTD level.
+
+        doTestPageCompression();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPageCompression_Zstd_Default() throws Exception {
+        compression = ZSTD;
+        compressionLevel = null; // No explicit level is passed to the cache configuration.
+
+        doTestPageCompression();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPageCompression_Zstd_Min() throws Exception {
+        compression = ZSTD;
+        compressionLevel = ZSTD_MIN_LEVEL; // Presumably the lowest supported ZSTD level.
+
+        doTestPageCompression();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPageCompression_Lz4_Max() throws Exception {
+        compression = LZ4;
+        compressionLevel = LZ4_MAX_LEVEL; // Presumably the highest supported LZ4 level.
+
+        doTestPageCompression();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPageCompression_Lz4_Default() throws Exception {
+        compression = LZ4;
+        compressionLevel = null; // No explicit level is passed to the cache configuration.
+
+        doTestPageCompression();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPageCompression_Lz4_Min() throws Exception {
+        assertEquals(LZ4_MIN_LEVEL, LZ4_DEFAULT_LEVEL); // No cache run here: only pins that min equals default.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPageCompression_SkipGarbage() throws Exception {
+        compression = SKIP_GARBAGE; // The level stays null as set in beforeTest().
+
+        doTestPageCompression();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPageCompression_Snappy() throws Exception {
+        compression = SNAPPY; // The level stays null as set in beforeTest().
+
+        doTestPageCompression();
+    }
+
+    /** Puts and partially removes entries of a compressed cache, forces a checkpoint and checks the store sizes.
+     * @throws Exception If failed.
+     */
+    private void doTestPageCompression() throws Exception {
+        IgniteEx ignite = startGrid(0);
+
+        ignite.cluster().active(true);
+
+        String cacheName = "test";
+
+        CacheConfiguration<Integer,TestVal> ccfg = new CacheConfiguration<Integer,TestVal>()
+            .setName(cacheName)
+            .setBackups(0)
+            .setAtomicityMode(ATOMIC)
+            .setIndexedTypes(Integer.class, TestVal.class)
+            .setDiskPageCompression(compression)
+            .setDiskPageCompressionLevel(compressionLevel);
+
+        IgniteCache<Integer,TestVal> cache = ignite.getOrCreateCache(ccfg);
+
+        int cnt = 2_000;
+
+        for (int i = 0; i < cnt; i++)
+            assertTrue(cache.putIfAbsent(i, new TestVal(i)));
+
+        for (int i = 0; i < cnt; i += 2) // Remove every second entry again.
+            assertEquals(new TestVal(i), cache.getAndRemove(i));
+
+        GridCacheDatabaseSharedManager dbMgr = ((GridCacheDatabaseSharedManager)ignite.context()
+            .cache().context().database());
+
+        dbMgr.forceCheckpoint("test compression").finishFuture().get();
+
+        FilePageStoreManager storeMgr = dbMgr.getFileStoreManager();
+
+        checkFileIOFactory(storeMgr.getPageStoreFileIoFactory());
+
+        Thread.sleep(100); // Wait for metrics update.
+
+        long storeSize = ignite.dataStorageMetrics().getStorageSize();
+        long sparseStoreSize = ignite.dataStorageMetrics().getSparseStorageSize();
+
+        assertTrue("storeSize: " + storeSize, storeSize > 0);
+
+        if (U.isLinux()) {
+            assertTrue("sparseSize: " + sparseStoreSize, sparseStoreSize > 0);
+            assertTrue(storeSize + " > " + sparseStoreSize, storeSize > sparseStoreSize);
+        }
+        else
+            assertTrue(sparseStoreSize < 0);
+
+        GridCacheContext<?,?> cctx = ignite.cachex(cacheName).context();
+
+        int cacheId = cctx.cacheId();
+        int groupId = cctx.groupId();
+
+        assertEquals(cacheId, groupId);
+
+        CacheGroupMetricsMXBean mx = cctx.group().mxBean(); // The same checks again, for the cache group metrics.
+
+        storeSize = mx.getStorageSize();
+        sparseStoreSize = mx.getSparseStorageSize();
+
+        assertTrue("storeSize: " + storeSize, storeSize > 0);
+
+        if (U.isLinux()) {
+            assertTrue("sparseSize: " + sparseStoreSize, sparseStoreSize > 0);
+            assertTrue(storeSize + " > " + sparseStoreSize, storeSize > sparseStoreSize);
+        }
+        else
+            assertTrue(sparseStoreSize < 0);
+
+        int parts = cctx.affinity().partitions();
+
+        for (int i = 0; i < parts; i++) {
+            PageStore store = storeMgr.getStore(cacheId, i);
+
+            long realSize = store.size();
+            long virtualSize = (long)store.getPageSize() * store.pages(); // Widen first: the product may overflow.
+            long sparseSize = store.getSparseSize();
+
+            assertTrue(virtualSize > 0);
+
+            info("virt: " + virtualSize + ",  real: " + realSize + ",  sparse: " + sparseSize);
+
+            if (!store.exists())
+                continue;
+
+            if (virtualSize > sparseSize) // One store taking less space than its pages is enough to pass.
+                return;
+        }
+
+        fail("No files were compacted.");
+    }
+
+    /**
+     * Prints the sparse/total storage size ratio while loading data; presumably disabled by the {@code _} prefix. */
+    public void _testCompressionRatio() throws Exception {
+        IgniteEx ignite = startGrid(0);
+
+        ignite.cluster().active(true);
+
+        String cacheName = "test";
+
+        CacheConfiguration<Integer,TestVal> ccfg = new CacheConfiguration<Integer,TestVal>()
+            .setName(cacheName)
+            .setBackups(0)
+            .setAtomicityMode(ATOMIC)
+            .setIndexedTypes(Integer.class, TestVal.class)
+            .setAffinity(new RendezvousAffinityFunction().setPartitions(10))
+            .setDiskPageCompression(ZSTD);
+//            .setDiskPageCompressionLevel(compressionLevel);
+
+        ignite.getOrCreateCache(ccfg);
+
+        IgniteInternalCache<Integer,TestVal> cache = ignite.cachex(cacheName);
+
+        CacheGroupMetricsMXBean mx = cache.context().group().mxBean();
+
+        GridCacheDatabaseSharedManager dbMgr = ((GridCacheDatabaseSharedManager)ignite.context()
+            .cache().context().database());
+
+        int cnt = 20_000_000;
+
+        for (int i = 0; i < cnt; i++) {
+            assertTrue(cache.putIfAbsent(i, new TestVal(i)));
+
+            if (i % 50_000 == 0) { // Checkpoint and report the ratio every 50 000 entries.
+                dbMgr.forceCheckpoint("test").finishFuture().get();
+
+                long sparse = mx.getSparseStorageSize();
+                long size = mx.getStorageSize();
+
+                System.out.println(i + " >> " + sparse + " / " + size + " = " + ((double)sparse / size));
+            }
+        }
+    }
+
+    /**
+     * @param f Factory to check; it must be the very instance created for the node configuration.
+     */
+    protected void checkFileIOFactory(FileIOFactory f) {
+        if (!U.isLinux())
+            f = ((PunchFileIOFactory)f).delegate; // Unwrap the wrapper added in getConfiguration().
+
+        assertSame(factory, f);
+    }
+
+    /**
+     * Cache value with several fields, all derived from a single {@code int}; compared by field values. */
+    static class TestVal implements Serializable {
+        /** Serial version UID. */
+        static final long serialVersionUID = 1L;
+
+        /** String field: the source number followed by a fixed suffix. */
+        @QuerySqlField
+        String str;
+
+        /** Negated source number. */
+        int i;
+
+        /** Fixed 64-bit constant plus the source number. */
+        @QuerySqlField
+        long x;
+
+        /** UUID built from the source number and its negation. */
+        @QuerySqlField
+        UUID id;
+
+        TestVal(int i) {
+            str =  i + "bla bla bla!";
+            x = 0xffaabbccdd773311L + i;
+            id = new UUID(i, -i);
+            this.i = -i;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestVal that = (TestVal)o;
+
+            return i == that.i && x == that.x && Objects.equals(str, that.str) && Objects.equals(id, that.id);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int hash = Objects.hashCode(str);
+
+            hash = 31 * hash + i;
+            hash = 31 * hash + Long.hashCode(x);
+
+            return 31 * hash + Objects.hashCode(id);
+        }
+    }
+
+    /**
+     * File IO decorator that emulates hole punching in memory; every overridden method asserts a non-Linux OS. */
+    static class PunchFileIO extends FileIODecorator {
+        /** Emulated holes: position passed to {@link #punchHole(long, int)} mapped to the recorded length. */
+        private final ConcurrentMap<Long, Integer> holes = new ConcurrentHashMap<>();
+
+        /**
+         * @param delegate File I/O delegate, must not be {@code null}.
+         */
+        public PunchFileIO(FileIO delegate) {
+            super(Objects.requireNonNull(delegate));
+        }
+
+        /** {@inheritDoc} */
+        @Override public int getFileSystemBlockSize() {
+            assertFalse(U.isLinux());
+
+            return 4 * 1024; // Fixed emulated block size.
+        }
+
+        /** {@inheritDoc} */
+        @Override public long getSparseSize() {
+            assertFalse(U.isLinux());
+
+            long holesSize = holes.values().stream().mapToLong(x -> x).sum(); // Total length of recorded holes.
+
+            try {
+                return size() - holesSize;
+            }
+            catch (IOException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public int writeFully(ByteBuffer srcBuf, long position) throws IOException {
+            assertFalse(U.isLinux());
+
+            holes.remove(position); // A write at the same position discards the hole recorded there.
+            return super.writeFully(srcBuf, position);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int punchHole(long pos, int len) {
+            assertFalse(U.isLinux());
+
+            assertTrue(len > 0);
+
+            int blockSize = getFileSystemBlockSize();
+
+            len = len / blockSize * blockSize; // Only whole blocks are counted, so round down.
+
+            if (len > 0)
+                holes.put(pos, len);
+
+            return len;
+        }
+    }
+
+    /**
+     * Factory that wraps every {@link FileIO} created by the delegate into a {@link PunchFileIO}. */
+    static class PunchFileIOFactory implements FileIOFactory {
+        /** Wrapped factory; read directly by {@code checkFileIOFactory}. */
+        final FileIOFactory delegate;
+
+        /**
+         * @param delegate Delegate, must not be {@code null}.
+         */
+        PunchFileIOFactory(FileIOFactory delegate) {
+            this.delegate = Objects.requireNonNull(delegate);
+        }
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file) throws IOException {
+            return new PunchFileIO(delegate.create(file));
+        }
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+            return new PunchFileIO(delegate.create(file, modes));
+        }
+    }
+}