You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/11/28 11:52:48 UTC
[17/50] [abbrv] ignite git commit: IGNITE-10330: Disk page
compression. - Fixes #5200.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/licenses/lz4-LICENSE.txt
----------------------------------------------------------------------
diff --git a/modules/compress/licenses/lz4-LICENSE.txt b/modules/compress/licenses/lz4-LICENSE.txt
new file mode 100644
index 0000000..c221aeb
--- /dev/null
+++ b/modules/compress/licenses/lz4-LICENSE.txt
@@ -0,0 +1,11 @@
+This repository uses 2 different licenses :
+- all files in the `lib` directory use a BSD 2-Clause license
+- all other files use a GPLv2 license, unless explicitly stated otherwise
+
+Relevant license is reminded at the top of each source file,
+and with presence of COPYING or LICENSE file in associated directories.
+
+This model is selected to emphasize that
+files in the `lib` directory are designed to be included into 3rd party applications,
+while all other files, in `programs`, `tests` or `examples`,
+receive more limited attention and support for such scenario.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/licenses/lz4-java-LICENSE.txt
----------------------------------------------------------------------
diff --git a/modules/compress/licenses/lz4-java-LICENSE.txt b/modules/compress/licenses/lz4-java-LICENSE.txt
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/modules/compress/licenses/lz4-java-LICENSE.txt
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/pom.xml
----------------------------------------------------------------------
diff --git a/modules/compress/pom.xml b/modules/compress/pom.xml
new file mode 100644
index 0000000..876a121
--- /dev/null
+++ b/modules/compress/pom.xml
@@ -0,0 +1,113 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+    Maven POM for the "ignite-compress" module, which provides disk page
+    compression support (Zstd, LZ4, Snappy) for Apache Ignite persistence.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-parent</artifactId>
+ <version>1</version>
+ <relativePath>../../parent</relativePath>
+ </parent>
+
+ <artifactId>ignite-compress</artifactId>
+ <version>2.7.0-SNAPSHOT</version>
+ <url>http://ignite.apache.org</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.github.jnr</groupId>
+ <artifactId>jnr-posix</artifactId>
+ <version>${jnr.posix.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.github.luben</groupId>
+ <artifactId>zstd-jni</artifactId>
+ <version>${zstd.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.lz4</groupId>
+ <artifactId>lz4-java</artifactId>
+ <version>${lz4.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>${snappy.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-context</artifactId>
+ <version>${spring.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.thoughtworks.xstream</groupId>
+ <artifactId>xstream</artifactId>
+ <version>1.4.8</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java
new file mode 100644
index 0000000..2553371
--- /dev/null
+++ b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compress;
+
+import com.github.luben.zstd.Zstd;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.DiskPageCompression;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.CompactablePageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.xerial.snappy.Snappy;
+
+import static org.apache.ignite.configuration.DataStorageConfiguration.MAX_PAGE_SIZE;
+import static org.apache.ignite.configuration.DiskPageCompression.SKIP_GARBAGE;
+import static org.apache.ignite.internal.util.GridUnsafe.NATIVE_BYTE_ORDER;
+
/**
 * Compression processor implementation.
 *
 * Compacts pages (drops intra-page garbage) and optionally compresses them further
 * with one of the supported algorithms: Zstd, LZ4 or Snappy. A page is persisted in
 * compressed form only if compression releases more whole file system blocks than
 * plain compaction does.
 */
public class CompressionProcessorImpl extends CompressionProcessor {
    /** Thread-local buffer for the compacted page; sized to the maximum page size. */
    private final ThreadLocalByteBuffer compactBuf = new ThreadLocalByteBuffer(MAX_PAGE_SIZE);

    /**
     * Thread-local buffer for compression output. A bit larger than the maximum page
     * size, since a compressor may need extra room when the input is incompressible.
     */
    private final ThreadLocalByteBuffer compressBuf = new ThreadLocalByteBuffer(MAX_PAGE_SIZE + 1024);

    /**
     * @param ctx Kernal context.
     */
    @SuppressWarnings("WeakerAccess")
    public CompressionProcessorImpl(GridKernalContext ctx) {
        super(ctx);
    }

    /**
     * @param cap Capacity.
     * @return Direct byte buffer in native byte order.
     */
    static ByteBuffer allocateDirectBuffer(int cap) {
        return ByteBuffer.allocateDirect(cap).order(NATIVE_BYTE_ORDER);
    }

    /** {@inheritDoc} */
    @Override public void checkPageCompressionSupported(Path storagePath, int pageSize) throws IgniteCheckedException {
        if (!U.isLinux())
            throw new IgniteCheckedException("Currently page compression is supported only for Linux.");

        FileSystemUtils.checkSupported();

        int fsBlockSize = FileSystemUtils.getFileSystemBlockSize(storagePath);

        if (fsBlockSize <= 0)
            throw new IgniteCheckedException("Failed to get file system block size: " + storagePath);

        if (!U.isPow2(fsBlockSize))
            throw new IgniteCheckedException("Storage block size must be power of 2: " + fsBlockSize);

        // With fewer than 2 blocks per page no block could ever be released, so
        // compression would be pointless.
        if (pageSize < fsBlockSize * 2) {
            throw new IgniteCheckedException("Page size (now configured to " + pageSize + " bytes) " +
                "must be at least 2 times larger than the underlying storage block size (detected to be " + fsBlockSize +
                " bytes at '" + storagePath + "') for page compression.");
        }
    }

    /** {@inheritDoc} */
    @Override public ByteBuffer compressPage(
        ByteBuffer page,
        int pageSize,
        int blockSize,
        DiskPageCompression compression,
        int compressLevel
    ) throws IgniteCheckedException {
        assert compression != null;
        assert U.isPow2(pageSize): pageSize;
        assert U.isPow2(blockSize): blockSize;
        assert page.position() == 0 && page.limit() == pageSize;

        PageIO io = PageIO.getPageIO(page);

        // Only page types that know how to compact themselves can be processed.
        if (!(io instanceof CompactablePageIO))
            return page;

        ByteBuffer compactPage = compactBuf.get();

        // Drop the garbage from the page.
        ((CompactablePageIO)io).compactPage(page, compactPage, pageSize);
        page.clear();

        int compactSize = compactPage.limit();

        assert compactSize <= pageSize: compactSize;

        // If no need to compress further or configured just to skip garbage.
        if (compactSize < blockSize || compression == SKIP_GARBAGE)
            return setCompactionInfo(compactPage, compactSize);

        ByteBuffer compressedPage = doCompressPage(compression, compactPage, compactSize, compressLevel);

        assert compressedPage.position() == 0;
        int compressedSize = compressedPage.limit();

        // Compare how many whole file system blocks each variant would release.
        int freeCompactBlocks = (pageSize - compactSize) / blockSize;
        int freeCompressedBlocks = (pageSize - compressedSize) / blockSize;

        // Prefer plain compaction when it is at least as effective (cheaper to restore).
        if (freeCompactBlocks >= freeCompressedBlocks) {
            if (freeCompactBlocks == 0)
                return page; // No blocks will be released.

            return setCompactionInfo(compactPage, compactSize);
        }

        return setCompressionInfo(compressedPage, compression, compressedSize, compactSize);
    }

    /**
     * Marks the page as compacted-only (no further compression applied).
     *
     * @param page Page.
     * @param compactSize Compacted page size.
     * @return The given page.
     */
    private static ByteBuffer setCompactionInfo(ByteBuffer page, int compactSize) {
        return setCompressionInfo(page, SKIP_GARBAGE, compactSize, compactSize);
    }

    /**
     * Writes compression metadata into the page header.
     *
     * @param page Page.
     * @param compression Compression algorithm ({@code null} marks the page uncompressed).
     * @param compressedSize Compressed size.
     * @param compactedSize Compacted size.
     * @return The given page.
     */
    private static ByteBuffer setCompressionInfo(ByteBuffer page, DiskPageCompression compression, int compressedSize, int compactedSize) {
        // Sizes are stored as shorts in the page header.
        assert compressedSize >= 0 && compressedSize <= Short.MAX_VALUE: compressedSize;
        assert compactedSize >= 0 && compactedSize <= Short.MAX_VALUE: compactedSize;

        PageIO.setCompressionType(page, getCompressionType(compression));
        PageIO.setCompressedSize(page, (short)compressedSize);
        PageIO.setCompactedSize(page, (short)compactedSize);

        return page;
    }

    /**
     * Dispatches to the configured compression algorithm.
     *
     * @param compression Compression algorithm.
     * @param compactPage Compacted page.
     * @param compactSize Compacted page size.
     * @param compressLevel Compression level.
     * @return Compressed page.
     */
    private ByteBuffer doCompressPage(DiskPageCompression compression, ByteBuffer compactPage, int compactSize, int compressLevel) {
        switch (compression) {
            case ZSTD:
                return compressPageZstd(compactPage, compactSize, compressLevel);

            case LZ4:
                return compressPageLz4(compactPage, compactSize, compressLevel);

            case SNAPPY:
                return compressPageSnappy(compactPage, compactSize);
        }
        throw new IllegalStateException("Unsupported compression: " + compression);
    }

    /**
     * Compresses the page body (everything past the common header) with LZ4.
     *
     * @param compactPage Compacted page.
     * @param compactSize Compacted page size.
     * @param compressLevel Compression level (0 selects the fast compressor).
     * @return Compressed page.
     */
    private ByteBuffer compressPageLz4(ByteBuffer compactPage, int compactSize, int compressLevel) {
        LZ4Compressor compressor = Lz4.getCompressor(compressLevel);

        ByteBuffer compressedPage = compressBuf.get();

        // Header is copied verbatim; copyPageHeader leaves compactPage positioned
        // past the header, so only the page body is compressed.
        copyPageHeader(compactPage, compressedPage, compactSize);
        compressor.compress(compactPage, compressedPage);

        compactPage.flip();
        compressedPage.flip();

        return compressedPage;
    }

    /**
     * Compresses the page body (everything past the common header) with Zstd.
     *
     * @param compactPage Compacted page.
     * @param compactSize Compacted page size.
     * @param compressLevel Compression level.
     * @return Compressed page.
     */
    private ByteBuffer compressPageZstd(ByteBuffer compactPage, int compactSize, int compressLevel) {
        ByteBuffer compressedPage = compressBuf.get();

        copyPageHeader(compactPage, compressedPage, compactSize);
        // NOTE: Zstd takes the destination buffer first.
        Zstd.compress(compressedPage, compactPage, compressLevel);

        compactPage.flip();
        compressedPage.flip();

        return compressedPage;
    }

    /**
     * Compresses the page body (everything past the common header) with Snappy.
     *
     * @param compactPage Compacted page.
     * @param compactSize Compacted page size.
     * @return Compressed page.
     */
    private ByteBuffer compressPageSnappy(ByteBuffer compactPage, int compactSize) {
        ByteBuffer compressedPage = compressBuf.get();

        copyPageHeader(compactPage, compressedPage, compactSize);

        try {
            int compressedSize = Snappy.compress(compactPage, compressedPage);
            assert compressedPage.limit() == PageIO.COMMON_HEADER_END + compressedSize;
        }
        catch (IOException e) {
            throw new IgniteException("Failed to compress page with Snappy.", e);
        }

        // Snappy adjusts the buffer limits itself (checked by the assert above),
        // so only the positions are rewound here instead of flipping.
        compactPage.position(0);
        compressedPage.position(0);

        return compressedPage;
    }

    /**
     * Copies the common page header from the compacted page to the compressed page.
     * On return {@code compactPage} has its limit restored to {@code compactSize}
     * and its position at the end of the header, ready for body compression.
     *
     * @param compactPage Compacted page.
     * @param compressedPage Compressed page.
     * @param compactSize Compacted page size.
     */
    private static void copyPageHeader(ByteBuffer compactPage, ByteBuffer compressedPage, int compactSize) {
        // Temporarily narrow the limit so that only the header bytes are copied.
        compactPage.limit(PageIO.COMMON_HEADER_END);
        compressedPage.put(compactPage);
        compactPage.limit(compactSize);
    }

    /**
     * @param compression Compression algorithm ({@code null} for an uncompressed page).
     * @return Page compression type code to store in the page header.
     */
    private static byte getCompressionType(DiskPageCompression compression) {
        if (compression == null)
            return UNCOMPRESSED_PAGE;

        switch (compression) {
            case ZSTD:
                return ZSTD_COMPRESSED_PAGE;

            case LZ4:
                return LZ4_COMPRESSED_PAGE;

            case SNAPPY:
                return SNAPPY_COMPRESSED_PAGE;

            case SKIP_GARBAGE:
                return COMPACTED_PAGE;
        }
        throw new IllegalStateException("Unexpected compression: " + compression);
    }

    /** {@inheritDoc} */
    @Override public void decompressPage(ByteBuffer page, int pageSize) throws IgniteCheckedException {
        assert page.capacity() == pageSize;

        byte compressType = PageIO.getCompressionType(page);

        if (compressType == UNCOMPRESSED_PAGE)
            return; // Nothing to do.

        short compressedSize = PageIO.getCompressedSize(page);
        short compactSize = PageIO.getCompactedSize(page);

        assert compactSize <= pageSize && compactSize >= compressedSize;

        if (compressType == COMPACTED_PAGE) {
            // Just setup bounds before restoring the page.
            page.position(0).limit(compactSize);
        }
        else {
            ByteBuffer dst = compressBuf.get();

            // Position on a part that needs to be decompressed.
            page.limit(compressedSize)
                .position(PageIO.COMMON_HEADER_END);

            // LZ4 needs this limit to be exact.
            dst.limit(compactSize - PageIO.COMMON_HEADER_END);

            switch (compressType) {
                case ZSTD_COMPRESSED_PAGE:
                    Zstd.decompress(dst, page);
                    dst.flip();

                    break;

                case LZ4_COMPRESSED_PAGE:
                    Lz4.decompress(page, dst);
                    dst.flip();

                    break;

                case SNAPPY_COMPRESSED_PAGE:
                    try {
                        Snappy.uncompress(page, dst);
                    }
                    catch (IOException e) {
                        throw new IgniteException(e);
                    }
                    break;

                default:
                    throw new IgniteException("Unknown compression: " + compressType);
            }

            // Write the decompressed body back in place, right after the header.
            page.position(PageIO.COMMON_HEADER_END).limit(compactSize);
            page.put(dst).flip();
            assert page.limit() == compactSize;
        }

        CompactablePageIO io = PageIO.getPageIO(page);

        // Inflate the compacted page back to its full on-heap layout.
        io.restorePage(page, pageSize);

        // Clear compression metadata: the page is now in plain uncompressed form.
        setCompressionInfo(page, null, 0, 0);
    }

    /** Holder for lazily initialized, shared LZ4 codec instances. */
    static class Lz4 {
        /** */
        static final LZ4Factory factory = LZ4Factory.fastestInstance();

        /** */
        static final LZ4FastDecompressor decompressor = factory.fastDecompressor();

        /** */
        static final LZ4Compressor fastCompressor = factory.fastCompressor();

        /**
         * @param level Compression level, {@code 0} to {@code 17}.
         * @return Compressor: the fast one for level 0, a high compressor otherwise.
         */
        static LZ4Compressor getCompressor(int level) {
            assert level >= 0 && level <= 17: level;
            return level == 0 ? fastCompressor : factory.highCompressor(level);
        }

        /**
         * @param page Page.
         * @param dst Destination buffer (its limit must be exactly the decompressed size).
         */
        static void decompress(ByteBuffer page, ByteBuffer dst) {
            decompressor.decompress(page, dst);
        }
    }

    /**
     * Thread-local direct byte buffer of a fixed capacity, cleared on every access.
     */
    static final class ThreadLocalByteBuffer extends ThreadLocal<ByteBuffer> {
        /** Buffer capacity in bytes. */
        final int size;

        /**
         * @param size Size.
         */
        ThreadLocalByteBuffer(int size) {
            this.size = size;
        }

        /** {@inheritDoc} */
        @Override protected ByteBuffer initialValue() {
            return allocateDirectBuffer(size);
        }

        /** {@inheritDoc} */
        @Override public ByteBuffer get() {
            ByteBuffer buf = super.get();
            // Reset position/limit so the buffer is always ready for reuse.
            buf.clear();
            return buf;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystemLinux.java
----------------------------------------------------------------------
diff --git a/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystemLinux.java b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystemLinux.java
new file mode 100644
index 0000000..22d4926
--- /dev/null
+++ b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystemLinux.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compress;
+
+import jnr.ffi.LibraryLoader;
+import org.apache.ignite.IgniteException;
+
+/**
+ * Linux native file system API.
+ *
+ * Extends the generic POSIX implementation with fallocate(2) based hole punching for
+ * sparse (compressed) files. The FALLOC_FL_* flag values mirror the Linux kernel
+ * header {@code linux/falloc.h}.
+ */
+public final class NativeFileSystemLinux extends NativeFileSystemPosix {
+ /**
+ * By default fallocate extends the file size when needed; this flag keeps the file
+ * size unchanged while manipulating the allocated range.
+ */
+ public static final int FALLOC_FL_KEEP_SIZE = 0x01;
+
+ /**
+ * De-allocates (punches a hole in) the given byte range. Must be combined with
+ * {@link #FALLOC_FL_KEEP_SIZE}.
+ */
+ public static final int FALLOC_FL_PUNCH_HOLE = 0x02;
+
+ /**
+ * Reserved codepoint.
+ */
+ public static final int FALLOC_FL_NO_HIDE_STALE = 0x04;
+
+ /**
+ * FALLOC_FL_COLLAPSE_RANGE is used to remove a range of a file
+ * without leaving a hole in the file. The contents of the file beyond
+ * the range being removed is appended to the start offset of the range
+ * being removed (i.e. the hole that was punched is "collapsed"),
+ * resulting in a file layout that looks like the range that was
+ * removed never existed. As such collapsing a range of a file changes
+ * the size of the file, reducing it by the same length of the range
+ * that has been removed by the operation.
+ *
+ * Different filesystems may implement different limitations on the
+ * granularity of the operation. Most will limit operations to
+ * filesystem block size boundaries, but this boundary may be larger or
+ * smaller depending on the filesystem and/or the configuration of the
+ * filesystem or file.
+ *
+ * Attempting to collapse a range that crosses the end of the file is
+ * considered an illegal operation - just use ftruncate(2) if you need
+ * to collapse a range that crosses EOF.
+ */
+ public static final int FALLOC_FL_COLLAPSE_RANGE = 0x08;
+
+ /**
+ * FALLOC_FL_ZERO_RANGE is used to convert a range of a file to zeros, preferably
+ * without issuing data IO. Blocks should be preallocated for the regions that
+ * span holes in the file, and the entire range is preferably converted to
+ * unwritten extents - even though the file system may choose to zero out the
+ * extent or do whatever else results in reading zeros from the range
+ * while the range remains allocated for the file.
+ *
+ * This can be also used to preallocate blocks past EOF in the same way as
+ * with fallocate. Flag FALLOC_FL_KEEP_SIZE should cause the inode
+ * size to remain the same.
+ */
+ public static final int FALLOC_FL_ZERO_RANGE = 0x10;
+
+ /**
+ * FALLOC_FL_INSERT_RANGE is used to insert space within the file size without
+ * overwriting any existing data. The contents of the file beyond offset are
+ * shifted towards right by len bytes to create a hole. As such, this
+ * operation will increase the size of the file by len bytes.
+ *
+ * Different filesystems may implement different limitations on the granularity
+ * of the operation. Most will limit operations to filesystem block size
+ * boundaries, but this boundary may be larger or smaller depending on
+ * the filesystem and/or the configuration of the filesystem or file.
+ *
+ * Attempting to insert space using this flag at OR beyond the end of
+ * the file is considered an illegal operation - just use ftruncate(2) or
+ * fallocate(2) with mode 0 for such type of operations.
+ */
+ public static final int FALLOC_FL_INSERT_RANGE = 0x20;
+
+ /**
+ * FALLOC_FL_UNSHARE_RANGE is used to unshare shared blocks within the
+ * file size without overwriting any existing data. The purpose of this
+ * call is to preemptively reallocate any blocks that are subject to
+ * copy-on-write.
+ *
+ * Different filesystems may implement different limitations on the
+ * granularity of the operation. Most will limit operations to filesystem
+ * block size boundaries, but this boundary may be larger or smaller
+ * depending on the filesystem and/or the configuration of the filesystem
+ * or file.
+ *
+ * This flag can only be used with allocate-mode fallocate, which is
+ * to say that it cannot be used with the punch, zero, collapse, or
+ * insert range modes.
+ */
+ public static final int FALLOC_FL_UNSHARE_RANGE = 0x40;
+
+ /** Bound libc handle; failImmediately() makes the binding fail at load time if libc or the symbol is missing. */
+ private static final LinuxNativeLibC libc = LibraryLoader.create(LinuxNativeLibC.class)
+ .failImmediately().load("c");
+
+ /** {@inheritDoc} */
+ @Override public void punchHole(int fd, long off, long len) {
+ int res = libc.fallocate(fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, off, len);
+
+ // NOTE(review): glibc fallocate(2) returns -1 on failure and sets errno; 'res' here is that
+ // return value, not errno itself, so the message below reports "-1" rather than the actual
+ // error code - consider fetching errno (e.g. via jnr LastError) instead. Confirm with jnr-ffi docs.
+ if (res != 0)
+ throw new IgniteException("errno: " + res);
+ }
+
+ /**
+ * Minimal libc binding for fallocate(2).
+ */
+ public interface LinuxNativeLibC {
+ /**
+ * Allows the caller to directly manipulate the allocated
+ * disk space for the file referred to by fd for the byte range starting
+ * at {@code off} offset and continuing for {@code len} bytes.
+ *
+ * @param fd File descriptor.
+ * @param mode Determines the operation to be performed on the given range.
+ * @param off Required position offset.
+ * @param len Required length.
+ * @return On success, fallocate() returns zero. On error, -1 is returned and
+ * {@code errno} is set to indicate the error.
+ */
+ int fallocate(int fd, int mode, long off, long len);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystemPosix.java
----------------------------------------------------------------------
diff --git a/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystemPosix.java b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystemPosix.java
new file mode 100644
index 0000000..fcf485f
--- /dev/null
+++ b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystemPosix.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compress;
+
+import java.nio.file.Path;
+import jnr.posix.FileStat;
+import jnr.posix.POSIX;
+import jnr.posix.POSIXFactory;
+
+/**
+ * Posix file system API.
+ *
+ * Generic implementation based on jnr-posix stat/fstat calls. Hole punching is not
+ * supported at this level and is provided by OS-specific subclasses.
+ */
+public class NativeFileSystemPosix implements NativeFileSystem {
+ /** JNR POSIX handle (native or fallback implementation, chosen by the factory). */
+ private static POSIX posix = POSIXFactory.getPOSIX();
+
+ /** {@inheritDoc} */
+ @Override public int getFileSystemBlockSize(Path path) {
+ FileStat stat = posix.stat(path.toString());
+ return Math.toIntExact(stat.blockSize());
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getFileSystemBlockSize(int fd) {
+ FileStat stat = posix.fstat(fd);
+ return Math.toIntExact(stat.blockSize());
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSparseFileSize(int fd) {
+ FileStat stat = posix.fstat(fd);
+ // Per POSIX, st_blocks counts 512-byte units regardless of the file system block size.
+ return stat.blocks() * 512;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void punchHole(int fd, long off, long len) {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/CompressionProcessorTest.java
----------------------------------------------------------------------
diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/CompressionProcessorTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/CompressionProcessorTest.java
new file mode 100644
index 0000000..f660426
--- /dev/null
+++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/CompressionProcessorTest.java
@@ -0,0 +1,1021 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compress;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.DiskPageCompression;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInnerIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeafIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPagePayload;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.SimpleDataPageIO;
+import org.apache.ignite.internal.util.GridIntList;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.configuration.DiskPageCompression.LZ4;
+import static org.apache.ignite.configuration.DiskPageCompression.SKIP_GARBAGE;
+import static org.apache.ignite.configuration.DiskPageCompression.SNAPPY;
+import static org.apache.ignite.configuration.DiskPageCompression.ZSTD;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_MAX_LEVEL;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_MIN_LEVEL;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessor.UNCOMPRESSED_PAGE;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessor.ZSTD_MAX_LEVEL;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessorImpl.allocateDirectBuffer;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessorTest.TestInnerIO.INNER_IO;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessorTest.TestLeafIO.LEAF_IO;
+import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
+
+/**
+ * Tests page compression/decompression round-trips for data pages and B+Tree pages
+ * across the supported algorithms (SKIP_GARBAGE, ZSTD, LZ4, SNAPPY) and a range of
+ * emulated file system block sizes.
+ */
+public class CompressionProcessorTest extends GridCommonAbstractTest {
+ /** */
+ private static final int ITEM_SIZE = 6; // To fill the whole page.
+
+ /** Emulated file system block size in bytes. */
+ private int blockSize = 16;
+
+ /** Page size in bytes. */
+ private int pageSize = 4 * 1024;
+
+ /** Compression algorithm under test. */
+ private DiskPageCompression compression;
+
+ /** Compression level (algorithm-specific; unused by SKIP_GARBAGE and SNAPPY). */
+ private int compressLevel;
+
+ /** Compression processor under test. */
+ private CompressionProcessor p;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ PageIO.registerTest(INNER_IO, LEAF_IO);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() {
+ p = new CompressionProcessorImpl(new GridTestKernalContext(log));
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testDataPageCompact16() throws IgniteCheckedException {
+ blockSize = 16;
+ compression = SKIP_GARBAGE;
+
+ doTestDataPage();
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testDataPageCompact128() throws IgniteCheckedException {
+ blockSize = 128;
+ compression = SKIP_GARBAGE;
+
+ doTestDataPage();
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testDataPageCompact1k() throws IgniteCheckedException {
+ blockSize = 1024;
+ compression = SKIP_GARBAGE;
+
+ doTestDataPage();
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testDataPageCompact2k() throws IgniteCheckedException {
+ blockSize = 2 * 1024;
+ compression = SKIP_GARBAGE;
+
+ doTestDataPage();
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testDataPageZstd16() throws IgniteCheckedException {
+ blockSize = 16;
+ compression = ZSTD;
+ compressLevel = ZSTD_MAX_LEVEL;
+
+ doTestDataPage();
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testDataPageZstd128() throws IgniteCheckedException {
+ blockSize = 128;
+ compression = ZSTD;
+ compressLevel = ZSTD_MAX_LEVEL;
+
+ doTestDataPage();
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testDataPageZstd1k() throws IgniteCheckedException {
+ blockSize = 1024;
+ compression = ZSTD;
+ compressLevel = ZSTD_MAX_LEVEL;
+
+ doTestDataPage();
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testDataPageZstd2k() throws IgniteCheckedException {
+ blockSize = 2 * 1024;
+ compression = ZSTD;
+ compressLevel = ZSTD_MAX_LEVEL;
+
+ doTestDataPage();
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testDataPageSnappy16() throws IgniteCheckedException {
+ blockSize = 16;
+ compression = SNAPPY;
+
+ doTestDataPage();
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testDataPageSnappy128() throws IgniteCheckedException {
+ blockSize = 128;
+ compression = SNAPPY;
+
+ doTestDataPage();
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testDataPageSnappy1k() throws IgniteCheckedException {
+ blockSize = 1024;
+ compression = SNAPPY;
+
+ doTestDataPage();
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testDataPageSnappy2k() throws IgniteCheckedException {
+ blockSize = 2 * 1024;
+ compression = SNAPPY;
+
+ doTestDataPage();
+ }
+
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testDataPageLz4Fast16() throws IgniteCheckedException {
+ blockSize = 16;
+ compression = LZ4;
+ compressLevel = LZ4_MIN_LEVEL;
+
+ doTestDataPage();
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testDataPageLz4Fast128() throws IgniteCheckedException {
+ blockSize = 128;
+ compression = LZ4;
+ compressLevel = LZ4_MIN_LEVEL;
+
+ doTestDataPage();
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testDataPageLz4Fast1k() throws IgniteCheckedException {
+ blockSize = 1024;
+ compression = LZ4;
+ compressLevel = LZ4_MIN_LEVEL;
+
+ doTestDataPage();
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testDataPageLz4Fast2k() throws IgniteCheckedException {
+ blockSize = 2 * 1024;
+ compression = LZ4;
+ compressLevel = LZ4_MIN_LEVEL;
+
+ doTestDataPage();
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testDataPageLz4Slow16() throws IgniteCheckedException {
+ blockSize = 16;
+ compression = LZ4;
+ compressLevel = LZ4_MAX_LEVEL;
+
+ doTestDataPage();
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testDataPageLz4Slow128() throws IgniteCheckedException {
+ blockSize = 128;
+ compression = LZ4;
+ compressLevel = LZ4_MAX_LEVEL;
+
+ doTestDataPage();
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testDataPageLz4Slow1k() throws IgniteCheckedException {
+ blockSize = 1024;
+ compression = LZ4;
+ compressLevel = LZ4_MAX_LEVEL;
+
+ doTestDataPage();
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testDataPageLz4Slow2k() throws IgniteCheckedException {
+ blockSize = 2 * 1024;
+ compression = LZ4;
+ compressLevel = LZ4_MAX_LEVEL;
+
+ doTestDataPage();
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testInnerPageCompact16() throws IgniteCheckedException {
+ blockSize = 16;
+ compression = SKIP_GARBAGE;
+
+ doTestBTreePage(INNER_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testLeafPageCompact16() throws IgniteCheckedException {
+ blockSize = 16;
+ compression = SKIP_GARBAGE;
+
+ doTestBTreePage(LEAF_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testInnerPageZstd16() throws IgniteCheckedException {
+ blockSize = 16;
+ compression = ZSTD;
+ compressLevel = ZSTD_MAX_LEVEL;
+
+ doTestBTreePage(INNER_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testLeafPageZstd16() throws IgniteCheckedException {
+ blockSize = 16;
+ compression = ZSTD;
+ compressLevel = ZSTD_MAX_LEVEL;
+
+ doTestBTreePage(LEAF_IO);
+ }
+
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testInnerPageLz4Fast16() throws IgniteCheckedException {
+ blockSize = 16;
+ compression = LZ4;
+ compressLevel = LZ4_MIN_LEVEL;
+
+ doTestBTreePage(INNER_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testLeafPageLz4Fast16() throws IgniteCheckedException {
+ blockSize = 16;
+ compression = LZ4;
+ compressLevel = LZ4_MIN_LEVEL;
+
+ doTestBTreePage(LEAF_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testInnerPageLz4Slow16() throws IgniteCheckedException {
+ blockSize = 16;
+ compression = LZ4;
+ compressLevel = LZ4_MAX_LEVEL;
+
+ doTestBTreePage(INNER_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testLeafPageLz4Slow16() throws IgniteCheckedException {
+ blockSize = 16;
+ compression = LZ4;
+ compressLevel = LZ4_MAX_LEVEL;
+
+ doTestBTreePage(LEAF_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testInnerPageSnappy16() throws IgniteCheckedException {
+ blockSize = 16;
+ compression = SNAPPY;
+
+ doTestBTreePage(INNER_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testLeafPageSnappy16() throws IgniteCheckedException {
+ blockSize = 16;
+ compression = SNAPPY;
+
+ doTestBTreePage(LEAF_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testInnerPageCompact128() throws IgniteCheckedException {
+ blockSize = 128;
+ compression = SKIP_GARBAGE;
+
+ doTestBTreePage(INNER_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testLeafPageCompact128() throws IgniteCheckedException {
+ blockSize = 128;
+ compression = SKIP_GARBAGE;
+
+ doTestBTreePage(LEAF_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testInnerPageZstd128() throws IgniteCheckedException {
+ blockSize = 128;
+ compression = ZSTD;
+ compressLevel = ZSTD_MAX_LEVEL;
+
+ doTestBTreePage(INNER_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testLeafPageZstd128() throws IgniteCheckedException {
+ blockSize = 128;
+ compression = ZSTD;
+ compressLevel = ZSTD_MAX_LEVEL;
+
+ doTestBTreePage(LEAF_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testInnerPageLz4Fast128() throws IgniteCheckedException {
+ blockSize = 128;
+ compression = LZ4;
+ compressLevel = LZ4_MIN_LEVEL;
+
+ doTestBTreePage(INNER_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testLeafPageLz4Fast128() throws IgniteCheckedException {
+ blockSize = 128;
+ compression = LZ4;
+ compressLevel = LZ4_MIN_LEVEL;
+
+ doTestBTreePage(LEAF_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testInnerPageLz4Slow128() throws IgniteCheckedException {
+ blockSize = 128;
+ compression = LZ4;
+ compressLevel = LZ4_MAX_LEVEL;
+
+ doTestBTreePage(INNER_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testLeafPageLz4Slow128() throws IgniteCheckedException {
+ blockSize = 128;
+ compression = LZ4;
+ compressLevel = LZ4_MAX_LEVEL;
+
+ doTestBTreePage(LEAF_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testInnerPageSnappy128() throws IgniteCheckedException {
+ blockSize = 128;
+ compression = SNAPPY;
+
+ doTestBTreePage(INNER_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testLeafPageSnappy128() throws IgniteCheckedException {
+ blockSize = 128;
+ compression = SNAPPY;
+
+ doTestBTreePage(LEAF_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testInnerPageCompact1k() throws IgniteCheckedException {
+ blockSize = 1024;
+ compression = SKIP_GARBAGE;
+
+ doTestBTreePage(INNER_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testLeafPageCompact1k() throws IgniteCheckedException {
+ blockSize = 1024;
+ compression = SKIP_GARBAGE;
+
+ doTestBTreePage(LEAF_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testInnerPageZstd1k() throws IgniteCheckedException {
+ blockSize = 1024;
+ compression = ZSTD;
+ compressLevel = ZSTD_MAX_LEVEL;
+
+ doTestBTreePage(INNER_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testLeafPageZstd1k() throws IgniteCheckedException {
+ blockSize = 1024;
+ compression = ZSTD;
+ compressLevel = ZSTD_MAX_LEVEL;
+
+ doTestBTreePage(LEAF_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testInnerPageLz4Fast1k() throws IgniteCheckedException {
+ blockSize = 1024;
+ compression = LZ4;
+ compressLevel = LZ4_MIN_LEVEL;
+
+ doTestBTreePage(INNER_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testLeafPageLz4Fast1k() throws IgniteCheckedException {
+ blockSize = 1024;
+ compression = LZ4;
+ compressLevel = LZ4_MIN_LEVEL;
+
+ doTestBTreePage(LEAF_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testInnerPageLz4Slow1k() throws IgniteCheckedException {
+ blockSize = 1024;
+ compression = LZ4;
+ compressLevel = LZ4_MAX_LEVEL;
+
+ doTestBTreePage(INNER_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testLeafPageLz4Slow1k() throws IgniteCheckedException {
+ blockSize = 1024;
+ compression = LZ4;
+ compressLevel = LZ4_MAX_LEVEL;
+
+ doTestBTreePage(LEAF_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testInnerPageSnappy1k() throws IgniteCheckedException {
+ blockSize = 1024;
+ compression = SNAPPY;
+
+ doTestBTreePage(INNER_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testLeafPageSnappy1k() throws IgniteCheckedException {
+ blockSize = 1024;
+ compression = SNAPPY;
+
+ doTestBTreePage(LEAF_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testInnerPageCompact2k() throws IgniteCheckedException {
+ blockSize = 2 * 1024;
+ compression = SKIP_GARBAGE;
+
+ doTestBTreePage(INNER_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testLeafPageCompact2k() throws IgniteCheckedException {
+ blockSize = 2 * 1024;
+ compression = SKIP_GARBAGE;
+
+ doTestBTreePage(LEAF_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testInnerPageZstd2k() throws IgniteCheckedException {
+ blockSize = 2 * 1024;
+ compression = ZSTD;
+ compressLevel = ZSTD_MAX_LEVEL;
+
+ doTestBTreePage(INNER_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testLeafPageZstd2k() throws IgniteCheckedException {
+ blockSize = 2 * 1024;
+ compression = ZSTD;
+ compressLevel = ZSTD_MAX_LEVEL;
+
+ doTestBTreePage(LEAF_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testInnerPageLz4Fast2k() throws IgniteCheckedException {
+ blockSize = 2 * 1024;
+ compression = LZ4;
+ compressLevel = LZ4_MIN_LEVEL;
+
+ doTestBTreePage(INNER_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testLeafPageLz4Fast2k() throws IgniteCheckedException {
+ blockSize = 2 * 1024;
+ compression = LZ4;
+ compressLevel = LZ4_MIN_LEVEL;
+
+ doTestBTreePage(LEAF_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testInnerPageLz4Slow2k() throws IgniteCheckedException {
+ blockSize = 2 * 1024;
+ compression = LZ4;
+ compressLevel = LZ4_MAX_LEVEL;
+
+ doTestBTreePage(INNER_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testLeafPageLz4Slow2k() throws IgniteCheckedException {
+ blockSize = 2 * 1024;
+ compression = LZ4;
+ compressLevel = LZ4_MAX_LEVEL;
+
+ doTestBTreePage(LEAF_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testInnerPageSnappy2k() throws IgniteCheckedException {
+ blockSize = 2 * 1024;
+ compression = SNAPPY;
+
+ doTestBTreePage(INNER_IO);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testLeafPageSnappy2k() throws IgniteCheckedException {
+ blockSize = 2 * 1024;
+ compression = SNAPPY;
+
+ doTestBTreePage(LEAF_IO);
+ }
+
+ /**
+ * Round-trips a B+Tree page through compression in three states: empty, full and half-full.
+ *
+ * @param io Page IO (leaf or inner).
+ * @throws IgniteCheckedException If failed.
+ */
+ private void doTestBTreePage(BPlusIO<byte[]> io) throws IgniteCheckedException {
+ Random rnd = ThreadLocalRandom.current();
+
+ final byte[][] rows = new byte[3][io.getItemSize()];
+
+ for (int i = 0; i < rows.length; i++)
+ rnd.nextBytes(rows[i]);
+
+ ByteBuffer page = allocateDirectBuffer(pageSize);
+ long pageAddr = bufferAddress(page);
+
+ long pageId = PageIdUtils.pageId(PageIdAllocator.INDEX_PARTITION, PageIdAllocator.FLAG_IDX, 171717);
+
+ io.initNewPage(pageAddr, pageId, pageSize);
+
+ checkIo(io, page);
+
+ // Extracts page contents (items plus, for inner pages, child links) as a list for equality checks.
+ Function<ByteBuffer, List<?>> getContents = (buf) -> {
+ long addr = bufferAddress(buf);
+
+ int cnt = io.getCount(addr);
+
+ List<Object> list = new ArrayList<>(cnt);
+
+ for (int i = 0; i < cnt; i++) {
+ if (!io.isLeaf())
+ list.add(((BPlusInnerIO)io).getLeft(addr, i));
+
+ try {
+ list.add(new Bytes(io.getLookupRow(null, addr, i)));
+ }
+ catch (IgniteCheckedException e) {
+ throw new IllegalStateException(e);
+ }
+
+ if (!io.isLeaf())
+ list.add(((BPlusInnerIO)io).getRight(addr, i));
+ }
+
+ return list;
+ };
+
+ // Empty page.
+ checkCompressDecompress(page, getContents, false);
+
+ int cnt = io.getMaxCount(pageAddr, pageSize);
+
+ for (int i = 0; i < cnt; i++) {
+ byte[] row = rows[rnd.nextInt(rows.length)];
+ io.insert(pageAddr, i, row, row, 777_000 + i, false);
+ }
+
+ if (io.isLeaf())
+ assertEquals(pageSize, io.getItemsEnd(pageAddr)); // Page must be full.
+
+ // Full page.
+ checkCompressDecompress(page, getContents, io.isLeaf());
+
+ io.setCount(pageAddr, cnt / 2);
+
+ // Half page.
+ checkCompressDecompress(page, getContents, false);
+ }
+
+ /**
+ * Round-trips a simple data page through compression in three states: empty, completely
+ * full (including a trailing fragment that consumes the last free bytes) and half-full.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ private void doTestDataPage() throws IgniteCheckedException {
+ Random rnd = ThreadLocalRandom.current();
+
+ final byte[][] rows = new byte[][]{
+ new byte[17], new byte[37], new byte[71]
+ };
+
+ for (int i = 0; i < rows.length; i++)
+ rnd.nextBytes(rows[i]);
+
+ ByteBuffer page = allocateDirectBuffer(pageSize);
+ long pageAddr = bufferAddress(page);
+
+ SimpleDataPageIO io = SimpleDataPageIO.VERSIONS.latest();
+
+ long pageId = PageIdUtils.pageId(PageIdAllocator.MAX_PARTITION_ID, PageIdAllocator.FLAG_DATA, 171717);
+
+ io.initNewPage(pageAddr, pageId, pageSize);
+
+ checkIo(io, page);
+
+ // Extracts all row payloads from the page as a list for equality checks.
+ Function<ByteBuffer,List<Bytes>> getContents = (buf) -> {
+ try {
+ long addr = bufferAddress(buf);
+
+ return io.forAllItems(addr, (link) -> {
+ DataPagePayload payload = io.readPayload(addr, PageIdUtils.itemId(link), pageSize);
+
+ return new Bytes(payload.getBytes(addr));
+ });
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ };
+
+ // Empty data page.
+ checkCompressDecompress(page, getContents, false);
+
+ GridIntList itemIds = new GridIntList();
+
+ // Fill the page with random rows until no single row fits any more.
+ for (;;) {
+ byte[] row = rows[rnd.nextInt(rows.length)];
+
+ if (io.getFreeSpace(pageAddr) < row.length)
+ break;
+
+ itemIds.add(io.addRow(pageAddr, row, pageSize));
+ }
+
+ int freeSpace = io.getFreeSpace(pageAddr);
+
+ // Consume the remaining bytes with a row fragment so the page becomes completely full.
+ if (freeSpace != 0) {
+ byte[] lastRow = new byte[freeSpace];
+ rnd.nextBytes(lastRow);
+
+ io.addRowFragment(pageId, pageAddr, lastRow, 777L, pageSize);
+
+ assertEquals(0, io.getRealFreeSpace(pageAddr));
+ }
+
+ // Full data page.
+ checkCompressDecompress(page, getContents, io.getRealFreeSpace(pageAddr) == 0);
+
+ // Remove every other row to create garbage inside the page.
+ for (int i = 0; i < itemIds.size(); i += 2)
+ io.removeRow(pageAddr, itemIds.get(i), pageSize);
+
+ // Half-filled data page.
+ checkCompressDecompress(page, getContents, false);
+ }
+
+ /**
+ * Verifies that the expected page IO resolves from both the page address and the buffer.
+ *
+ * @param io Expected page IO.
+ * @param page Page buffer.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void checkIo(PageIO io, ByteBuffer page) throws IgniteCheckedException {
+ assertSame(io, PageIO.getPageIO(bufferAddress(page)));
+ assertSame(io, PageIO.getPageIO(page));
+ }
+
+ /**
+ * Compresses the given page, decompresses the result and verifies that the page header
+ * and contents round-trip unchanged, and that compression actually shrank the page
+ * whenever it was expected to.
+ *
+ * @param page Original page buffer.
+ * @param getPageContents Function extracting a comparable view of the page payload.
+ * @param fullPage Whether the page is completely full (SKIP_GARBAGE may not shrink a full page).
+ * @throws IgniteCheckedException If failed.
+ */
+ private void checkCompressDecompress(ByteBuffer page, Function<ByteBuffer, ?> getPageContents, boolean fullPage)
+ throws IgniteCheckedException {
+ PageIO.setCrc(page, 0xABCDEF13);
+
+ long pageId = PageIO.getPageId(page);
+ PageIO io = PageIO.getPageIO(page);
+
+ ByteBuffer compressed = p.compressPage(page, pageSize, blockSize, compression, compressLevel);
+
+ int compressedSize = PageIO.getCompressedSize(compressed);
+
+ assertNotSame(page, compressed); // This is generally possible but not interesting in this test.
+
+ assertTrue(compressedSize > 0);
+ assertTrue(compressedSize <= pageSize);
+ assertEquals(compressedSize, compressed.limit());
+
+ if (!fullPage || compression != SKIP_GARBAGE)
+ assertTrue(pageSize > compressedSize);
+
+ assertEquals(0, compressed.position());
+
+ checkIo(io, compressed);
+ assertEquals(0, page.position());
+ assertEquals(pageSize, page.limit());
+
+ info(io.getClass().getSimpleName() + " " + compression + " " + compressLevel + ": " + compressedSize + "/" + pageSize);
+
+ // NOTE(review): effectively repeats the size check above ('>' vs '<' of the same operands);
+ // one of the two guarded assertions is redundant.
+ if (!fullPage || compression != SKIP_GARBAGE)
+ assertTrue(compressedSize < pageSize);
+
+ assertEquals(pageId, PageIO.getPageId(compressed));
+
+ ByteBuffer decompress = allocateDirectBuffer(pageSize);
+ decompress.put(compressed).clear();
+
+ p.decompressPage(decompress, pageSize);
+
+ assertEquals(0, decompress.position());
+ assertEquals(pageSize, decompress.limit());
+
+ checkIo(io, decompress);
+ // NOTE(review): the next three assertions inspect 'page' (the original buffer), not
+ // 'decompress' - presumably verifying that the source page was left untouched, but if the
+ // intent was to check the decompressed page's reset compression fields, the argument should
+ // be 'decompress'. Confirm intent.
+ assertEquals(UNCOMPRESSED_PAGE, PageIO.getCompressionType(page));
+ assertEquals(0, PageIO.getCompressedSize(page));
+ assertEquals(0, PageIO.getCompactedSize(page));
+
+ assertTrue(Arrays.equals(getPageCommonHeader(page), getPageCommonHeader(decompress)));
+ assertEquals(getPageContents.apply(page), getPageContents.apply(decompress));
+ }
+
+ /**
+ * @param page Page.
+ * @return Common page header: the first {@code PageIO.COMMON_HEADER_END} bytes of the page.
+ */
+ private static byte[] getPageCommonHeader(ByteBuffer page) {
+ return PageUtils.getBytes(GridUnsafe.bufferAddress(page), 0, PageIO.COMMON_HEADER_END);
+ }
+
+ /**
+ * Byte array wrapper providing content-based equality and hash code, so arrays can be
+ * compared inside collections.
+ */
+ private static class Bytes {
+ /** Wrapped bytes (never null). */
+ private final byte[] bytes;
+
+ /**
+ * @param bytes Bytes.
+ */
+ private Bytes(byte[] bytes) {
+ assert bytes != null;
+ this.bytes = bytes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ Bytes bytes1 = (Bytes)o;
+
+ return Arrays.equals(bytes, bytes1.bytes);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return Arrays.hashCode(bytes);
+ }
+ }
+
+ /**
+ * Test B+Tree leaf IO storing raw {@code byte[]} rows of fixed {@link #ITEM_SIZE}.
+ */
+ static class TestLeafIO extends BPlusLeafIO<byte[]> {
+ /** Singleton instance registered via {@code PageIO.registerTest}. */
+ static final TestLeafIO LEAF_IO = new TestLeafIO();
+
+ /**
+ * Creates the IO with test page type 29_501, version 1.
+ */
+ TestLeafIO() {
+ super(29_501, 1, ITEM_SIZE);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void storeByOffset(long pageAddr, int off, byte[] row) {
+ PageUtils.putBytes(pageAddr, off, row);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<byte[]> srcIo, long srcPageAddr,
+ int srcIdx) throws IgniteCheckedException {
+ storeByOffset(dstPageAddr, offset(dstIdx), srcIo.getLookupRow(null, srcPageAddr, srcIdx));
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] getLookupRow(BPlusTree<byte[],?> tree, long pageAddr, int idx) {
+ return PageUtils.getBytes(pageAddr, offset(idx), itemSize);
+ }
+ }
+
+ /**
+ * Test B+Tree inner IO storing raw {@code byte[]} rows of fixed {@link #ITEM_SIZE}.
+ */
+ static class TestInnerIO extends BPlusInnerIO<byte[]> {
+ /** Singleton instance registered via {@code PageIO.registerTest}. */
+ static TestInnerIO INNER_IO = new TestInnerIO();
+
+ /**
+ * Creates the IO with test page type 29_502, version 1.
+ */
+ TestInnerIO() {
+ super(29_502, 1, true, ITEM_SIZE);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void storeByOffset(long pageAddr, int off, byte[] row) {
+ PageUtils.putBytes(pageAddr, off, row);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<byte[]> srcIo, long srcPageAddr,
+ int srcIdx) throws IgniteCheckedException {
+ storeByOffset(dstPageAddr, offset(dstIdx), srcIo.getLookupRow(null, srcPageAddr, srcIdx));
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] getLookupRow(BPlusTree<byte[],?> tree, long pageAddr, int idx) {
+ return PageUtils.getBytes(pageAddr, offset(idx), itemSize);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationAsyncTest.java
----------------------------------------------------------------------
diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationAsyncTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationAsyncTest.java
new file mode 100644
index 0000000..658a5d2
--- /dev/null
+++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationAsyncTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compress;
+
+import org.apache.ignite.internal.processors.cache.persistence.file.AsyncFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+
+/**
+ */
+public class DiskPageCompressionIntegrationAsyncTest extends DiskPageCompressionIntegrationTest {
+ /** {@inheritDoc} */
+ @Override protected FileIOFactory getFileIOFactory() {
+ return new AsyncFileIOFactory();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java
new file mode 100644
index 0000000..ca7f4ea
--- /dev/null
+++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compress;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.file.OpenOption;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.DiskPageCompression;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.store.PageStore;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.mxbean.CacheGroupMetricsMXBean;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.configuration.DataStorageConfiguration.MAX_PAGE_SIZE;
+import static org.apache.ignite.configuration.DiskPageCompression.LZ4;
+import static org.apache.ignite.configuration.DiskPageCompression.SKIP_GARBAGE;
+import static org.apache.ignite.configuration.DiskPageCompression.SNAPPY;
+import static org.apache.ignite.configuration.DiskPageCompression.ZSTD;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_DEFAULT_LEVEL;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_MAX_LEVEL;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_MIN_LEVEL;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessor.ZSTD_MAX_LEVEL;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessor.ZSTD_MIN_LEVEL;
+
/**
 * Integration test for disk page compression: loads a cache with compression enabled,
 * forces a checkpoint and verifies that storage metrics and partition page stores
 * report a sparse (compacted) on-disk size smaller than the logical size.
 */
+public class DiskPageCompressionIntegrationTest extends GridCommonAbstractTest {
    /** Page compression algorithm for the test cache; set by each test method. */
    private DiskPageCompression compression;

    /** Compression level, or {@code null} to use the algorithm's default. */
    private Integer compressionLevel;

    /** File IO factory created in {@code getConfiguration}; later compared against the page store's factory. */
    private FileIOFactory factory;
+
    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        // Reset per-test compression settings and start from a clean persistence directory.
        compression = null;
        compressionLevel = null;
        cleanPersistenceDir();
    }
+
    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        // Stop every node started by the test ('true' presumably cancels in-flight operations — see stopAllGrids).
        stopAllGrids(true);
    }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteName) throws Exception {
+ DataRegionConfiguration drCfg = new DataRegionConfiguration()
+ .setPersistenceEnabled(true);
+
+ factory = getFileIOFactory();
+
+ DataStorageConfiguration dsCfg = new DataStorageConfiguration()
+ .setMetricsEnabled(true)
+ .setPageSize(MAX_PAGE_SIZE)
+ .setDefaultDataRegionConfiguration(drCfg)
+ .setFileIOFactory(U.isLinux() ? factory : new PunchFileIOFactory(factory));
+
+ return super.getConfiguration(igniteName).setDataStorageConfiguration(dsCfg);
+ }
+
    /**
     * Creates the file IO factory under test; subclasses override this to
     * exercise other implementations (e.g. asynchronous IO).
     *
     * @return File IO factory.
     */
    protected FileIOFactory getFileIOFactory() {
        return new RandomAccessFileIOFactory();
    }
+
    /**
     * Tests ZSTD compression at maximum level.
     *
     * @throws Exception If failed.
     */
    public void testPageCompression_Zstd_Max() throws Exception {
        compression = ZSTD;
        compressionLevel = ZSTD_MAX_LEVEL;

        doTestPageCompression();
    }

    /**
     * Tests ZSTD compression at the default level ({@code null} level).
     *
     * @throws Exception If failed.
     */
    public void testPageCompression_Zstd_Default() throws Exception {
        compression = ZSTD;
        compressionLevel = null;

        doTestPageCompression();
    }

    /**
     * Tests ZSTD compression at minimum level.
     *
     * @throws Exception If failed.
     */
    public void testPageCompression_Zstd_Min() throws Exception {
        compression = ZSTD;
        compressionLevel = ZSTD_MIN_LEVEL;

        doTestPageCompression();
    }

    /**
     * Tests LZ4 compression at maximum level.
     *
     * @throws Exception If failed.
     */
    public void testPageCompression_Lz4_Max() throws Exception {
        compression = LZ4;
        compressionLevel = LZ4_MAX_LEVEL;

        doTestPageCompression();
    }

    /**
     * Tests LZ4 compression at the default level ({@code null} level).
     *
     * @throws Exception If failed.
     */
    public void testPageCompression_Lz4_Default() throws Exception {
        compression = LZ4;
        compressionLevel = null;

        doTestPageCompression();
    }

    /**
     * LZ4's minimum level equals its default level, so the default-level test already
     * covers this case; here we only assert that the equality still holds.
     *
     * @throws Exception If failed.
     */
    public void testPageCompression_Lz4_Min() throws Exception {
        assertEquals(LZ4_MIN_LEVEL, LZ4_DEFAULT_LEVEL);
    }

    /**
     * Tests the SKIP_GARBAGE page compaction mode (no compression level applies).
     *
     * @throws Exception If failed.
     */
    public void testPageCompression_SkipGarbage() throws Exception {
        compression = SKIP_GARBAGE;

        doTestPageCompression();
    }

    /**
     * Tests Snappy compression (compression level left unset).
     *
     * @throws Exception If failed.
     */
    public void testPageCompression_Snappy() throws Exception {
        compression = SNAPPY;

        doTestPageCompression();
    }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void doTestPageCompression() throws Exception {
+ IgniteEx ignite = startGrid(0);
+
+ ignite.cluster().active(true);
+
+ String cacheName = "test";
+
+ CacheConfiguration<Integer,TestVal> ccfg = new CacheConfiguration<Integer,TestVal>()
+ .setName(cacheName)
+ .setBackups(0)
+ .setAtomicityMode(ATOMIC)
+ .setIndexedTypes(Integer.class, TestVal.class)
+ .setDiskPageCompression(compression)
+ .setDiskPageCompressionLevel(compressionLevel);
+
+ IgniteCache<Integer,TestVal> cache = ignite.getOrCreateCache(ccfg);
+
+ int cnt = 2_000;
+
+ for (int i = 0; i < cnt; i++)
+ assertTrue(cache.putIfAbsent(i, new TestVal(i)));
+
+ for (int i = 0; i < cnt; i += 2)
+ assertEquals(new TestVal(i), cache.getAndRemove(i));
+
+ GridCacheDatabaseSharedManager dbMgr = ((GridCacheDatabaseSharedManager)ignite.context()
+ .cache().context().database());
+
+ dbMgr.forceCheckpoint("test compression").finishFuture().get();
+
+ FilePageStoreManager storeMgr = dbMgr.getFileStoreManager();
+
+ checkFileIOFactory(storeMgr.getPageStoreFileIoFactory());
+
+ Thread.sleep(100); // Wait for metrics update.
+
+ long storeSize = ignite.dataStorageMetrics().getStorageSize();
+ long sparseStoreSize = ignite.dataStorageMetrics().getSparseStorageSize();
+
+ assertTrue("storeSize: " + storeSize, storeSize > 0);
+
+ if (U.isLinux()) {
+ assertTrue("sparseSize: " + sparseStoreSize, sparseStoreSize > 0);
+ assertTrue(storeSize + " > " + sparseStoreSize, storeSize > sparseStoreSize);
+ }
+ else
+ assertTrue(sparseStoreSize < 0);
+
+ GridCacheContext<?,?> cctx = ignite.cachex(cacheName).context();
+
+ int cacheId = cctx.cacheId();
+ int groupId = cctx.groupId();
+
+ assertEquals(cacheId, groupId);
+
+ CacheGroupMetricsMXBean mx = cctx.group().mxBean();
+
+ storeSize = mx.getStorageSize();
+ sparseStoreSize = mx.getSparseStorageSize();
+
+ assertTrue("storeSize: " + storeSize, storeSize > 0);
+
+ if (U.isLinux()) {
+ assertTrue("sparseSize: " + sparseStoreSize, sparseStoreSize > 0);
+ assertTrue(storeSize + " > " + sparseStoreSize, storeSize > sparseStoreSize);
+ }
+ else
+ assertTrue(sparseStoreSize < 0);
+
+ int parts = cctx.affinity().partitions();
+
+ for (int i = 0; i < parts; i++) {
+ PageStore store = storeMgr.getStore(cacheId, i);
+
+ long realSize = store.size();
+ long virtualSize = store.getPageSize() * store.pages();
+ long sparseSize = store.getSparseSize();
+
+ assertTrue(virtualSize > 0);
+
+ error("virt: " + virtualSize + ", real: " + realSize + ", sparse: " + sparseSize);
+
+ if (!store.exists())
+ continue;
+
+ if (virtualSize > sparseSize)
+ return;
+ }
+
+ fail("No files were compacted.");
+ }
+
    /**
     * Manual benchmark printing the on-disk compression ratio while loading entries;
     * the leading underscore presumably keeps it out of the regular test run — verify
     * against the suite's test-discovery convention.
     */
    public void _testCompressionRatio() throws Exception {
        IgniteEx ignite = startGrid(0);

        ignite.cluster().active(true);

        String cacheName = "test";

        CacheConfiguration<Integer,TestVal> ccfg = new CacheConfiguration<Integer,TestVal>()
            .setName(cacheName)
            .setBackups(0)
            .setAtomicityMode(ATOMIC)
            .setIndexedTypes(Integer.class, TestVal.class)
            .setAffinity(new RendezvousAffinityFunction().setPartitions(10))
            .setDiskPageCompression(ZSTD);
//            .setDiskPageCompressionLevel(compressionLevel);

        ignite.getOrCreateCache(ccfg);

        IgniteInternalCache<Integer,TestVal> cache = ignite.cachex(cacheName);

        CacheGroupMetricsMXBean mx = cache.context().group().mxBean();

        GridCacheDatabaseSharedManager dbMgr = ((GridCacheDatabaseSharedManager)ignite.context()
            .cache().context().database());

        int cnt = 20_000_000;

        for (int i = 0; i < cnt; i++) {
            assertTrue(cache.putIfAbsent(i, new TestVal(i)));

            // Checkpoint periodically and print the sparse/raw size ratio so far.
            if (i % 50_000 == 0) {
                dbMgr.forceCheckpoint("test").finishFuture().get();

                long sparse = mx.getSparseStorageSize();
                long size = mx.getStorageSize();

                System.out.println(i + " >> " + sparse + " / " + size + " = " + ((double)sparse / size));
            }
        }
    }
+
+ /**
+ * @param f Factory.
+ */
+ protected void checkFileIOFactory(FileIOFactory f) {
+ if (!U.isLinux())
+ f = ((PunchFileIOFactory)f).delegate;
+
+ assertSame(factory, f);
+ }
+
+ /**
+ */
+ static class TestVal implements Serializable {
+ /** */
+ static final long serialVersionUID = 1L;
+
+ /** */
+ @QuerySqlField
+ String str;
+
+ /** */
+ int i;
+
+ /** */
+ @QuerySqlField
+ long x;
+
+ /** */
+ @QuerySqlField
+ UUID id;
+
+ TestVal(int i) {
+ this.str = i + "bla bla bla!";
+ this.i = -i;
+ this.x = 0xffaabbccdd773311L + i;
+ this.id = new UUID(i,-i);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ TestVal testVal = (TestVal)o;
+
+ if (i != testVal.i) return false;
+ if (x != testVal.x) return false;
+ if (str != null ? !str.equals(testVal.str) : testVal.str != null) return false;
+ return id != null ? id.equals(testVal.id) : testVal.id == null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int result = str != null ? str.hashCode() : 0;
+ result = 31 * result + i;
+ result = 31 * result + (int)(x ^ (x >>> 32));
+ result = 31 * result + (id != null ? id.hashCode() : 0);
+ return result;
+ }
+ }
+
+ /**
+ */
+ static class PunchFileIO extends FileIODecorator {
+ /** */
+ private ConcurrentMap<Long, Integer> holes = new ConcurrentHashMap<>();
+
+ /**
+ * @param delegate File I/O delegate
+ */
+ public PunchFileIO(FileIO delegate) {
+ super(Objects.requireNonNull(delegate));
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getFileSystemBlockSize() {
+ assertFalse(U.isLinux());
+
+ return 4 * 1024;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSparseSize() {
+ assertFalse(U.isLinux());
+
+ long holesSize = holes.values().stream().mapToLong(x -> x).sum();
+
+ try {
+ return size() - holesSize;
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public int writeFully(ByteBuffer srcBuf, long position) throws IOException {
+ assertFalse(U.isLinux());
+
+ holes.remove(position);
+ return super.writeFully(srcBuf, position);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int punchHole(long pos, int len) {
+ assertFalse(U.isLinux());
+
+ assertTrue(len > 0);
+
+ int blockSize = getFileSystemBlockSize();
+
+ len = len / blockSize * blockSize;
+
+ if (len > 0)
+ holes.put(pos, len);
+
+ return len;
+ }
+ }
+
    /**
     * File IO factory that wraps every created {@code FileIO} into {@code PunchFileIO}
     * to emulate hole punching on platforms without native support.
     */
    static class PunchFileIOFactory implements FileIOFactory {
        /** Underlying factory that performs the actual file IO creation. */
        final FileIOFactory delegate;

        /**
         * @param delegate Delegate.
         */
        PunchFileIOFactory(FileIOFactory delegate) {
            this.delegate = Objects.requireNonNull(delegate);
        }

        /** {@inheritDoc} */
        @Override public FileIO create(File file) throws IOException {
            return new PunchFileIO(delegate.create(file));
        }

        /** {@inheritDoc} */
        @Override public FileIO create(File file, OpenOption... modes) throws IOException {
            return new PunchFileIO(delegate.create(file, modes));
        }
    }
+}