You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2019/04/30 10:15:28 UTC
[ignite] 12/17: GG-17480 Implement WAL page snapshot records
compression - Fixes #6116.
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch gg-18539
in repository https://gitbox.apache.org/repos/asf/ignite.git
commit b1a355a2b4df6082da4d93a11d9253eacd9af036
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Mon Apr 29 13:21:49 2019 +0300
GG-17480 Implement WAL page snapshot records compression - Fixes #6116.
Signed-off-by: Dmitriy Govorukhin <dm...@gmail.com>
(cherry picked from commit 2ad6c85)
---
modules/compress/pom.xml | 15 ++
.../compress/CompressionProcessorImpl.java | 100 +++++++---
...ionWithRealCpDisabledAndWalCompressionTest.java | 37 ++++
.../wal/WalCompactionAndPageCompressionTest.java | 35 ++++
.../WalRecoveryWithPageCompressionAndTdeTest.java | 51 ++++++
.../db/wal/WalRecoveryWithPageCompressionTest.java | 36 ++++
.../AbstractPageCompressionIntegrationTest.java | 201 +++++++++++++++++++++
.../DiskPageCompressionIntegrationTest.java | 171 +-----------------
.../WalPageCompressionIntegrationTest.java | 101 +++++++++++
.../testsuites/IgnitePdsCompressionTestSuite.java | 12 ++
.../ignite/configuration/CacheConfiguration.java | 5 +-
.../configuration/DataStorageConfiguration.java | 53 ++++++
.../ignite/configuration/DiskPageCompression.java | 5 +
.../internal/pagemem/wal/record/PageSnapshot.java | 76 ++++++--
.../processors/cache/CacheCompressionManager.java | 10 +-
.../GridCacheDatabaseSharedManager.java | 9 +
.../cache/persistence/file/FilePageStore.java | 4 +
.../cache/persistence/pagemem/PageMemoryImpl.java | 11 +-
.../cache/persistence/tree/io/PageIO.java | 38 +++-
.../persistence/wal/FileWriteAheadLogManager.java | 43 +++++
.../wal/serializer/RecordDataSerializer.java | 3 +-
.../wal/serializer/RecordDataV1Serializer.java | 17 +-
.../wal/serializer/RecordDataV2Serializer.java | 17 +-
.../wal/serializer/RecordV1Serializer.java | 2 +-
.../wal/serializer/RecordV2Serializer.java | 3 +-
.../processors/compress/CompressionProcessor.java | 11 +-
.../platform/utils/PlatformConfigurationUtils.java | 15 ++
.../processors/cache/persistence/DummyPageIO.java | 4 +
...CheckpointSimulationWithRealCpDisabledTest.java | 25 ++-
.../persistence/db/wal/WalCompactionTest.java | 26 ++-
.../wal/memtracker/PageMemoryTracker.java | 3 +-
.../persistence/db/wal/IgniteWalRecoveryTest.java | 71 ++++++--
.../Config/full-config.xml | 3 +-
.../IgniteConfigurationSerializerTest.cs | 3 +
.../Apache.Ignite.Core/Apache.Ignite.Core.csproj | 1 +
.../Configuration/DataStorageConfiguration.cs | 22 ++-
.../Configuration/DiskPageCompression.cs | 50 +++++
.../IgniteConfigurationSection.xsd | 20 ++
38 files changed, 1049 insertions(+), 260 deletions(-)
diff --git a/modules/compress/pom.xml b/modules/compress/pom.xml
index a7f77de..bdcd924 100644
--- a/modules/compress/pom.xml
+++ b/modules/compress/pom.xml
@@ -42,6 +42,21 @@
</dependency>
<dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-indexing</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-indexing</artifactId>
+ <scope>test</scope>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-posix</artifactId>
<version>${jnr.posix.version}</version>
diff --git a/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java
index a89f67a..6d1b95f 100644
--- a/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java
+++ b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java
@@ -27,8 +27,10 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.DiskPageCompression;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.CompactablePageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.xerial.snappy.Snappy;
@@ -63,6 +65,11 @@ public class CompressionProcessorImpl extends CompressionProcessor {
}
/** {@inheritDoc} */
+ @Override public void checkPageCompressionSupported() throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public void checkPageCompressionSupported(Path storagePath, int pageSize) throws IgniteCheckedException {
if (!U.isLinux())
throw new IgniteCheckedException("Currently page compression is supported only for Linux.");
@@ -92,46 +99,74 @@ public class CompressionProcessorImpl extends CompressionProcessor {
DiskPageCompression compression,
int compressLevel
) throws IgniteCheckedException {
- assert compression != null;
- assert U.isPow2(pageSize): pageSize;
+ assert compression != null && compression != DiskPageCompression.DISABLED : compression;
assert U.isPow2(blockSize): blockSize;
- assert page.position() == 0 && page.limit() == pageSize;
+ assert page.position() == 0 && page.limit() >= pageSize;
- PageIO io = PageIO.getPageIO(page);
+ int oldPageLimit = page.limit();
- if (!(io instanceof CompactablePageIO))
- return page;
+ try {
+ // Page size will be less than page limit when TDE is enabled. To make compaction and compression work
+ // correctly we need to set limit to real page size.
+ page.limit(pageSize);
- ByteBuffer compactPage = compactBuf.get();
+ ByteBuffer compactPage = doCompactPage(page, pageSize);
- // Drop the garbage from the page.
- ((CompactablePageIO)io).compactPage(page, compactPage, pageSize);
- page.clear();
+ int compactSize = compactPage.limit();
- int compactSize = compactPage.limit();
+ assert compactSize <= pageSize : compactSize;
- assert compactSize <= pageSize: compactSize;
+ // If no need to compress further or configured just to skip garbage.
+ if (compactSize < blockSize || compression == SKIP_GARBAGE)
+ return setCompactionInfo(compactPage, compactSize);
+
+ ByteBuffer compressedPage = doCompressPage(compression, compactPage, compactSize, compressLevel);
+
+ assert compressedPage.position() == 0;
+ int compressedSize = compressedPage.limit();
+
+ int freeCompactBlocks = (pageSize - compactSize) / blockSize;
+ int freeCompressedBlocks = (pageSize - compressedSize) / blockSize;
+
+ if (freeCompactBlocks >= freeCompressedBlocks) {
+ if (freeCompactBlocks == 0)
+ return page; // No blocks will be released.
+
+ return setCompactionInfo(compactPage, compactSize);
+ }
- // If no need to compress further or configured just to skip garbage.
- if (compactSize < blockSize || compression == SKIP_GARBAGE)
- return setCompactionInfo(compactPage, compactSize);
+ return setCompressionInfo(compressedPage, compression, compressedSize, compactSize);
+ }
+ finally {
+ page.limit(oldPageLimit);
+ }
+ }
- ByteBuffer compressedPage = doCompressPage(compression, compactPage, compactSize, compressLevel);
+ /**
+ * @param page Page buffer.
+ * @param pageSize Page size.
+ * @return Compacted page buffer.
+ */
+ private ByteBuffer doCompactPage(ByteBuffer page, int pageSize) throws IgniteCheckedException {
+ PageIO io = PageIO.getPageIO(page);
- assert compressedPage.position() == 0;
- int compressedSize = compressedPage.limit();
+ ByteBuffer compactPage = compactBuf.get();
- int freeCompactBlocks = (pageSize - compactSize) / blockSize;
- int freeCompressedBlocks = (pageSize - compressedSize) / blockSize;
+ if (io instanceof CompactablePageIO) {
+ // Drop the garbage from the page.
+ ((CompactablePageIO)io).compactPage(page, compactPage, pageSize);
+ }
+ else {
+ // Direct buffer is required as output of this method.
+ if (page.isDirect())
+ return page;
- if (freeCompactBlocks >= freeCompressedBlocks) {
- if (freeCompactBlocks == 0)
- return page; // No blocks will be released.
+ PageUtils.putBytes(GridUnsafe.bufferAddress(compactPage), 0, page.array());
- return setCompactionInfo(compactPage, compactSize);
+ compactPage.limit(pageSize);
}
- return setCompressionInfo(compressedPage, compression, compressedSize, compactSize);
+ return compactPage;
}
/**
@@ -260,7 +295,7 @@ public class CompressionProcessorImpl extends CompressionProcessor {
* @return Level.
*/
private static byte getCompressionType(DiskPageCompression compression) {
- if (compression == null)
+ if (compression == DiskPageCompression.DISABLED)
return UNCOMPRESSED_PAGE;
switch (compression) {
@@ -281,7 +316,7 @@ public class CompressionProcessorImpl extends CompressionProcessor {
/** {@inheritDoc} */
@Override public void decompressPage(ByteBuffer page, int pageSize) throws IgniteCheckedException {
- assert page.capacity() == pageSize;
+ assert page.capacity() >= pageSize : "capacity=" + page.capacity() + ", pageSize=" + pageSize;
byte compressType = PageIO.getCompressionType(page);
@@ -338,11 +373,16 @@ public class CompressionProcessorImpl extends CompressionProcessor {
assert page.limit() == compactSize;
}
- CompactablePageIO io = PageIO.getPageIO(page);
+ PageIO io = PageIO.getPageIO(page);
- io.restorePage(page, pageSize);
+ if (io instanceof CompactablePageIO)
+ ((CompactablePageIO)io).restorePage(page, pageSize);
+ else {
+ assert compactSize == pageSize
+ : "Wrong compacted page size [compactSize=" + compactSize + ", pageSize=" + pageSize + ']';
+ }
- setCompressionInfo(page, null, 0, 0);
+ setCompressionInfo(page, DiskPageCompression.DISABLED, 0, 0);
}
/** */
diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgnitePdsCheckpointSimulationWithRealCpDisabledAndWalCompressionTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgnitePdsCheckpointSimulationWithRealCpDisabledAndWalCompressionTest.java
new file mode 100644
index 0000000..48416b4
--- /dev/null
+++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgnitePdsCheckpointSimulationWithRealCpDisabledAndWalCompressionTest.java
@@ -0,0 +1,37 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import org.apache.ignite.configuration.DiskPageCompression;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsCheckpointSimulationWithRealCpDisabledTest;
+
+/**
+ *
+ */
+public class IgnitePdsCheckpointSimulationWithRealCpDisabledAndWalCompressionTest
+ extends IgnitePdsCheckpointSimulationWithRealCpDisabledTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.getDataStorageConfiguration().setWalPageCompression(DiskPageCompression.SNAPPY);
+
+ return cfg;
+ }
+}
diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAndPageCompressionTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAndPageCompressionTest.java
new file mode 100644
index 0000000..01e2040
--- /dev/null
+++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAndPageCompressionTest.java
@@ -0,0 +1,35 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import org.apache.ignite.configuration.DiskPageCompression;
+import org.apache.ignite.configuration.IgniteConfiguration;
+
+/**
+ *
+ */
+public class WalCompactionAndPageCompressionTest extends WalCompactionTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.getDataStorageConfiguration().setWalPageCompression(DiskPageCompression.SNAPPY);
+
+ return cfg;
+ }
+}
diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryWithPageCompressionAndTdeTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryWithPageCompressionAndTdeTest.java
new file mode 100644
index 0000000..07c1312
--- /dev/null
+++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryWithPageCompressionAndTdeTest.java
@@ -0,0 +1,51 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.encryption.AbstractEncryptionTest;
+import org.apache.ignite.spi.encryption.keystore.KeystoreEncryptionSpi;
+
+/**
+ *
+ */
+public class WalRecoveryWithPageCompressionAndTdeTest extends WalRecoveryWithPageCompressionTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ for (CacheConfiguration<?, ?> ccfg : cfg.getCacheConfiguration())
+ ccfg.setEncryptionEnabled(true);
+
+ KeystoreEncryptionSpi encSpi = new KeystoreEncryptionSpi();
+
+ encSpi.setKeyStorePath(AbstractEncryptionTest.KEYSTORE_PATH);
+ encSpi.setKeyStorePassword(AbstractEncryptionTest.KEYSTORE_PASSWORD.toCharArray());
+
+ cfg.setEncryptionSpi(encSpi);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testWalRenameDirSimple() throws Exception {
+ // Ignore this test when TDE is enabled, since there is internal cache group id change without corresponding
+ // encryption keys change.
+ }
+}
diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryWithPageCompressionTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryWithPageCompressionTest.java
new file mode 100644
index 0000000..aaf4b25
--- /dev/null
+++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryWithPageCompressionTest.java
@@ -0,0 +1,36 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import org.apache.ignite.configuration.DiskPageCompression;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFAULT_DISK_PAGE_COMPRESSION;
+
+/**
+ * WAL recovery test with WAL page compression enabled and PDS disk page compression disabled.
+ */
+@WithSystemProperty(key = IGNITE_DEFAULT_DISK_PAGE_COMPRESSION, value = "DISABLED")
+public class WalRecoveryWithPageCompressionTest extends IgniteWalRecoveryTest {
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ walPageCompression = DiskPageCompression.ZSTD;
+ }
+}
diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/AbstractPageCompressionIntegrationTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/AbstractPageCompressionIntegrationTest.java
new file mode 100644
index 0000000..31d838d
--- /dev/null
+++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/AbstractPageCompressionIntegrationTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compress;
+
+import java.io.Serializable;
+import java.util.UUID;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.DiskPageCompression;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.configuration.DiskPageCompression.LZ4;
+import static org.apache.ignite.configuration.DiskPageCompression.SKIP_GARBAGE;
+import static org.apache.ignite.configuration.DiskPageCompression.SNAPPY;
+import static org.apache.ignite.configuration.DiskPageCompression.ZSTD;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_DEFAULT_LEVEL;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_MAX_LEVEL;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_MIN_LEVEL;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessor.ZSTD_MAX_LEVEL;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessor.ZSTD_MIN_LEVEL;
+
+/**
+ *
+ */
+public abstract class AbstractPageCompressionIntegrationTest extends GridCommonAbstractTest {
+ /** */
+ protected DiskPageCompression compression;
+
+ /** */
+ protected Integer compressionLevel;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ compression = DiskPageCompression.DISABLED;
+ compressionLevel = null;
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids(true);
+
+ cleanPersistenceDir();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testPageCompression_Zstd_Max() throws Exception {
+ compression = ZSTD;
+ compressionLevel = ZSTD_MAX_LEVEL;
+
+ doTestPageCompression();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testPageCompression_Zstd_Default() throws Exception {
+ compression = ZSTD;
+ compressionLevel = null;
+
+ doTestPageCompression();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testPageCompression_Zstd_Min() throws Exception {
+ compression = ZSTD;
+ compressionLevel = ZSTD_MIN_LEVEL;
+
+ doTestPageCompression();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testPageCompression_Lz4_Max() throws Exception {
+ compression = LZ4;
+ compressionLevel = LZ4_MAX_LEVEL;
+
+ doTestPageCompression();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testPageCompression_Lz4_Default() throws Exception {
+ compression = LZ4;
+ compressionLevel = null;
+
+ doTestPageCompression();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testPageCompression_Lz4_Min() throws Exception {
+ assertEquals(LZ4_MIN_LEVEL, LZ4_DEFAULT_LEVEL);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testPageCompression_SkipGarbage() throws Exception {
+ compression = SKIP_GARBAGE;
+
+ doTestPageCompression();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testPageCompression_Snappy() throws Exception {
+ compression = SNAPPY;
+
+ doTestPageCompression();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ protected abstract void doTestPageCompression() throws Exception;
+
+ /**
+ *
+ */
+ static class TestVal implements Serializable {
+ /** */
+ static final long serialVersionUID = 1L;
+
+ /** */
+ @QuerySqlField
+ String str;
+
+ /** */
+ int i;
+
+ /** */
+ @QuerySqlField
+ long x;
+
+ /** */
+ @QuerySqlField
+ UUID id;
+
+ TestVal(int i) {
+ this.str = i + "bla bla bla!";
+ this.i = -i;
+ this.x = 0xffaabbccdd773311L + i;
+ this.id = new UUID(i,-i);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ TestVal testVal = (TestVal)o;
+
+ if (i != testVal.i) return false;
+ if (x != testVal.x) return false;
+ if (str != null ? !str.equals(testVal.str) : testVal.str != null) return false;
+ return id != null ? id.equals(testVal.id) : testVal.id == null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int result = str != null ? str.hashCode() : 0;
+ result = 31 * result + i;
+ result = 31 * result + (int)(x ^ (x >>> 32));
+ result = 31 * result + (id != null ? id.hashCode() : 0);
+ return result;
+ }
+ }
+}
diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java
index 0158828..865ab22 100644
--- a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java
+++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java
@@ -18,21 +18,17 @@ package org.apache.ignite.internal.processors.compress;
import java.io.File;
import java.io.IOException;
-import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.file.OpenOption;
import java.util.Objects;
-import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
-import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.configuration.DiskPageCompression;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.pagemem.store.PageStore;
@@ -46,47 +42,19 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStor
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.mxbean.CacheGroupMetricsMXBean;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.configuration.DataStorageConfiguration.MAX_PAGE_SIZE;
-import static org.apache.ignite.configuration.DiskPageCompression.LZ4;
-import static org.apache.ignite.configuration.DiskPageCompression.SKIP_GARBAGE;
-import static org.apache.ignite.configuration.DiskPageCompression.SNAPPY;
import static org.apache.ignite.configuration.DiskPageCompression.ZSTD;
-import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_DEFAULT_LEVEL;
-import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_MAX_LEVEL;
-import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_MIN_LEVEL;
-import static org.apache.ignite.internal.processors.compress.CompressionProcessor.ZSTD_MAX_LEVEL;
-import static org.apache.ignite.internal.processors.compress.CompressionProcessor.ZSTD_MIN_LEVEL;
/**
*
*/
-public class DiskPageCompressionIntegrationTest extends GridCommonAbstractTest {
- /** */
- private DiskPageCompression compression;
-
- /** */
- private Integer compressionLevel;
-
+public class DiskPageCompressionIntegrationTest extends AbstractPageCompressionIntegrationTest {
/** */
private FileIOFactory factory;
/** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- compression = null;
- compressionLevel = null;
- cleanPersistenceDir();
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- stopAllGrids(true);
- }
-
- /** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteName) throws Exception {
DataRegionConfiguration drCfg = new DataRegionConfiguration()
.setPersistenceEnabled(true);
@@ -112,90 +80,8 @@ public class DiskPageCompressionIntegrationTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- @Test
- public void testPageCompression_Zstd_Max() throws Exception {
- compression = ZSTD;
- compressionLevel = ZSTD_MAX_LEVEL;
-
- doTestPageCompression();
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
- public void testPageCompression_Zstd_Default() throws Exception {
- compression = ZSTD;
- compressionLevel = null;
-
- doTestPageCompression();
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
- public void testPageCompression_Zstd_Min() throws Exception {
- compression = ZSTD;
- compressionLevel = ZSTD_MIN_LEVEL;
-
- doTestPageCompression();
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
- public void testPageCompression_Lz4_Max() throws Exception {
- compression = LZ4;
- compressionLevel = LZ4_MAX_LEVEL;
-
- doTestPageCompression();
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
- public void testPageCompression_Lz4_Default() throws Exception {
- compression = LZ4;
- compressionLevel = null;
-
- doTestPageCompression();
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
- public void testPageCompression_Lz4_Min() throws Exception {
- assertEquals(LZ4_MIN_LEVEL, LZ4_DEFAULT_LEVEL);
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
- public void testPageCompression_SkipGarbage() throws Exception {
- compression = SKIP_GARBAGE;
-
- doTestPageCompression();
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
- public void testPageCompression_Snappy() throws Exception {
- compression = SNAPPY;
-
- doTestPageCompression();
- }
-
- /**
- * @throws Exception If failed.
- */
- private void doTestPageCompression() throws Exception {
+ @Override
+ protected void doTestPageCompression() throws Exception {
IgniteEx ignite = startGrid(0);
ignite.cluster().active(true);
@@ -342,57 +228,6 @@ public class DiskPageCompressionIntegrationTest extends GridCommonAbstractTest {
/**
*/
- static class TestVal implements Serializable {
- /** */
- static final long serialVersionUID = 1L;
-
- /** */
- @QuerySqlField
- String str;
-
- /** */
- int i;
-
- /** */
- @QuerySqlField
- long x;
-
- /** */
- @QuerySqlField
- UUID id;
-
- TestVal(int i) {
- this.str = i + "bla bla bla!";
- this.i = -i;
- this.x = 0xffaabbccdd773311L + i;
- this.id = new UUID(i,-i);
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- TestVal testVal = (TestVal)o;
-
- if (i != testVal.i) return false;
- if (x != testVal.x) return false;
- if (str != null ? !str.equals(testVal.str) : testVal.str != null) return false;
- return id != null ? id.equals(testVal.id) : testVal.id == null;
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- int result = str != null ? str.hashCode() : 0;
- result = 31 * result + i;
- result = 31 * result + (int)(x ^ (x >>> 32));
- result = 31 * result + (id != null ? id.hashCode() : 0);
- return result;
- }
- }
-
- /**
- */
static class PunchFileIO extends FileIODecorator {
/** */
private ConcurrentMap<Long, Integer> holes = new ConcurrentHashMap<>();
diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/WalPageCompressionIntegrationTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/WalPageCompressionIntegrationTest.java
new file mode 100644
index 0000000..c16f9e4
--- /dev/null
+++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/WalPageCompressionIntegrationTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compress;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.DiskPageCompression;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+
+/**
+ *
+ */
+public class WalPageCompressionIntegrationTest extends AbstractPageCompressionIntegrationTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteName) throws Exception {
+ DataStorageConfiguration dsCfg = new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))
+ .setWalPageCompression(compression)
+ .setWalPageCompressionLevel(compressionLevel);
+
+ return super.getConfiguration(igniteName)
+ .setDataStorageConfiguration(dsCfg)
+ // Set new IP finder for each node to start independent clusters.
+ .setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(new TcpDiscoveryVmIpFinder(true)));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Override
+ protected void doTestPageCompression() throws Exception {
+ // Ignite instance with compressed WAL page records.
+ IgniteEx ignite0 = startGrid(0);
+
+ compression = DiskPageCompression.DISABLED;
+ compressionLevel = null;
+
+ // Reference ignite instance with uncompressed WAL page records.
+ IgniteEx ignite1 = startGrid(1);
+
+ ignite0.cluster().active(true);
+ ignite1.cluster().active(true);
+
+ String cacheName = "test";
+
+ CacheConfiguration<Integer, TestVal> ccfg = new CacheConfiguration<Integer, TestVal>()
+ .setName(cacheName)
+ .setBackups(0)
+ .setAtomicityMode(ATOMIC)
+ .setIndexedTypes(Integer.class, TestVal.class);
+
+ IgniteCache<Integer,TestVal> cache0 = ignite0.getOrCreateCache(ccfg);
+ IgniteCache<Integer,TestVal> cache1 = ignite1.getOrCreateCache(ccfg);
+
+ int cnt = 20_000;
+
+ for (int i = 0; i < cnt; i++) {
+ assertTrue(cache0.putIfAbsent(i, new TestVal(i)));
+ assertTrue(cache1.putIfAbsent(i, new TestVal(i)));
+ }
+
+ for (int i = 0; i < cnt; i += 2) {
+ assertEquals(new TestVal(i), cache0.getAndRemove(i));
+ assertEquals(new TestVal(i), cache1.getAndRemove(i));
+ }
+
+ // Write any WAL record to get current WAL pointers.
+ FileWALPointer ptr0 = (FileWALPointer)ignite0.context().cache().context().wal().log(new CheckpointRecord(null));
+ FileWALPointer ptr1 = (FileWALPointer)ignite1.context().cache().context().wal().log(new CheckpointRecord(null));
+
+ log.info("Compressed WAL pointer: " + ptr0);
+ log.info("Uncompressed WAL pointer: " + ptr1);
+
+ assertTrue("Compressed WAL must be smaller than uncompressed [ptr0=" + ptr0 + ", ptr1=" + ptr1 + ']',
+ ptr0.compareTo(ptr1) < 0);
+ }
+}
diff --git a/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite.java b/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite.java
index b38abc0..efbdd14 100644
--- a/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite.java
+++ b/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite.java
@@ -18,11 +18,16 @@ package org.apache.ignite.testsuites;
import java.util.ArrayList;
import java.util.List;
+import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgnitePdsCheckpointSimulationWithRealCpDisabledAndWalCompressionTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionAndPageCompressionTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRecoveryWithPageCompressionAndTdeTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRecoveryWithPageCompressionTest;
import org.apache.ignite.internal.processors.compress.CompressionConfigurationTest;
import org.apache.ignite.internal.processors.compress.CompressionProcessorTest;
import org.apache.ignite.internal.processors.compress.DiskPageCompressionIntegrationAsyncTest;
import org.apache.ignite.internal.processors.compress.DiskPageCompressionIntegrationTest;
import org.apache.ignite.internal.processors.compress.FileSystemUtilsTest;
+import org.apache.ignite.internal.processors.compress.WalPageCompressionIntegrationTest;
import org.apache.ignite.testframework.junits.DynamicSuite;
import org.junit.runner.RunWith;
@@ -45,6 +50,13 @@ public class IgnitePdsCompressionTestSuite {
suite.add(DiskPageCompressionIntegrationTest.class);
suite.add(DiskPageCompressionIntegrationAsyncTest.class);
+ // WAL page records compression.
+ suite.add(WalPageCompressionIntegrationTest.class);
+ suite.add(WalRecoveryWithPageCompressionTest.class);
+ suite.add(WalRecoveryWithPageCompressionAndTdeTest.class);
+ suite.add(IgnitePdsCheckpointSimulationWithRealCpDisabledAndWalCompressionTest.class);
+ suite.add(WalCompactionAndPageCompressionTest.class);
+
enableCompressionByDefault();
IgnitePdsTestSuite.addRealPageStoreTests(suite, null);
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 800b41c..1df064e 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -190,6 +190,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** Default SQL on-heap cache size. */
public static final int DFLT_SQL_ONHEAP_CACHE_MAX_SIZE = 0;
+ /** Default disk page compression algorithm. */
+ public static final DiskPageCompression DFLT_DISK_PAGE_COMPRESSION = DiskPageCompression.DISABLED;
+
/** Cache name. */
private String name;
@@ -2318,7 +2321,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
* @see #getDiskPageCompressionLevel
*/
public DiskPageCompression getDiskPageCompression() {
- return diskPageCompression;
+ return diskPageCompression == null ? DFLT_DISK_PAGE_COMPRESSION : diskPageCompression;
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
index c2cb8bc..d10bda1 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
@@ -166,6 +166,9 @@ public class DataStorageConfiguration implements Serializable {
/** Default wal compaction level. */
public static final int DFLT_WAL_COMPACTION_LEVEL = Deflater.BEST_SPEED;
+ /** Default compression algorithm for WAL page snapshot records. */
+ public static final DiskPageCompression DFLT_WAL_PAGE_COMPRESSION = DiskPageCompression.DISABLED;
+
/** Initial size of a memory chunk reserved for system cache. */
private long sysRegionInitSize = DFLT_SYS_REG_INIT_SIZE;
@@ -289,6 +292,12 @@ public class DataStorageConfiguration implements Serializable {
/** Timeout for checkpoint read lock acquisition. */
private Long checkpointReadLockTimeout;
+ /** Compression algorithm for WAL page snapshot records. */
+ private DiskPageCompression walPageCompression = DFLT_WAL_PAGE_COMPRESSION;
+
+ /** Compression level for WAL page snapshot records. */
+ private Integer walPageCompressionLevel;
+
/**
* Initial size of a data region reserved for system cache.
*
@@ -1020,6 +1029,50 @@ public class DataStorageConfiguration implements Serializable {
return this;
}
+ /**
+ * Gets compression algorithm for WAL page snapshot records.
+ *
+ * @return Page compression algorithm.
+ */
+ public DiskPageCompression getWalPageCompression() {
+ return walPageCompression == null ? DFLT_WAL_PAGE_COMPRESSION : walPageCompression;
+ }
+
+ /**
+ * Sets compression algorithm for WAL page snapshot records.
+ *
+ * @param walPageCompression Page compression algorithm.
+ * @return {@code this} for chaining.
+ */
+ public DataStorageConfiguration setWalPageCompression(DiskPageCompression walPageCompression) {
+ this.walPageCompression = walPageCompression;
+
+ return this;
+ }
+
+ /**
+ * Gets {@link #getWalPageCompression algorithm} specific WAL page compression level.
+ *
+ * @return WAL page snapshots compression level or {@code null} for default.
+ */
+ public Integer getWalPageCompressionLevel() {
+ return walPageCompressionLevel;
+ }
+
+ /**
+ * Sets {@link #setWalPageCompression algorithm} specific page compression level.
+ *
+ * @param walPageCompressionLevel Disk page compression level or {@code null} to use default.
+ * {@link DiskPageCompression#ZSTD Zstd}: from {@code -131072} to {@code 22} (default {@code 3}).
+ * {@link DiskPageCompression#LZ4 LZ4}: from {@code 0} to {@code 17} (default {@code 0}).
+ * @return {@code this} for chaining.
+ */
+ public DataStorageConfiguration setWalPageCompressionLevel(Integer walPageCompressionLevel) {
+ this.walPageCompressionLevel = walPageCompressionLevel;
+
+ return this;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DataStorageConfiguration.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DiskPageCompression.java b/modules/core/src/main/java/org/apache/ignite/configuration/DiskPageCompression.java
index 5deda2f..29186f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/DiskPageCompression.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/DiskPageCompression.java
@@ -23,8 +23,13 @@ import org.jetbrains.annotations.Nullable;
*
* @see CacheConfiguration#setDiskPageCompression
* @see CacheConfiguration#setDiskPageCompressionLevel
+ * @see DataStorageConfiguration#setWalPageCompression
+ * @see DataStorageConfiguration#setWalPageCompressionLevel
*/
public enum DiskPageCompression {
+ /** Compression disabled. */
+ DISABLED,
+
/** Retain only useful data from half-filled pages, but do not apply any compression. */
SKIP_GARBAGE,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java
index 6ae2c92..feae9d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java
@@ -28,16 +28,16 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
/**
*
*/
-public class PageSnapshot extends WALRecord implements WalRecordCacheGroupAware{
+public class PageSnapshot extends WALRecord implements WalRecordCacheGroupAware {
/** */
@GridToStringExclude
- private byte[] pageData;
+ private ByteBuffer pageData;
/** */
private FullPageId fullPageId;
/**
- * PageSIze without encryption overhead.
+ * PageSize without encryption overhead.
*/
private int realPageSize;
@@ -48,13 +48,16 @@ public class PageSnapshot extends WALRecord implements WalRecordCacheGroupAware{
*/
public PageSnapshot(FullPageId fullId, byte[] arr, int realPageSize) {
this.fullPageId = fullId;
- this.pageData = arr;
+ this.pageData = ByteBuffer.wrap(arr).order(ByteOrder.nativeOrder());
this.realPageSize = realPageSize;
}
/**
+ * This constructor doesn't actually create a page snapshot (copy), it creates a wrapper over page memory region.
+ * A created record should not be used after WAL manager writes it to log, since page content can be modified.
+ *
* @param fullPageId Full page ID.
- * @param ptr Pointer to copy from.
+ * @param ptr Pointer to wrap.
* @param pageSize Page size.
* @param realPageSize Page size without encryption overhead.
*/
@@ -62,9 +65,7 @@ public class PageSnapshot extends WALRecord implements WalRecordCacheGroupAware{
this.fullPageId = fullPageId;
this.realPageSize = realPageSize;
- pageData = new byte[pageSize];
-
- GridUnsafe.copyMemory(null, ptr, pageData, GridUnsafe.BYTE_ARR_OFF, pageSize);
+ pageData = GridUnsafe.wrapPointer(ptr, pageSize);
}
/** {@inheritDoc} */
@@ -76,6 +77,31 @@ public class PageSnapshot extends WALRecord implements WalRecordCacheGroupAware{
* @return Snapshot of page data.
*/
public byte[] pageData() {
+ if (!pageData.isDirect())
+ return pageData.array();
+
+ // In case of direct buffer copy buffer content to new array.
+ byte[] arr = new byte[pageData.limit()];
+
+ GridUnsafe.copyMemory(null, GridUnsafe.bufferAddress(pageData), arr, GridUnsafe.BYTE_ARR_OFF,
+ pageData.limit());
+
+ return arr;
+ }
+
+ /**
+ * @return Size of page data.
+ */
+ public int pageDataSize() {
+ return pageData.limit();
+ }
+
+ /**
+ * @return Page data byte buffer.
+ */
+ public ByteBuffer pageDataBuffer() {
+ pageData.rewind();
+
return pageData;
}
@@ -87,10 +113,26 @@ public class PageSnapshot extends WALRecord implements WalRecordCacheGroupAware{
}
/** {@inheritDoc} */
+ @Override public int groupId() {
+ return fullPageId.groupId();
+ }
+
+ /**
+ * @return PageSize without encryption overhead.
+ */
+ public int realPageSize() {
+ return realPageSize;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
- ByteBuffer buf = ByteBuffer.allocateDirect(pageData.length);
- buf.order(ByteOrder.nativeOrder());
- buf.put(pageData);
+ ByteBuffer buf = pageData;
+
+ if (!pageData.isDirect()) {
+ buf = ByteBuffer.allocateDirect(pageDataSize());
+ buf.order(ByteOrder.nativeOrder());
+ buf.put(pageData);
+ }
long addr = GridUnsafe.bufferAddress(buf);
@@ -101,16 +143,12 @@ public class PageSnapshot extends WALRecord implements WalRecordCacheGroupAware{
+ super.toString() + "]]";
}
catch (IgniteCheckedException ignored) {
- return "Error during call'toString' of PageSnapshot [fullPageId=" + fullPageId() +
- ", pageData = " + Arrays.toString(pageData) + ", super=" + super.toString() + "]";
+ return "Error during call 'toString' of PageSnapshot [fullPageId=" + fullPageId() +
+ ", pageData = " + Arrays.toString(pageData()) + ", super=" + super.toString() + "]";
}
finally {
- GridUnsafe.cleanDirectBuffer(buf);
+ if (!pageData.isDirect())
+ GridUnsafe.cleanDirectBuffer(buf);
}
}
-
- /** {@inheritDoc} */
- @Override public int groupId() {
- return fullPageId.groupId();
- }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheCompressionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheCompressionManager.java
index a050cd0..cac0064 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheCompressionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheCompressionManager.java
@@ -44,13 +44,19 @@ public class CacheCompressionManager extends GridCacheManagerAdapter {
/** {@inheritDoc} */
@Override protected void start0() throws IgniteCheckedException {
+ if (cctx.kernalContext().clientNode()) {
+ diskPageCompression = DiskPageCompression.DISABLED;
+
+ return;
+ }
+
compressProc = cctx.kernalContext().compress();
CacheConfiguration cfg = cctx.config();
diskPageCompression = cctx.kernalContext().config().isClientMode() ? null : cfg.getDiskPageCompression();
- if (diskPageCompression != null) {
+ if (diskPageCompression != DiskPageCompression.DISABLED) {
if (!cctx.dataRegion().config().isPersistenceEnabled())
throw new IgniteCheckedException("Disk page compression makes sense only with enabled persistence.");
@@ -81,7 +87,7 @@ public class CacheCompressionManager extends GridCacheManagerAdapter {
* @throws IgniteCheckedException If failed.
*/
public ByteBuffer compressPage(ByteBuffer page, PageStore store) throws IgniteCheckedException {
- if (diskPageCompression == null)
+ if (diskPageCompression == DiskPageCompression.DISABLED)
return page;
int blockSize = store.getBlockSize();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index d73b3da..a10877d 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -151,6 +151,7 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
+import org.apache.ignite.internal.processors.compress.CompressionProcessor;
import org.apache.ignite.internal.processors.port.GridPortRecord;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.stat.IoStatisticsHolderNoOp;
@@ -2529,6 +2530,14 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
try {
PageUtils.putBytes(pageAddr, 0, pageSnapshotRecord.pageData());
+
+ if (PageIO.getCompressionType(pageAddr) != CompressionProcessor.UNCOMPRESSED_PAGE) {
+ int realPageSize = pageMem.realPageSize(pageSnapshotRecord.groupId());
+
+ assert pageSnapshotRecord.pageDataSize() < realPageSize : pageSnapshotRecord.pageDataSize();
+
+ cctx.kernalContext().compress().decompressPage(pageMem.pageBuffer(pageAddr), realPageSize);
+ }
}
finally {
pageMem.writeUnlock(grpId, pageId, page, null, true, true);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
index ed7dd2f..23adeac 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
@@ -346,6 +346,10 @@ public class FilePageStore implements PageStore {
if (inited) {
long newSize = Math.max(pageSize, fileIO.size() - headerSize());
+ // In the case of compressed pages we can miss the tail of the page.
+ if (newSize % pageSize != 0)
+ newSize += pageSize - newSize % pageSize;
+
long delta = newSize - allocated.getAndSet(newSize);
assert delta % pageSize == 0 : delta;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
index ed2d344..da787fb 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
@@ -72,6 +72,7 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageParti
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.TrackingPageIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
+import org.apache.ignite.internal.processors.compress.CompressionProcessor;
import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
import org.apache.ignite.internal.stat.IoStatisticsHolder;
import org.apache.ignite.internal.stat.IoStatisticsHolderNoOp;
@@ -967,7 +968,7 @@ public class PageMemoryImpl implements PageMemoryEx {
if (snapshot.fullPageId().equals(fullId)) {
if (tmpAddr == null) {
- assert snapshot.pageData().length <= pageSize() : snapshot.pageData().length;
+ assert snapshot.pageDataSize() <= pageSize() : snapshot.pageDataSize();
tmpAddr = GridUnsafe.allocateMemory(pageSize());
}
@@ -976,6 +977,14 @@ public class PageMemoryImpl implements PageMemoryEx {
curPage = wrapPointer(tmpAddr, pageSize());
PageUtils.putBytes(tmpAddr, 0, snapshot.pageData());
+
+ if (PageIO.getCompressionType(tmpAddr) != CompressionProcessor.UNCOMPRESSED_PAGE) {
+ int realPageSize = realPageSize(snapshot.groupId());
+
+ assert snapshot.pageDataSize() < realPageSize : snapshot.pageDataSize();
+
+ ctx.kernalContext().compress().decompressPage(curPage, realPageSize);
+ }
}
break;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
index 0e66b77..ab660d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
@@ -405,6 +405,14 @@ public abstract class PageIO {
}
/**
+ * @param pageAddr Page address.
+ * @return Compression type.
+ */
+ public static byte getCompressionType(long pageAddr) {
+ return PageUtils.getByte(pageAddr, COMPRESSION_TYPE_OFF);
+ }
+
+ /**
* @param page Page buffer.
* @param compressedSize Compressed size.
*/
@@ -421,6 +429,14 @@ public abstract class PageIO {
}
/**
+ * @param pageAddr Page address.
+ * @return Compressed size.
+ */
+ public static short getCompressedSize(long pageAddr) {
+ return PageUtils.getShort(pageAddr, COMPRESSED_SIZE_OFF);
+ }
+
+ /**
* @param page Page buffer.
* @param compactedSize Compacted size.
*/
@@ -438,6 +454,14 @@ public abstract class PageIO {
/**
* @param pageAddr Page address.
+ * @return Compacted size.
+ */
+ public static short getCompactedSize(long pageAddr) {
+ return PageUtils.getShort(pageAddr, COMPACTED_SIZE_OFF);
+ }
+
+ /**
+ * @param pageAddr Page address.
* @return Checksum.
*/
public static int getCrc(long pageAddr) {
@@ -839,9 +863,8 @@ public abstract class PageIO {
assert pageSize <= out.remaining();
assert pageSize == page.remaining();
- page.mark();
- out.put(page).flip();
- page.reset();
+ PageHandler.copyMemory(page, 0, out, 0, pageSize);
+ out.limit(pageSize);
}
/**
@@ -857,7 +880,14 @@ public abstract class PageIO {
.a(",\n\t").a(PageIdUtils.toDetailString(getPageId(addr)))
.a("\n],\n");
- io.printPage(addr, pageSize, sb);
+ if (getCompressionType(addr) != 0) {
+ sb.a("CompressedPage[\n\tcompressionType=").a(getCompressionType(addr))
+ .a(",\n\tcompressedSize=").a(getCompressedSize(addr))
+ .a(",\n\tcompactedSize=").a(getCompactedSize(addr))
+ .a("\n]");
+ }
+ else
+ io.printPage(addr, pageSize, sb);
return sb.toString();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 0e45c26..7bf2e10 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -55,6 +55,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.DiskPageCompression;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.events.WalSegmentArchivedEvent;
@@ -103,9 +104,11 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.Re
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
+import org.apache.ignite.internal.processors.compress.CompressionProcessor;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.io.GridFileUtils;
@@ -357,6 +360,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** Switch segment record offset. */
private final AtomicLongArray switchSegmentRecordOffset;
+ /** Page snapshot records compression algorithm. */
+ private DiskPageCompression pageCompression;
+
+ /** Page snapshot records compression level. */
+ private int pageCompressionLevel;
+
/**
* @param ctx Kernal context.
*/
@@ -471,6 +480,21 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
segmentRouter,
ioFactory
);
+
+ pageCompression = dsCfg.getWalPageCompression();
+
+ if (pageCompression != DiskPageCompression.DISABLED) {
+ if (serializerVer < 2) {
+ throw new IgniteCheckedException("WAL page snapshots compression not supported for serializerVer=" +
+ serializerVer);
+ }
+
+ cctx.kernalContext().compress().checkPageCompressionSupported();
+
+ pageCompressionLevel = dsCfg.getWalPageCompressionLevel() != null ?
+ CompressionProcessor.checkCompressionLevelBounds(dsCfg.getWalPageCompressionLevel(), pageCompression) :
+ CompressionProcessor.getDefaultCompressionLevel(pageCompression);
+ }
}
}
@@ -769,6 +793,25 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
if (currWrHandle == null || (isDisable != null && isDisable.check()))
return null;
+ // Do page snapshots compression if configured.
+ if (pageCompression != DiskPageCompression.DISABLED && rec instanceof PageSnapshot) {
+ PageSnapshot pageSnapshot = (PageSnapshot)rec;
+
+ int pageSize = pageSnapshot.realPageSize();
+
+ ByteBuffer pageData = pageSnapshot.pageDataBuffer();
+
+ ByteBuffer compressedPage = cctx.kernalContext().compress().compressPage(pageData, pageSize, 1,
+ pageCompression, pageCompressionLevel);
+
+ if (compressedPage != pageData) {
+ assert compressedPage.isDirect() : "Is direct buffer: " + compressedPage.isDirect();
+
+ rec = new PageSnapshot(pageSnapshot.fullPageId(), GridUnsafe.bufferAddress(compressedPage),
+ compressedPage.limit(), pageSize);
+ }
+ }
+
// Need to calculate record size first.
rec.size(serializer.size(rec));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataSerializer.java
index e94be9f..e1d7c9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataSerializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataSerializer.java
@@ -41,11 +41,12 @@ public interface RecordDataSerializer {
*
* @param type Record type.
* @param in Buffer to read.
+ * @param size Record size (0 if unknown).
* @return WAL record.
* @throws IOException In case of I/O problems.
* @throws IgniteCheckedException If it's unable to read record.
*/
- WALRecord readRecord(WALRecord.RecordType type, ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException;
+ WALRecord readRecord(WALRecord.RecordType type, ByteBufferBackedDataInput in, int size) throws IOException, IgniteCheckedException;
/**
* Writes record data to buffer {@code buf}.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
index c5ede10..dd6393f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
@@ -127,10 +127,10 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
protected final GridCacheSharedContext cctx;
/** Size of page used for PageMemory regions. */
- private final int pageSize;
+ protected final int pageSize;
/** Size of page without encryption overhead. */
- private final int realPageSize;
+ protected final int realPageSize;
/** Cache object processor to reading {@link DataEntry DataEntries}. */
protected final IgniteCacheObjectProcessor co;
@@ -184,7 +184,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
}
/** {@inheritDoc} */
- @Override public WALRecord readRecord(RecordType type, ByteBufferBackedDataInput in)
+ @Override public WALRecord readRecord(RecordType type, ByteBufferBackedDataInput in, int size)
throws IOException, IgniteCheckedException {
if (type == ENCRYPTED_RECORD) {
if (encSpi == null) {
@@ -201,10 +201,10 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
if (clData.get1() == null)
return new EncryptedRecord(clData.get2(), clData.get3());
- return readPlainRecord(clData.get3(), clData.get1(), true);
+ return readPlainRecord(clData.get3(), clData.get1(), true, clData.get1().buffer().capacity());
}
- return readPlainRecord(type, in, false);
+ return readPlainRecord(type, in, false, size);
}
/** {@inheritDoc} */
@@ -346,7 +346,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
PageSnapshot pageRec = (PageSnapshot)record;
- return pageRec.pageData().length + 12;
+ return pageRec.pageDataSize() + 12;
case CHECKPOINT_RECORD:
CheckpointRecord cpRec = (CheckpointRecord)record;
@@ -532,12 +532,13 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
* @param type Record type.
* @param in Input
* @param encrypted Record was encrypted.
+ * @param recordSize Record size.
* @return Deserialized record.
* @throws IOException If failed.
* @throws IgniteCheckedException If failed.
*/
WALRecord readPlainRecord(RecordType type, ByteBufferBackedDataInput in,
- boolean encrypted) throws IOException, IgniteCheckedException {
+ boolean encrypted, int recordSize) throws IOException, IgniteCheckedException {
WALRecord res;
switch (type) {
@@ -1160,7 +1161,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
buf.putInt(snap.fullPageId().groupId());
buf.putLong(snap.fullPageId().pageId());
- buf.put(snap.pageData());
+ buf.put(snap.pageDataBuffer());
break;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
index 8533e73..52714c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.wal.record.CacheState;
import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
import org.apache.ignite.internal.pagemem.wal.record.ConsistentCutRecord;
@@ -36,6 +37,7 @@ import org.apache.ignite.internal.pagemem.wal.record.LazyMvccDataEntry;
import org.apache.ignite.internal.pagemem.wal.record.MvccDataEntry;
import org.apache.ignite.internal.pagemem.wal.record.MvccDataRecord;
import org.apache.ignite.internal.pagemem.wal.record.MvccTxRecord;
+import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
import org.apache.ignite.internal.pagemem.wal.record.SnapshotRecord;
import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
@@ -121,9 +123,20 @@ public class RecordDataV2Serializer extends RecordDataV1Serializer {
@Override WALRecord readPlainRecord(
RecordType type,
ByteBufferBackedDataInput in,
- boolean encrypted
+ boolean encrypted,
+ int recordSize
) throws IOException, IgniteCheckedException {
switch (type) {
+ case PAGE_RECORD:
+ int cacheId = in.readInt();
+ long pageId = in.readLong();
+
+ byte[] arr = new byte[recordSize - 4 /* cacheId */ - 8 /* pageId */];
+
+ in.readFully(arr);
+
+ return new PageSnapshot(new FullPageId(pageId, cacheId), arr, encrypted ? realPageSize : pageSize);
+
case CHECKPOINT_RECORD:
long msb = in.readLong();
long lsb = in.readLong();
@@ -200,7 +213,7 @@ public class RecordDataV2Serializer extends RecordDataV1Serializer {
return new ConsistentCutRecord();
default:
- return super.readPlainRecord(type, in, encrypted);
+ return super.readPlainRecord(type, in, encrypted, recordSize);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
index 288be5a..ec0ddc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
@@ -139,7 +139,7 @@ public class RecordV1Serializer implements RecordSerializer {
if (recType == null)
throw new IOException("Unknown record type: " + recType);
- final WALRecord rec = dataSerializer.readRecord(recType, in);
+ final WALRecord rec = dataSerializer.readRecord(recType, in, 0);
rec.position(ptr);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
index 872e47d..5a6db67 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
@@ -159,7 +159,8 @@ public class RecordV2Serializer implements RecordSerializer {
return new MarshalledRecord(recType, ptr, buf);
}
else {
- WALRecord rec = dataSerializer.readRecord(recType, in);
+ WALRecord rec = dataSerializer.readRecord(recType, in, ptr.length() - REC_TYPE_SIZE -
+ FILE_WAL_POINTER_SIZE - CRC_SIZE);
rec.position(ptr);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java
index 4f2227e..6c14d6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java
@@ -50,7 +50,7 @@ public class CompressionProcessor extends GridProcessorAdapter {
public static final int ZSTD_DEFAULT_LEVEL = 3;
/** */
- protected static final byte UNCOMPRESSED_PAGE = 0;
+ public static final byte UNCOMPRESSED_PAGE = 0;
/** */
protected static final byte COMPACTED_PAGE = 1;
@@ -133,6 +133,15 @@ public class CompressionProcessor extends GridProcessorAdapter {
}
/**
+ * Checks weither page compression is supported.
+ *
+ * @throws IgniteCheckedException If compression is not supported.
+ */
+ public void checkPageCompressionSupported() throws IgniteCheckedException {
+ fail();
+ }
+
+ /**
* @param storagePath Storage path.
* @param pageSize Page size.
* @throws IgniteCheckedException If compression is not supported.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
index 167571d..2f74ac9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
@@ -60,6 +60,7 @@ import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.configuration.DataPageEvictionMode;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.DiskPageCompression;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.MemoryConfiguration;
import org.apache.ignite.configuration.MemoryPolicyConfiguration;
@@ -1922,6 +1923,11 @@ public class PlatformConfigurationUtils {
if (in.readBoolean())
res.setCheckpointReadLockTimeout(in.readLong());
+ res.setWalPageCompression(DiskPageCompression.fromOrdinal(in.readInt()));
+
+ if (in.readBoolean())
+ res.setWalPageCompressionLevel(in.readInt());
+
int cnt = in.readInt();
if (cnt > 0) {
@@ -2058,6 +2064,15 @@ public class PlatformConfigurationUtils {
else
w.writeBoolean(false);
+ w.writeInt(cfg.getWalPageCompression().ordinal());
+
+ if (cfg.getWalPageCompressionLevel() != null) {
+ w.writeBoolean(true);
+ w.writeInt(cfg.getWalPageCompressionLevel());
+ }
+ else
+ w.writeBoolean(false);
+
if (cfg.getDataRegionConfigurations() != null) {
w.writeInt(cfg.getDataRegionConfigurations().length);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DummyPageIO.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DummyPageIO.java
index f0f1d8a..05ba134 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DummyPageIO.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DummyPageIO.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.persistence;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.CompactablePageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.util.GridStringBuilder;
@@ -27,6 +28,9 @@ import org.apache.ignite.internal.util.GridStringBuilder;
*/
public class DummyPageIO extends PageIO implements CompactablePageIO {
/** */
+ public static final IOVersions<DummyPageIO> VERSIONS = new IOVersions<>(new DummyPageIO());
+
+ /** */
public DummyPageIO() {
super(2 * Short.MAX_VALUE, 1);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java
index c44cbd0..5921c51 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java
@@ -41,6 +41,7 @@ import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.pagemem.FullPageId;
@@ -211,7 +212,7 @@ public class IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom
mem = shared.database().dataRegion(null).pageMemory();
- verifyReads(res.get1(), mem, res.get2(), shared.wal());
+ verifyReads(ig.context(), res.get1(), mem, res.get2(), shared.wal());
}
/**
@@ -679,7 +680,7 @@ public class IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom
long pageAddr = mem.writeLock(fullId.groupId(), fullId.pageId(), page);
try {
- DataPageIO.VERSIONS.latest().initNewPage(pageAddr, fullId.pageId(), mem.realPageSize(fullId.groupId()));
+ DummyPageIO.VERSIONS.latest().initNewPage(pageAddr, fullId.pageId(), mem.realPageSize(fullId.groupId()));
ThreadLocalRandom rnd = ThreadLocalRandom.current();
@@ -700,6 +701,7 @@ public class IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom
* @param mem Memory.
*/
private void verifyReads(
+ GridKernalContext ctx,
Map<FullPageId, Integer> res,
PageMemory mem,
WALPointer start,
@@ -707,6 +709,8 @@ public class IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom
) throws Exception {
Map<FullPageId, byte[]> replay = new HashMap<>();
+ ByteBuffer buf = ByteBuffer.allocateDirect(mem.pageSize()).order(ByteOrder.nativeOrder());
+
try (PartitionMetaStateRecordExcludeIterator it = new PartitionMetaStateRecordExcludeIterator(wal.replay(start))) {
IgniteBiTuple<WALPointer, WALRecord> tup = it.next();
@@ -729,7 +733,22 @@ public class IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom
else if (rec instanceof PageSnapshot) {
PageSnapshot page = (PageSnapshot)rec;
- replay.put(page.fullPageId(), page.pageData());
+ int realPageSize = mem.realPageSize(page.groupId());
+
+ byte[] pageData = page.pageData();
+
+ if (page.pageDataSize() < realPageSize) {
+ buf.clear();
+ buf.put(pageData).flip();
+
+ ctx.compress().decompressPage(buf, realPageSize);
+
+ pageData = new byte[buf.limit()];
+
+ buf.get(pageData);
+ }
+
+ replay.put(page.fullPageId(), pageData);
}
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
index 729f9ac..427e80c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
@@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.persistence.db.wal;
import java.io.File;
import java.io.FilenameFilter;
import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import org.apache.ignite.Ignite;
@@ -37,9 +38,11 @@ import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
import org.apache.ignite.internal.pagemem.wal.record.RolloverType;
+import org.apache.ignite.internal.processors.cache.persistence.DummyPageIO;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
+import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -172,9 +175,11 @@ public class WalCompactionTest extends GridCommonAbstractTest {
cache.put(i, val);
}
+ byte[] dummyPage = dummyPage(pageSize);
+
// Spam WAL to move all data records to compressible WAL zone.
for (int i = 0; i < WAL_SEGMENT_SIZE / pageSize * 2; i++) {
- ig.context().cache().context().wal().log(new PageSnapshot(new FullPageId(-1, -1), new byte[pageSize],
+ ig.context().cache().context().wal().log(new PageSnapshot(new FullPageId(-1, -1), dummyPage,
pageSize));
}
@@ -479,9 +484,11 @@ public class WalCompactionTest extends GridCommonAbstractTest {
cache.put(i, val);
}
+ byte[] dummyPage = dummyPage(pageSize);
+
// Spam WAL to move all data records to compressible WAL zone.
for (int i = 0; i < WAL_SEGMENT_SIZE / pageSize * 2; i++) {
- ig.context().cache().context().wal().log(new PageSnapshot(new FullPageId(-1, -1), new byte[pageSize],
+ ig.context().cache().context().wal().log(new PageSnapshot(new FullPageId(-1, -1), dummyPage,
pageSize));
}
@@ -564,4 +571,19 @@ public class WalCompactionTest extends GridCommonAbstractTest {
assertFalse(fail);
}
+
+ /**
+ * @param pageSize Page size.
+ */
+ private static byte[] dummyPage(int pageSize) {
+ ByteBuffer pageBuf = ByteBuffer.allocateDirect(pageSize);
+
+ DummyPageIO.VERSIONS.latest().initNewPage(GridUnsafe.bufferAddress(pageBuf), -1, pageSize);
+
+ byte[] pageData = new byte[pageSize];
+
+ pageBuf.get(pageData);
+
+ return pageData;
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/memtracker/PageMemoryTracker.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/memtracker/PageMemoryTracker.java
index 560d707..b013d24 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/memtracker/PageMemoryTracker.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/memtracker/PageMemoryTracker.java
@@ -38,7 +38,6 @@ import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageMemory;
-import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
@@ -424,7 +423,7 @@ public class PageMemoryTracker implements IgnitePlugin {
page.lock();
try {
- PageUtils.putBytes(page.address(), 0, snapshot.pageData());
+ GridUnsafe.copyMemory(GridUnsafe.bufferAddress(snapshot.pageDataBuffer()), page.address(), pageSize);
page.changeHistory().clear();
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
index 18df698..e1de9fd 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
@@ -54,6 +54,7 @@ import org.apache.ignite.configuration.BinaryConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.DiskPageCompression;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.DiscoverySpiTestListener;
@@ -72,6 +73,7 @@ import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
@@ -82,6 +84,8 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStor
import org.apache.ignite.internal.processors.cache.persistence.filename.PdsConsistentIdProcessor;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.CompactablePageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.TrackingPageIO;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.stat.IoStatisticsHolderNoOp;
@@ -162,7 +166,9 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
/** */
private long checkpointFrequency = DFLT_CHECKPOINT_FREQ;
- ;
+
+ /** WAL page snapshots records compression method. */
+ protected DiskPageCompression walPageCompression;
/** {@inheritDoc} */
@Override protected boolean isMultiJvm() {
@@ -213,6 +219,8 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
dbCfg.setWalSegments(walSegments);
+ dbCfg.setWalPageCompression(walPageCompression);
+
dbCfg.setCheckpointFrequency(checkpointFrequency);
cfg.setDataStorageConfiguration(dbCfg);
@@ -1357,6 +1365,8 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
ByteBuffer buf = ByteBuffer.allocateDirect(pageSize);
+ buf.order(ByteOrder.nativeOrder());
+
// Now check that deltas can be correctly applied.
try (WALIterator it = sharedCtx.wal().replay(ptr)) {
while (it.hasNext()) {
@@ -1367,7 +1377,28 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
if (rec instanceof PageSnapshot) {
PageSnapshot page = (PageSnapshot)rec;
- rolledPages.put(page.fullPageId(), page.pageData());
+ CacheGroupContext gctx = sharedCtx.cache().cacheGroup(page.groupId());
+
+ int realPageSize = gctx == null ? pageSize
+ : gctx.dataRegion().pageMemory().realPageSize(page.groupId());
+
+ byte[] pageData = page.pageData();
+
+ if (pageData.length < realPageSize) {
+ buf.clear();
+ buf.put(pageData);
+ buf.flip();
+
+ sharedCtx.kernalContext().compress().decompressPage(buf, realPageSize);
+
+ pageData = new byte[realPageSize];
+
+ buf.position(0);
+
+ buf.get(pageData);
+ }
+
+ rolledPages.put(page.fullPageId(), pageData);
}
else if (rec instanceof PageDeltaRecord) {
PageDeltaRecord delta = (PageDeltaRecord)rec;
@@ -1384,17 +1415,13 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
assertNotNull("Missing page snapshot [page=" + fullId + ", delta=" + delta + ']', pageData);
- buf.order(ByteOrder.nativeOrder());
-
- buf.position(0);
+ buf.clear();
buf.put(pageData);
- buf.position(0);
+ buf.flip();
delta.applyDelta(sharedCtx.database().dataRegion(null).pageMemory(),
GridUnsafe.bufferAddress(buf));
- buf.position(0);
-
buf.get(pageData);
}
}
@@ -1404,6 +1431,8 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
PageMemoryEx pageMem = (PageMemoryEx)db.dataRegion(null).pageMemory();
+ ByteBuffer bufWal = ByteBuffer.allocateDirect(pageSize);
+
for (Map.Entry<FullPageId, byte[]> entry : rolledPages.entrySet()) {
FullPageId fullId = entry.getKey();
@@ -1419,12 +1448,30 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
try {
byte[] data = entry.getValue();
- for (int i = 0; i < data.length; i++) {
- if (fullId.pageId() == TrackingPageIO.VERSIONS.latest().trackingPageFor(fullId.pageId(), db.pageSize()))
- continue; // Skip tracking pages.
+ if (fullId.pageId() == TrackingPageIO.VERSIONS.latest().trackingPageFor(fullId.pageId(), db.pageSize()))
+ continue; // Skip tracking pages.
- assertEquals("page=" + fullId + ", pos=" + i, PageUtils.getByte(bufPtr, i), data[i]);
+ // Compaction/restoring page can left some trash in unused space, so we need to compare
+ // compacted pages in case of compaction is used.
+ if (walPageCompression != null && PageIO.getPageIO(bufPtr) instanceof CompactablePageIO) {
+ CompactablePageIO pageIO = PageIO.getPageIO(bufPtr);
+
+ buf.clear();
+ bufWal.clear();
+
+ int realPageSize = data.length;
+
+ pageIO.compactPage(GridUnsafe.wrapPointer(bufPtr, realPageSize), buf, realPageSize);
+ pageIO.compactPage(ByteBuffer.wrap(data), bufWal, realPageSize);
+
+ bufPtr = GridUnsafe.bufferAddress(buf);
+ data = new byte[bufWal.limit()];
+ bufWal.rewind();
+ bufWal.get(data);
}
+
+ for (int i = 0; i < data.length; i++)
+ assertEquals("page=" + fullId + ", pos=" + i, PageUtils.getByte(bufPtr, i), data[i]);
}
finally {
pageMem.writeUnlock(fullId.groupId(), fullId.pageId(), page, null, false, true);
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml
index 6c6d69e..bf6715f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml
@@ -149,7 +149,8 @@
walThreadLocalBufferSize="11"
walArchivePath="abc" walFlushFrequency="00:00:12" walFsyncDelayNanos="13" walHistorySize="14"
walMode="Background" walRecordIteratorBufferSize="15" walSegments="16" walSegmentSize="17"
- walPath="wal-store" writeThrottlingEnabled="true" walAutoArchiveAfterInactivity="00:00:18">
+ walPath="wal-store" writeThrottlingEnabled="true" walAutoArchiveAfterInactivity="00:00:18"
+ walPageCompression="Zstd">
<dataRegionConfigurations>
<dataRegionConfiguration emptyPagesPoolSize="1" evictionThreshold="2" initialSize="3" metricsEnabled="true"
maxSize="4" name="reg2" pageEvictionMode="RandomLru" metricsRateTimeInterval="00:00:01"
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
index 0135e80..4961954 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
@@ -323,6 +323,7 @@ namespace Apache.Ignite.Core.Tests
Assert.AreEqual("wal-store", ds.WalPath);
Assert.AreEqual(TimeSpan.FromSeconds(18), ds.WalAutoArchiveAfterInactivity);
Assert.IsTrue(ds.WriteThrottlingEnabled);
+ Assert.AreEqual(DiskPageCompression.Zstd, ds.WalPageCompression);
var dr = ds.DataRegionConfigurations.Single();
Assert.AreEqual(1, dr.EmptyPagesPoolSize);
@@ -986,6 +987,8 @@ namespace Apache.Ignite.Core.Tests
ConcurrencyLevel = 1,
PageSize = 5 * 1024,
WalAutoArchiveAfterInactivity = TimeSpan.FromSeconds(19),
+ WalPageCompression = DiskPageCompression.Lz4,
+ WalPageCompressionLevel = 10,
DefaultDataRegionConfiguration = new DataRegionConfiguration
{
Name = "reg1",
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index ff96dc4..b99f2a8 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -63,6 +63,7 @@
<Compile Include="Common\IgniteProductVersion.cs" />
<Compile Include="Configuration\CheckpointWriteOrder.cs" />
<Compile Include="Configuration\DataPageEvictionMode.cs" />
+ <Compile Include="Configuration\DiskPageCompression.cs" />
<Compile Include="Configuration\DataRegionConfiguration.cs" />
<Compile Include="Configuration\DataStorageConfiguration.cs" />
<Compile Include="Cache\Configuration\MemoryPolicyConfiguration.cs" />
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs
index d5cfe85..2ca4108 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs
@@ -169,6 +169,11 @@ namespace Apache.Ignite.Core.Configuration
public const long DefaultMaxWalArchiveSize = 1024 * 1024 * 1024;
/// <summary>
+ /// Default value for <see cref="WalPageCompression"/>.
+ /// </summary>
+ public const DiskPageCompression DefaultWalPageCompression = DiskPageCompression.Disabled;
+
+ /// <summary>
/// Initializes a new instance of the <see cref="DataStorageConfiguration"/> class.
/// </summary>
public DataStorageConfiguration()
@@ -196,6 +201,7 @@ namespace Apache.Ignite.Core.Configuration
PageSize = DefaultPageSize;
WalAutoArchiveAfterInactivity = DefaultWalAutoArchiveAfterInactivity;
MaxWalArchiveSize = DefaultMaxWalArchiveSize;
+ WalPageCompression = DefaultWalPageCompression;
}
/// <summary>
@@ -236,6 +242,8 @@ namespace Apache.Ignite.Core.Configuration
ConcurrencyLevel = reader.ReadInt();
WalAutoArchiveAfterInactivity = reader.ReadLongAsTimespan();
CheckpointReadLockTimeout = reader.ReadTimeSpanNullable();
+ WalPageCompression = (DiskPageCompression)reader.ReadInt();
+ WalPageCompressionLevel = reader.ReadIntNullable();
var count = reader.ReadInt();
@@ -290,6 +298,8 @@ namespace Apache.Ignite.Core.Configuration
writer.WriteInt(ConcurrencyLevel);
writer.WriteTimeSpanAsLong(WalAutoArchiveAfterInactivity);
writer.WriteTimeSpanAsLongNullable(CheckpointReadLockTimeout);
+ writer.WriteInt((int)WalPageCompression);
+ writer.WriteIntNullable(WalPageCompressionLevel);
if (DataRegionConfigurations != null)
{
@@ -498,6 +508,16 @@ namespace Apache.Ignite.Core.Configuration
public TimeSpan? CheckpointReadLockTimeout { get; set; }
/// <summary>
+ /// Gets or sets the compression algorithm for WAL page snapshot records.
+ /// </summary>
+ public DiskPageCompression WalPageCompression { get; set; }
+
+ /// <summary>
+ /// Gets or sets the compression level for WAL page snapshot records.
+ /// </summary>
+ public int? WalPageCompressionLevel { get; set; }
+
+ /// <summary>
/// Gets or sets the data region configurations.
/// </summary>
[SuppressMessage("Microsoft.Usage", "CA2227:CollectionPropertiesShouldBeReadOnly")]
@@ -508,4 +528,4 @@ namespace Apache.Ignite.Core.Configuration
/// </summary>
public DataRegionConfiguration DefaultDataRegionConfiguration { get; set; }
}
-}
\ No newline at end of file
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DiskPageCompression.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DiskPageCompression.cs
new file mode 100644
index 0000000..133ecd1
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DiskPageCompression.cs
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Configuration
+{
+ /// <summary>
+ /// Disk page compression options.
+ /// </summary>
+ public enum DiskPageCompression
+ {
+ /// <summary>
+ /// Compression disabled.
+ /// </summary>
+ Disabled,
+
+ /// <summary>
+ /// Retain only useful data from half-filled pages, but do not apply any compression.
+ /// </summary>
+ SkipGarbage,
+
+ /// <summary>
+ /// Zstd compression.
+ /// </summary>
+ Zstd,
+
+ /// <summary>
+ /// Lz4 compression.
+ /// </summary>
+ Lz4,
+
+ /// <summary>
+ /// Snappy compression.
+ /// </summary>
+ Snappy
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
index c23cf90..9179048 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
@@ -117,6 +117,16 @@
</xs:restriction>
</xs:simpleType>
+ <xs:simpleType name="diskPageCompression" final="restriction">
+ <xs:restriction base="xs:string">
+ <xs:enumeration value="Disabled" />
+ <xs:enumeration value="SkipGarbage" />
+ <xs:enumeration value="Zstd" />
+ <xs:enumeration value="Lz4" />
+ <xs:enumeration value="Snappy" />
+ </xs:restriction>
+ </xs:simpleType>
+
<xs:element name="igniteConfiguration">
<xs:annotation>
<xs:documentation>Ignite configuration root.</xs:documentation>
@@ -1948,6 +1958,16 @@
</xs:documentation>
</xs:annotation>
</xs:attribute>
+ <xs:attribute name="walPageCompression" type="diskPageCompression">
+ <xs:annotation>
+ <xs:documentation>Compression algorithm for WAL page snapshot records.</xs:documentation>
+ </xs:annotation>
+ </xs:attribute>
+ <xs:attribute name="walPageCompressionLevel" type="xs:int">
+ <xs:annotation>
+ <xs:documentation>Compression level for WAL page snapshot records.</xs:documentation>
+ </xs:annotation>
+ </xs:attribute>
</xs:complexType>
</xs:element>
<xs:element name="sslContextFactory" minOccurs="0">