You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/11/28 11:52:47 UTC
[16/50] [abbrv] ignite git commit: IGNITE-10330: Disk page
compression. - Fixes #5200.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/FileSystemUtilsTest.java
----------------------------------------------------------------------
diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/FileSystemUtilsTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/FileSystemUtilsTest.java
new file mode 100644
index 0000000..70dda0b
--- /dev/null
+++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/FileSystemUtilsTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compress;
+
+import java.io.FileDescriptor;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import junit.framework.TestCase;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessorImpl.allocateDirectBuffer;
+import static org.apache.ignite.internal.processors.compress.FileSystemUtils.getFileSystemBlockSize;
+import static org.apache.ignite.internal.processors.compress.FileSystemUtils.getSparseFileSize;
+import static org.apache.ignite.internal.processors.compress.FileSystemUtils.punchHole;
+
+/**
+ * Tests for {@link FileSystemUtils}: file system block size detection, sparse file size
+ * reporting and hole punching.
+ */
+public class FileSystemUtilsTest extends TestCase {
+    /**
+     * Runs the sparse file scenario on a temporary file. Does nothing on non-Linux systems.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSparseFiles() throws Exception {
+        if (!U.isLinux())
+            return;
+
+        Path file = Files.createTempFile("test_sparse_file_", ".bin");
+
+        try {
+            doTestSparseFiles(file, false); // Ext4 expected as default FS.
+        }
+        finally {
+            Files.delete(file);
+        }
+    }
+
+    /**
+     * Runs the sparse file scenario against files at the hard-coded paths {@code /ext4}, {@code /btrfs}
+     * and {@code /xfs} (presumably mount points of the corresponding file systems - verify before use).
+     * The leading underscore keeps the method out of the JUnit 3 {@code test*} naming convention,
+     * so it has to be renamed to be executed.
+     *
+     * @throws Exception If failed.
+     */
+    public void _testFileSystems() throws Exception {
+        doTestSparseFiles(Paths.get("/ext4/test_file"), false);
+        doTestSparseFiles(Paths.get("/btrfs/test_file"), false);
+        doTestSparseFiles(Paths.get("/xfs/test_file"), true);
+    }
+
+    /**
+     * Reads the {@code fd} field of the channel and then the {@code fd} field of the obtained
+     * {@link FileDescriptor}.
+     *
+     * @param ch File channel.
+     * @return Value of the nested {@code fd} field (the descriptor passed to {@link FileSystemUtils} calls).
+     * @throws IgniteCheckedException If the field access failed.
+     */
+    private static int getFD(FileChannel ch) throws IgniteCheckedException {
+        return U.<Integer>field(U.<FileDescriptor>field(ch, "fd"), "fd");
+    }
+
+    /**
+     * Forces pending changes to disk, closes the channel and opens the same file again
+     * for reading and writing.
+     *
+     * @param ch Channel to close.
+     * @param file File path.
+     * @return Newly opened channel.
+     * @throws Exception If failed.
+     */
+    private static FileChannel reopen(FileChannel ch, Path file) throws Exception {
+        ch.force(true);
+        ch.close();
+
+        return FileChannel.open(file, READ, WRITE);
+    }
+
+    /**
+     * Writes 5 pages of 4 file system blocks each and then punches holes at aligned and unaligned
+     * offsets, checking the value returned by {@code punchHole} and the sparse file size after each step.
+     *
+     * @param file File path.
+     * @param reopen Reopen file after each hole punch. XFS needs it.
+     * @throws Exception If failed.
+     */
+    private void doTestSparseFiles(Path file, boolean reopen) throws Exception {
+        System.out.println(file);
+
+        FileChannel ch = FileChannel.open(file, READ, WRITE, TRUNCATE_EXISTING);
+
+        try {
+            int fd = getFD(ch);
+
+            int fsBlockSize = getFileSystemBlockSize(fd);
+
+            System.out.println("fsBlockSize: " + fsBlockSize);
+
+            assertTrue(fsBlockSize > 0);
+
+            int pageSize = fsBlockSize * 4;
+
+            ByteBuffer page = allocateDirectBuffer(pageSize);
+
+            // Fill the whole page with a non-zero pattern.
+            while (page.remaining() > 0)
+                page.putLong(0xABCDEF7654321EADL);
+
+            page.flip();
+
+            int pages = 5;
+            int blocks = pages * pageSize / fsBlockSize;
+            int fileSize = pages * pageSize;
+            int sparseSize = fileSize;
+
+            for (int i = 0; i < pages; i++) {
+                ch.write(page, i * pageSize);
+                assertEquals(0, page.remaining());
+                page.flip();
+            }
+
+            if (reopen) {
+                ch = reopen(ch, file);
+                fd = getFD(ch);
+            }
+
+            assertEquals(fileSize, ch.size());
+            assertEquals(fileSize, getSparseFileSize(fd));
+
+            // One block long range starting a quarter of a block before the block boundary:
+            // it contains no whole block, nothing is expected to be punched.
+            int off = fsBlockSize * 3 - (fsBlockSize >>> 2);
+            int len = fsBlockSize;
+            assertEquals(0, punchHole(fd, off, len, fsBlockSize));
+
+            if (reopen) {
+                ch = reopen(ch, file);
+                fd = getFD(ch);
+            }
+
+            assertEquals(fileSize, getSparseFileSize(fd));
+
+            // Range [2 * block - 3, 4 * block): blocks 2 and 3 are fully covered.
+            off = 2 * fsBlockSize - 3;
+            len = 2 * fsBlockSize + 3;
+            assertEquals(2 * fsBlockSize, punchHole(fd, off, len, fsBlockSize));
+
+            if (reopen) {
+                ch = reopen(ch, file);
+                fd = getFD(ch);
+            }
+
+            sparseSize -= 2 * fsBlockSize;
+            assertEquals(sparseSize, getSparseFileSize(fd));
+
+            // Range [10 * block, 13 * block + 5): blocks 10, 11 and 12 are fully covered.
+            off = 10 * fsBlockSize;
+            len = 3 * fsBlockSize + 5;
+            assertEquals(3 * fsBlockSize, punchHole(fd, off, len, fsBlockSize));
+
+            if (reopen) {
+                ch = reopen(ch, file);
+                fd = getFD(ch);
+            }
+
+            sparseSize -= 3 * fsBlockSize;
+            assertEquals(sparseSize, getSparseFileSize(fd));
+
+            // Three ranges around block 15 which are one byte short of covering it.
+            off = 15 * fsBlockSize + 1;
+            len = fsBlockSize;
+            assertEquals(0, punchHole(fd, off, len, fsBlockSize));
+
+            off = 15 * fsBlockSize - 1;
+            len = fsBlockSize;
+            assertEquals(0, punchHole(fd, off, len, fsBlockSize));
+
+            off = 15 * fsBlockSize;
+            len = fsBlockSize - 1;
+            assertEquals(0, punchHole(fd, off, len, fsBlockSize));
+
+            // Exactly block 15.
+            off = 15 * fsBlockSize;
+            len = fsBlockSize;
+            assertEquals(fsBlockSize, punchHole(fd, off, len, fsBlockSize));
+
+            if (reopen) {
+                ch = reopen(ch, file);
+                fd = getFD(ch);
+            }
+
+            sparseSize -= fsBlockSize;
+            assertEquals(sparseSize, getSparseFileSize(fd));
+
+            // Punch every block except the last one.
+            for (int i = 0; i < blocks - 1; i++)
+                punchHole(fd, fsBlockSize * i, fsBlockSize, fsBlockSize);
+
+            if (reopen) {
+                ch = reopen(ch, file);
+                fd = getFD(ch);
+            }
+
+            assertEquals(fsBlockSize, getSparseFileSize(fd));
+        }
+        finally {
+            ch.close();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite.java b/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite.java
new file mode 100644
index 0000000..a977700
--- /dev/null
+++ b/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testsuites;
+
+import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.compress.CompressionProcessorTest;
+import org.apache.ignite.internal.processors.compress.DiskPageCompressionIntegrationAsyncTest;
+import org.apache.ignite.internal.processors.compress.DiskPageCompressionIntegrationTest;
+import org.apache.ignite.internal.processors.compress.FileSystemUtilsTest;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFAULT_DATA_STORAGE_PAGE_SIZE;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFAULT_DISK_PAGE_COMPRESSION;
+import static org.apache.ignite.configuration.DiskPageCompression.ZSTD;
+
+/**
+ * Persistent store test suite which is executed with disk page compression enabled by default.
+ */
+public class IgnitePdsCompressionTestSuite {
+    /**
+     * Registers the compression specific tests and then, with the compression defaults
+     * already set, the real page store tests of {@link IgnitePdsTestSuite}.
+     *
+     * @return Suite.
+     */
+    public static TestSuite suite() {
+        TestSuite res = new TestSuite("Ignite Persistent Store Test Suite (with page compression).");
+
+        res.addTestSuite(CompressionProcessorTest.class);
+        res.addTestSuite(FileSystemUtilsTest.class);
+        res.addTestSuite(DiskPageCompressionIntegrationTest.class);
+        res.addTestSuite(DiskPageCompressionIntegrationAsyncTest.class);
+
+        enableCompressionByDefault();
+
+        IgnitePdsTestSuite.addRealPageStoreTests(res);
+
+        return res;
+    }
+
+    /**
+     * Sets the system properties which make 8 KB the default data storage page size
+     * and ZSTD the default disk page compression.
+     */
+    static void enableCompressionByDefault() {
+        System.setProperty(IGNITE_DEFAULT_DATA_STORAGE_PAGE_SIZE, Integer.toString(8 * 1024));
+        System.setProperty(IGNITE_DEFAULT_DISK_PAGE_COMPRESSION, ZSTD.name());
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite2.java b/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite2.java
new file mode 100644
index 0000000..3fb8ac2
--- /dev/null
+++ b/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite2.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testsuites;
+
+import junit.framework.TestSuite;
+
+import static org.apache.ignite.testsuites.IgnitePdsCompressionTestSuite.enableCompressionByDefault;
+
+/**
+ * Second persistent store test suite which is executed with disk page compression enabled by default.
+ */
+public class IgnitePdsCompressionTestSuite2 {
+    /**
+     * Sets the compression defaults and registers the real page store tests
+     * of {@link IgnitePdsTestSuite2}.
+     *
+     * @return Suite.
+     */
+    public static TestSuite suite() {
+        enableCompressionByDefault();
+
+        TestSuite res = new TestSuite("Ignite Persistent Store Test Suite 2 (with page compression).");
+
+        IgnitePdsTestSuite2.addRealPageStoreTests(res);
+
+        return res;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/DataStorageMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/DataStorageMetrics.java b/modules/core/src/main/java/org/apache/ignite/DataStorageMetrics.java
index cdde0ac..e23d188 100644
--- a/modules/core/src/main/java/org/apache/ignite/DataStorageMetrics.java
+++ b/modules/core/src/main/java/org/apache/ignite/DataStorageMetrics.java
@@ -212,4 +212,22 @@ public interface DataStorageMetrics {
* @return Checkpoint buffer size in bytes.
*/
public long getCheckpointBufferSize();
+
+ /**
+ * Storage space allocated in bytes.
+ *
+ * @return Storage space allocated in bytes.
+ */
+ public long getStorageSize();
+
+ /**
+ * Storage space allocated adjusted for possible sparsity in bytes.
+ *
+ * May produce unstable or even incorrect result on some file systems (e.g. XFS).
+ * Known to work correctly on Ext4 and Btrfs.
+ *
+ * @return Storage space allocated adjusted for possible sparsity in bytes
+ * or negative value if not supported.
+ */
+ public long getSparseStorageSize();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index ccf7ebf..2d27840 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -1047,6 +1047,16 @@ public final class IgniteSystemProperties {
public static final String IGNITE_RECOVERY_VERBOSE_LOGGING = "IGNITE_RECOVERY_VERBOSE_LOGGING";
/**
+ * Sets default {@link CacheConfiguration#setDiskPageCompression disk page compression}.
+ */
+ public static final String IGNITE_DEFAULT_DISK_PAGE_COMPRESSION = "IGNITE_DEFAULT_DISK_PAGE_COMPRESSION";
+
+ /**
+ * Sets default {@link DataStorageConfiguration#setPageSize storage page size}.
+ */
+ public static final String IGNITE_DEFAULT_DATA_STORAGE_PAGE_SIZE = "IGNITE_DEFAULT_DATA_STORAGE_PAGE_SIZE";
+
+ /**
* Enforces singleton.
*/
private IgniteSystemProperties() {
@@ -1054,6 +1064,40 @@ public final class IgniteSystemProperties {
}
/**
+ * Gets the enum constant of the given type named by the value of the given
+ * system property or environment variable.
+ *
+ * @param <E> Enum type.
+ * @param enumCls Enum type.
+ * @param name Name of the system property or environment variable.
+ * @return Enum value or {@code null} if the property is not set.
+ * @throws IllegalArgumentException If the property is set to a value which is not a constant of the enum type.
+ */
+ public static <E extends Enum<E>> E getEnum(Class<E> enumCls, String name) {
+ return getEnum(enumCls, name, null);
+ }
+
+ /**
+ * Gets the enum constant named by the value of the given system property or environment variable,
+ * the enum type is taken from the default value.
+ *
+ * @param <E> Enum type.
+ * @param name Name of the system property or environment variable.
+ * @param dflt Default value. Must not be {@code null} because the enum type is derived from it.
+ * @return Enum value or the given default.
+ */
+ public static <E extends Enum<E>> E getEnum(String name, E dflt) {
+ return getEnum(dflt.getDeclaringClass(), name, dflt);
+ }
+
+    /**
+     * Resolves the value of a system property or environment variable to an enum constant.
+     *
+     * @param <E> Enum type.
+     * @param enumCls Enum type.
+     * @param name Name of the system property or environment variable.
+     * @param dflt Default value.
+     * @return Enum value or the given default if the property is not set.
+     * @throws IllegalArgumentException If the property is set to a value which is not a constant
+     *      of {@code enumCls}.
+     */
+    private static <E extends Enum<E>> E getEnum(Class<E> enumCls, String name, E dflt) {
+        assert enumCls != null;
+
+        String val = getString(name);
+
+        if (val == null)
+            return dflt;
+
+        try {
+            return Enum.valueOf(enumCls, val);
+        }
+        catch (IllegalArgumentException e) {
+            // Enum.valueOf() reports only the enum type and the bad constant. Add the property name,
+            // otherwise a mistyped setting can not be traced back to its source.
+            throw new IllegalArgumentException("Invalid value '" + val + "' of property " + name +
+                " for enum " + enumCls.getName(), e);
+        }
+    }
+
+ /**
* Gets either system property or environment variable with given name.
*
* @param name Name of the system property or environment variable.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 5c91dc0..e27961d 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -33,6 +33,7 @@ import javax.cache.integration.CacheLoader;
import javax.cache.integration.CacheWriter;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CacheInterceptor;
@@ -62,6 +63,8 @@ import org.apache.ignite.spi.encryption.EncryptionSpi;
import org.apache.ignite.spi.encryption.keystore.KeystoreEncryptionSpi;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFAULT_DISK_PAGE_COMPRESSION;
+
/**
* This class defines grid cache configuration. This configuration is passed to
* grid via {@link IgniteConfiguration#getCacheConfiguration()} method. It defines all configuration
@@ -383,6 +386,13 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
*/
private boolean encryptionEnabled;
+ /** */
+ private DiskPageCompression diskPageCompression = IgniteSystemProperties.getEnum(
+ DiskPageCompression.class, IGNITE_DEFAULT_DISK_PAGE_COMPRESSION);
+
+ /** */
+ private Integer diskPageCompressionLevel;
+
/** Empty constructor (all values are initialized to their defaults). */
public CacheConfiguration() {
/* No-op. */
@@ -443,6 +453,8 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
nearCfg = cc.getNearConfiguration();
nodeFilter = cc.getNodeFilter();
onheapCache = cc.isOnheapCacheEnabled();
+ diskPageCompression = cc.getDiskPageCompression();
+ diskPageCompressionLevel = cc.getDiskPageCompressionLevel();
partLossPlc = cc.getPartitionLossPolicy();
pluginCfgs = cc.getPluginConfigurations();
qryDetailMetricsSz = cc.getQueryDetailMetricsSize();
@@ -2297,6 +2309,54 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
return this;
}
+ /**
+ * Gets disk page compression algorithm.
+ * Makes sense only with enabled {@link DataRegionConfiguration#setPersistenceEnabled persistence}.
+ *
+ * @return Disk page compression algorithm or {@code null} if it was not set explicitly and the
+ * {@link IgniteSystemProperties#IGNITE_DEFAULT_DISK_PAGE_COMPRESSION} default is not set.
+ * @see #getDiskPageCompressionLevel
+ */
+ public DiskPageCompression getDiskPageCompression() {
+ return diskPageCompression;
+ }
+
+ /**
+ * Sets disk page compression algorithm.
+ * Makes sense only with enabled {@link DataRegionConfiguration#setPersistenceEnabled persistence}.
+ *
+ * @param diskPageCompression Disk page compression algorithm.
+ * @return {@code this} for chaining.
+ * @see #setDiskPageCompressionLevel
+ */
+ public CacheConfiguration<K,V> setDiskPageCompression(DiskPageCompression diskPageCompression) {
+ this.diskPageCompression = diskPageCompression;
+
+ return this;
+ }
+
+ /**
+ * Gets {@link #getDiskPageCompression algorithm} specific disk page compression level.
+ *
+ * @return Disk page compression level or {@code null} for default.
+ */
+ public Integer getDiskPageCompressionLevel() {
+ return diskPageCompressionLevel;
+ }
+
+ /**
+ * Sets {@link #setDiskPageCompression algorithm} specific disk page compression level.
+ *
+ * @param diskPageCompressionLevel Disk page compression level or {@code null} to use default.
+ * {@link DiskPageCompression#ZSTD Zstd}: from {@code -131072} to {@code 22} (default {@code 3}).
+ * {@link DiskPageCompression#LZ4 LZ4}: from {@code 0} to {@code 17} (default {@code 0}).
+ * @return {@code this} for chaining.
+ */
+ public CacheConfiguration<K,V> setDiskPageCompressionLevel(Integer diskPageCompressionLevel) {
+ this.diskPageCompressionLevel = diskPageCompressionLevel;
+
+ return this;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheConfiguration.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
index 20b314f..4aca0b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
@@ -28,6 +28,8 @@ import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFAULT_DATA_STORAGE_PAGE_SIZE;
+
/**
* A durable memory configuration for an Apache Ignite node. The durable memory is a manageable off-heap based memory
* architecture that divides all expandable data regions into pages of fixed size
@@ -87,6 +89,12 @@ public class DataStorageConfiguration implements Serializable {
/** Default memory page size. */
public static final int DFLT_PAGE_SIZE = 4 * 1024;
+ /** Max memory page size. */
+ public static final int MAX_PAGE_SIZE = 16 * 1024;
+
+ /** Min memory page size. */
+ public static final int MIN_PAGE_SIZE = 1024;
+
/** This name is assigned to default Dataregion if no user-defined default MemPlc is specified */
public static final String DFLT_DATA_REG_DEFAULT_NAME = "default";
@@ -166,7 +174,8 @@ public class DataStorageConfiguration implements Serializable {
private long sysRegionMaxSize = DFLT_SYS_REG_MAX_SIZE;
/** Memory page size. */
- private int pageSize;
+ private int pageSize = IgniteSystemProperties.getInteger(
+ IGNITE_DEFAULT_DATA_STORAGE_PAGE_SIZE, 0);
/** Concurrency level. */
private int concLvl;
@@ -346,10 +355,13 @@ public class DataStorageConfiguration implements Serializable {
* Changes the page size.
*
* @param pageSize Page size in bytes. If value is not set (or zero), {@link #DFLT_PAGE_SIZE} will be used.
+ * @see #MIN_PAGE_SIZE
+ * @see #MAX_PAGE_SIZE
*/
public DataStorageConfiguration setPageSize(int pageSize) {
if (pageSize != 0) {
- A.ensure(pageSize >= 1024 && pageSize <= 16 * 1024, "Page size must be between 1kB and 16kB.");
+ A.ensure(pageSize >= MIN_PAGE_SIZE && pageSize <= MAX_PAGE_SIZE,
+ "Page size must be between 1kB and 16kB.");
A.ensure(U.isPow2(pageSize), "Page size must be a power of 2.");
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/configuration/DiskPageCompression.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DiskPageCompression.java b/modules/core/src/main/java/org/apache/ignite/configuration/DiskPageCompression.java
new file mode 100644
index 0000000..d628c6a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/DiskPageCompression.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.configuration;
+
+/**
+ * Disk page compression options.
+ * <p>
+ * The option is chosen per cache with {@link CacheConfiguration#setDiskPageCompression}.
+ *
+ * @see CacheConfiguration#setDiskPageCompression
+ * @see CacheConfiguration#setDiskPageCompressionLevel
+ */
+public enum DiskPageCompression {
+ /** Retain only useful data from half-filled pages, but do not apply any compression. */
+ SKIP_GARBAGE,
+
+ /** Zstd compression. The level is tuned with {@link CacheConfiguration#setDiskPageCompressionLevel}. */
+ ZSTD,
+
+ /** LZ4 compression. The level is tuned with {@link CacheConfiguration#setDiskPageCompressionLevel}. */
+ LZ4,
+
+ /** Snappy compression. */
+ SNAPPY
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index a43312c..e19450e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.managers.failover.GridFailoverManager;
import org.apache.ignite.internal.managers.indexing.GridIndexingManager;
import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager;
import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor;
+import org.apache.ignite.internal.processors.compress.CompressionProcessor;
import org.apache.ignite.internal.worker.WorkersRegistry;
import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
import org.apache.ignite.internal.processors.authentication.IgniteAuthenticationProcessor;
@@ -700,6 +701,11 @@ public interface GridKernalContext extends Iterable<GridComponent> {
public Thread.UncaughtExceptionHandler uncaughtExceptionHandler();
/**
+ * @return Compression processor.
+ */
+ public CompressionProcessor compress();
+
+ /**
* @return {@code True} if node is in recovery mode (before join to topology).
*/
public boolean recoveryMode();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 8a42664..ef69167 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.managers.failover.GridFailoverManager;
import org.apache.ignite.internal.managers.indexing.GridIndexingManager;
import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager;
import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor;
+import org.apache.ignite.internal.processors.compress.CompressionProcessor;
import org.apache.ignite.internal.worker.WorkersRegistry;
import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
import org.apache.ignite.internal.processors.authentication.IgniteAuthenticationProcessor;
@@ -294,6 +295,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
/** */
@GridToStringExclude
+ private CompressionProcessor compressProc;
+
+ /** */
+ @GridToStringExclude
private DataStructuresProcessor dataStructuresProc;
/** Cache mvcc coordinators. */
@@ -639,6 +644,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
internalSubscriptionProc = (GridInternalSubscriptionProcessor)comp;
else if (comp instanceof IgniteAuthenticationProcessor)
authProc = (IgniteAuthenticationProcessor)comp;
+ else if (comp instanceof CompressionProcessor)
+ compressProc = (CompressionProcessor)comp;
else if (!(comp instanceof DiscoveryNodeValidationProcessor
|| comp instanceof PlatformPluginProcessor))
assert (comp instanceof GridPluginComponent) : "Unknown manager class: " + comp.getClass();
@@ -1184,6 +1191,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
}
/** {@inheritDoc} */
+ @Override public CompressionProcessor compress() {
+ return compressProc;
+ }
+
+ /** {@inheritDoc} */
@Override public boolean recoveryMode() {
return recoveryMode;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java
index 0cd2fc1..65cbb90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal;
import java.lang.reflect.Constructor;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.compress.CompressionProcessor;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.jetbrains.annotations.Nullable;
@@ -89,6 +90,12 @@ public enum IgniteComponentType {
"org.apache.ignite.internal.processors.schedule.IgniteNoopScheduleProcessor",
"org.apache.ignite.internal.processors.schedule.IgniteScheduleProcessor",
"ignite-schedule"
+ ),
+
+ COMPRESSION(
+ CompressionProcessor.class.getName(),
+ "org.apache.ignite.internal.processors.compress.CompressionProcessorImpl",
+ "ignite-compress"
);
/** No-op class name. */
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 710fd09..284a4cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -236,6 +236,7 @@ import static org.apache.ignite.internal.GridKernalState.STARTED;
import static org.apache.ignite.internal.GridKernalState.STARTING;
import static org.apache.ignite.internal.GridKernalState.STOPPED;
import static org.apache.ignite.internal.GridKernalState.STOPPING;
+import static org.apache.ignite.internal.IgniteComponentType.COMPRESSION;
import static org.apache.ignite.internal.IgniteComponentType.HADOOP_HELPER;
import static org.apache.ignite.internal.IgniteComponentType.IGFS;
import static org.apache.ignite.internal.IgniteComponentType.IGFS_HELPER;
@@ -1002,6 +1003,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
// Start processors before discovery manager, so they will
// be able to start receiving messages once discovery completes.
try {
+ startProcessor(COMPRESSION.createOptional(ctx));
startProcessor(new PdsConsistentIdProcessor(ctx));
startProcessor(new MvccProcessorImpl(ctx));
startProcessor(createComponent(DiscoveryNodeValidationProcessor.class, ctx));
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java
index 7a7f964..7c1e15d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java
@@ -17,9 +17,8 @@
package org.apache.ignite.internal.pagemem.store;
-import org.apache.ignite.IgniteCheckedException;
-
import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.persistence.StorageException;
/**
@@ -128,4 +127,35 @@ public interface PageStore {
* @throws StorageException If failed.
*/
public void truncate(int tag) throws StorageException;
+
+ /**
+ * @return Page size in bytes.
+ */
+ public int getPageSize();
+
+ /**
+ * @return Storage block size or negative value if unknown or not supported.
+ */
+ public int getBlockSize();
+
+ /**
+ * @return Size of the storage in bytes. May differ from {@link #pages()} * {@link #getPageSize()}
+ * due to delayed writes or due to other implementation specific details.
+ */
+ public long size();
+
+ /**
+ * @return Size of the storage adjusted for sparsity in bytes or negative
+ * value if not supported. Should be less than or equal to {@link #size()}.
+ * @see #punchHole
+ */
+ public long getSparseSize();
+
+    /**
+     * Should free all the extra storage space after the given number of useful bytes in the given page.
+     *
+     * @param pageId Page id.
+     * @param usefulBytes Number of meaningful bytes from the beginning of the page.
+     * @see #getSparseSize
+     */
+    public void punchHole(long pageId, int usefulBytes);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheCompressionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheCompressionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheCompressionManager.java
new file mode 100644
index 0000000..d9e977a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheCompressionManager.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.DiskPageCompression;
+import org.apache.ignite.internal.pagemem.store.PageStore;
+import org.apache.ignite.internal.processors.compress.CompressionProcessor;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.internal.processors.compress.CompressionProcessor.checkCompressionLevelBounds;
+import static org.apache.ignite.internal.processors.compress.CompressionProcessor.getDefaultCompressionLevel;
+
+/**
+ * Cache compression manager: checks the disk page compression settings of a cache on start and compresses pages.
+ */
+public class CacheCompressionManager extends GridCacheManagerAdapter {
+ /** Disk page compression mode from the cache configuration; {@code null} means pages are returned as is. */
+ private DiskPageCompression diskPageCompression;
+
+ /** Compression level: the configured value after a bounds check, or the default for the selected mode. */
+ private int diskPageCompressLevel;
+
+ /** Compression processor obtained from the kernal context on start. */
+ private CompressionProcessor compressProc;
+
+ /** {@inheritDoc} Reads the compression settings and, if a mode is set, checks persistence and storage support. */
+ @Override protected void start0() throws IgniteCheckedException {
+ compressProc = cctx.kernalContext().compress();
+
+ CacheConfiguration cfg = cctx.config();
+
+ diskPageCompression = cfg.getDiskPageCompression();
+
+ if (diskPageCompression != null) {
+ if (!cctx.dataRegion().config().isPersistenceEnabled())
+ throw new IgniteCheckedException("Disk page compression makes sense only with enabled persistence.");
+
+ Integer lvl = cfg.getDiskPageCompressionLevel();
+ diskPageCompressLevel = lvl != null ?
+ checkCompressionLevelBounds(lvl, diskPageCompression) :
+ getDefaultCompressionLevel(diskPageCompression);
+
+ DataStorageConfiguration dsCfg = cctx.kernalContext().config().getDataStorageConfiguration();
+
+ File dbPath = cctx.kernalContext().pdsFolderResolver().resolveFolders().persistentStoreRootPath();
+
+ assert dbPath != null;
+
+ compressProc.checkPageCompressionSupported(dbPath.toPath(), dsCfg.getPageSize());
+
+ if (log.isInfoEnabled()) {
+ log.info("Disk page compression is enabled [cache=" + cctx.name() +
+ ", compression=" + diskPageCompression + ", level=" + diskPageCompressLevel + "]");
+ }
+ }
+ }
+
+ /**
+ * @param page Page buffer.
+ * @param store Page store that supplies the page size and the storage block size.
+ * @return The given buffer if no compression mode is configured, otherwise the compression processor result.
+ * @throws IgniteCheckedException If the store reports a non-positive block size or the compression call fails.
+ */
+ public ByteBuffer compressPage(ByteBuffer page, PageStore store) throws IgniteCheckedException {
+ if (diskPageCompression == null)
+ return page;
+
+ int blockSize = store.getBlockSize();
+
+ if (blockSize <= 0)
+ throw new IgniteCheckedException("Failed to detect storage block size on " + U.osString());
+
+ return compressProc.compressPage(page, store.getPageSize(), blockSize, diskPageCompression, diskPageCompressLevel);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMXBeanImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMXBeanImpl.java
index 5ece77f..59894e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMXBeanImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMXBeanImpl.java
@@ -28,15 +28,17 @@ import java.util.UUID;
import java.util.concurrent.atomic.LongAdder;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.persistence.AllocatedPageTracker;
import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.mxbean.CacheGroupMetricsMXBean;
/**
@@ -359,4 +361,21 @@ public class CacheGroupMetricsMXBeanImpl implements CacheGroupMetricsMXBean {
@Override public long getTotalAllocatedSize() {
return getTotalAllocatedPages() * ctx.dataRegion().pageMemory().pageSize();
}
+
+ /** {@inheritDoc} Reports {@code 0} when the node's database manager is not the persistent one. */
+ @Override public long getStorageSize() {
+ return database() == null ? 0 : database().forGroupPageStores(ctx, PageStore::size);
+ }
+
+ /** {@inheritDoc} Reports {@code 0} when the node's database manager is not the persistent one. */
+ @Override public long getSparseStorageSize() {
+ return database() == null ? 0 : database().forGroupPageStores(ctx, PageStore::getSparseSize);
+ }
+
+ /** @return Persistent database manager, or {@code null} if the shared database manager is of another type. */
+ private GridCacheDatabaseSharedManager database() {
+ Object db = ctx.shared().database();
+
+ return db instanceof GridCacheDatabaseSharedManager ? (GridCacheDatabaseSharedManager)db : null;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 30cf969..1a8cf88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -177,6 +177,9 @@ public class GridCacheContext<K, V> implements Externalizable {
/** Store manager. */
private CacheStoreManager storeMgr;
+ /** Compression manager. */
+ private CacheCompressionManager compressMgr;
+
/** Replication manager. */
private GridCacheDrManager drMgr;
@@ -321,6 +324,7 @@ public class GridCacheContext<K, V> implements Externalizable {
* ===========================
*/
+ CacheCompressionManager compressMgr,
GridCacheEventManager evtMgr,
CacheStoreManager storeMgr,
CacheEvictionManager evictMgr,
@@ -338,6 +342,7 @@ public class GridCacheContext<K, V> implements Externalizable {
assert cacheCfg != null;
assert locStartTopVer != null : cacheCfg.getName();
+ assert compressMgr != null;
assert grp != null;
assert evtMgr != null;
assert storeMgr != null;
@@ -364,6 +369,7 @@ public class GridCacheContext<K, V> implements Externalizable {
* Managers in starting order!
* ===========================
*/
+ this.compressMgr = add(compressMgr);
this.evtMgr = add(evtMgr);
this.storeMgr = add(storeMgr);
this.evictMgr = add(evictMgr);
@@ -1230,6 +1236,13 @@ public class GridCacheContext<K, V> implements Externalizable {
}
/**
+ * @return Compression manager.
+ */
+ public CacheCompressionManager compress() {
+ return compressMgr;
+ }
+
+ /**
* Sets cache object context.
*
* @param cacheObjCtx Cache object context.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index ce81468..8a54852 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1524,6 +1524,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
boolean nearEnabled = GridCacheUtils.isNearEnabled(cfg);
+ CacheCompressionManager compressMgr = new CacheCompressionManager();
GridCacheAffinityManager affMgr = new GridCacheAffinityManager();
GridCacheEventManager evtMgr = new GridCacheEventManager();
CacheEvictionManager evictMgr = (nearEnabled || cfg.isOnheapCacheEnabled()) ? new GridCacheEvictionManager() : new CacheOffheapEvictionManager();
@@ -1558,6 +1559,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* Managers in starting order!
* ===========================
*/
+ compressMgr,
evtMgr,
storeMgr,
evictMgr,
@@ -1694,6 +1696,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* Managers in starting order!
* ===========================
*/
+ compressMgr,
evtMgr,
storeMgr,
evictMgr,
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsImpl.java
index 03955a4..4565b58 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsImpl.java
@@ -93,6 +93,12 @@ public class DataStorageMetricsImpl implements DataStorageMetricsMXBean {
/** */
private volatile Collection<DataRegionMetrics> regionMetrics;
+ /** Storage size supplied with the latest checkpoint metrics update; assigned only while metrics are enabled. */
+ private volatile long storageSize;
+
+ /** Sparse storage size supplied with the latest checkpoint metrics update; assigned only while metrics are enabled. */
+ private volatile long sparseStorageSize;
+
/**
* @param metricsEnabled Metrics enabled flag.
* @param rateTimeInterval Rate time interval.
@@ -485,6 +491,16 @@ public class DataStorageMetricsImpl implements DataStorageMetricsMXBean {
return metricsEnabled;
}
+ /** {@inheritDoc} */
+ @Override public long getStorageSize() {
+ return storageSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSparseStorageSize() {
+ return sparseStorageSize;
+ }
+
/**
* @param lockWaitDuration Lock wait duration.
* @param markDuration Mark duration.
@@ -503,7 +519,9 @@ public class DataStorageMetricsImpl implements DataStorageMetricsMXBean {
long duration,
long totalPages,
long dataPages,
- long cowPages
+ long cowPages,
+ long storageSize,
+ long sparseStorageSize
) {
if (metricsEnabled) {
lastCpLockWaitDuration = lockWaitDuration;
@@ -514,6 +532,8 @@ public class DataStorageMetricsImpl implements DataStorageMetricsMXBean {
lastCpTotalPages = totalPages;
lastCpDataPages = dataPages;
lastCpCowPages = cowPages;
+ this.storageSize = storageSize;
+ this.sparseStorageSize = sparseStorageSize;
totalCheckpointTime.addAndGet(duration);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsSnapshot.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsSnapshot.java
index c3bcd5b..78b08bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsSnapshot.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsSnapshot.java
@@ -101,6 +101,12 @@ public class DataStorageMetricsSnapshot implements DataStorageMetrics {
/** */
private long totalAllocatedSize;
+ /** Storage size copied from the source metrics when the snapshot is created. */
+ private long storageSize;
+
+ /** Sparse storage size copied from the source metrics when the snapshot is created. */
+ private long sparseStorageSize;
+
/**
* @param metrics Metrics.
*/
@@ -131,6 +137,8 @@ public class DataStorageMetricsSnapshot implements DataStorageMetrics {
offHeapSize = metrics.getOffHeapSize();
offHeadUsedSize = metrics.getOffheapUsedSize();
totalAllocatedSize = metrics.getTotalAllocatedSize();
+ storageSize = metrics.getStorageSize();
+ sparseStorageSize = metrics.getSparseStorageSize();
}
/** {@inheritDoc} */
@@ -264,6 +272,16 @@ public class DataStorageMetricsSnapshot implements DataStorageMetrics {
}
/** {@inheritDoc} */
+ @Override public long getStorageSize() {
+ return storageSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSparseStorageSize() {
+ return sparseStorageSize;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(DataStorageMetricsSnapshot.class, this);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index ed54f65..9a083f8 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -54,6 +54,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
+import java.util.function.ToLongFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -398,6 +399,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
: ctx.config().getFailureDetectionTimeout()));
}
+ /**
+ * @return File store manager.
+ */
+ public FilePageStoreManager getFileStoreManager() {
+ return storeMgr;
+ }
+
/** */
private void notifyMetastorageReadyForRead() throws IgniteCheckedException {
for (MetastorageLifecycleListener lsnr : metastorageLifecycleLsnrs)
@@ -1935,6 +1943,44 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/**
+ * @param f Function returning a numeric value for a single page store.
+ * @return Sum of the function results over the page stores of all cache groups.
+ */
+ public long forAllPageStores(ToLongFunction<PageStore> f) {
+ long res = 0;
+
+ for (CacheGroupContext gctx : cctx.cache().cacheGroups())
+ res += forGroupPageStores(gctx, f);
+
+ return res;
+ }
+
+ /**
+ * @param gctx Group context; only its group ID is used to look up the stores.
+ * @param f Function returning a numeric value for a single page store.
+ * @return Sum of the function results over the group's page stores, {@code 0} if no stores are returned.
+ */
+ public long forGroupPageStores(CacheGroupContext gctx, ToLongFunction<PageStore> f) {
+ int groupId = gctx.groupId();
+
+ long res = 0;
+
+ try {
+ Collection<PageStore> stores = storeMgr.getStores(groupId);
+
+ if (stores != null) {
+ for (PageStore store : stores)
+ res += f.applyAsLong(store);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+
+ return res;
+ }
+
+ /**
* Calculates tail pointer for WAL at the end of logical recovery.
*
* @param from Start replay WAL from.
@@ -3263,28 +3309,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
tracker.totalDuration()));
}
}
-
- persStoreMetrics.onCheckpoint(
- tracker.lockWaitDuration(),
- tracker.markDuration(),
- tracker.pagesWriteDuration(),
- tracker.fsyncDuration(),
- tracker.totalDuration(),
- chp.pagesSize,
- tracker.dataPagesWritten(),
- tracker.cowPagesWritten());
- }
- else {
- persStoreMetrics.onCheckpoint(
- tracker.lockWaitDuration(),
- tracker.markDuration(),
- tracker.pagesWriteDuration(),
- tracker.fsyncDuration(),
- tracker.totalDuration(),
- chp.pagesSize,
- tracker.dataPagesWritten(),
- tracker.cowPagesWritten());
}
+
+ updateMetrics(chp, tracker);
}
catch (IgniteCheckedException e) {
if (chp != null)
@@ -3294,6 +3321,26 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
}
+ /**
+ * @param chp Checkpoint whose {@code pagesSize} is passed to the metrics together with the tracker values.
+ * @param tracker Tracker supplying the checkpoint durations and the written page counters.
+ */
+ private void updateMetrics(Checkpoint chp, CheckpointMetricsTracker tracker) {
+ if (persStoreMetrics.metricsEnabled()) {
+ persStoreMetrics.onCheckpoint(
+ tracker.lockWaitDuration(),
+ tracker.markDuration(),
+ tracker.pagesWriteDuration(),
+ tracker.fsyncDuration(),
+ tracker.totalDuration(),
+ chp.pagesSize,
+ tracker.dataPagesWritten(),
+ tracker.cowPagesWritten(),
+ forAllPageStores(PageStore::size),
+ forAllPageStores(PageStore::getSparseSize));
+ }
+ }
+
/** */
private String prepareWalSegsCoveredMsg(IgniteBiTuple<Long, Long> walRange) {
String res;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AbstractFileIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AbstractFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AbstractFileIO.java
index d0211f4..d0aaef7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AbstractFileIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AbstractFileIO.java
@@ -56,7 +56,7 @@ public abstract class AbstractFileIO implements FileIO {
i += n;
time = 0;
}
- else if (n == 0) {
+ else if (n == 0 || i > 0) {
if (!write && available(num - i, position + i) == 0)
return i;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java
index fd00e25..7c6ece8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.persistence.file;
import java.io.File;
+import java.io.FileDescriptor;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
@@ -25,8 +26,10 @@ import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.OpenOption;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.compress.FileSystemUtils;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
/**
* File I/O implementation based on {@link AsynchronousFileChannel}.
@@ -37,6 +40,12 @@ public class AsyncFileIO extends AbstractFileIO {
*/
private final AsynchronousFileChannel ch;
+ /** Native file descriptor extracted from the channel in the constructor. */
+ private final int fd;
+
+ /** File system block size queried once for {@link #fd} in the constructor. */
+ private final int fsBlockSize;
+
/**
* Channel's position.
*/
@@ -54,11 +63,36 @@ public class AsyncFileIO extends AbstractFileIO {
* @param modes Open modes.
*/
public AsyncFileIO(File file, ThreadLocal<ChannelOpFuture> holder, OpenOption... modes) throws IOException {
- this.ch = AsynchronousFileChannel.open(file.toPath(), modes);
-
+ ch = AsynchronousFileChannel.open(file.toPath(), modes);
+ fd = getFileDescriptor(ch);
+ fsBlockSize = FileSystemUtils.getFileSystemBlockSize(fd);
this.holder = holder;
}
+ /**
+ * @param ch File channel; its {@code fdObj} field is read by name (presumably implementation specific - confirm per JVM).
+ * @return Value of the {@code fd} field of that {@link FileDescriptor}, used as the native file descriptor.
+ */
+ private static int getFileDescriptor(AsynchronousFileChannel ch) {
+ FileDescriptor fd = U.field(ch, "fdObj");
+ return U.field(fd, "fd");
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getFileSystemBlockSize() {
+ return fsBlockSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSparseSize() {
+ return FileSystemUtils.getSparseFileSize(fd);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int punchHole(long position, int len) {
+ return (int)FileSystemUtils.punchHole(fd, position, len, fsBlockSize);
+ }
+
/** {@inheritDoc} */
@Override public long position() throws IOException {
return position;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/EncryptedFileIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/EncryptedFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/EncryptedFileIO.java
index 86d9bbc..f21b8ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/EncryptedFileIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/EncryptedFileIO.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
-
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
import org.apache.ignite.spi.encryption.EncryptionSpi;
import org.apache.ignite.internal.managers.encryption.GridEncryptionManager;
@@ -100,6 +99,21 @@ public class EncryptedFileIO implements FileIO {
}
/** {@inheritDoc} */
+ @Override public int getFileSystemBlockSize() {
+ return -1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSparseSize() {
+ return -1;
+ }
+
+ /** {@inheritDoc} Hole punching is not supported by this implementation, which is reported as a negative result. */
+ @Override public int punchHole(long position, int len) {
+ return -1;
+ }
+
+ /** {@inheritDoc} */
@Override public long position() throws IOException {
return plainFileIO.position();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
index 6f32d01..546d1a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
@@ -253,4 +253,23 @@ public interface FileIO extends AutoCloseable {
* @throws IOException If some I/O error occurs.
*/
@Override public void close() throws IOException;
+
+ /**
+ * @return File system block size or negative value if unknown.
+ */
+ public int getFileSystemBlockSize();
+
+ /**
+ * @param position Starting file position.
+ * @param len Number of bytes to free.
+ * @return The actual freed size or negative value if not supported.
+ */
+ int punchHole(long position, int len);
+
+ /**
+ * @return Approximate system dependent size of the storage or negative
+ * value if not supported.
+ * @see #punchHole
+ */
+ long getSparseSize();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
index 8e79b54..c615a34 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
@@ -26,7 +26,7 @@ import java.nio.MappedByteBuffer;
*/
public class FileIODecorator extends AbstractFileIO {
/** File I/O delegate */
- private final FileIO delegate;
+ protected final FileIO delegate;
/**
*
@@ -37,6 +37,21 @@ public class FileIODecorator extends AbstractFileIO {
}
/** {@inheritDoc} */
+ @Override public int getFileSystemBlockSize() {
+ return delegate.getFileSystemBlockSize();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSparseSize() {
+ return delegate.getSparseSize();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int punchHole(long pos, int len) {
+ return delegate.punchHole(pos, len);
+ }
+
+ /** {@inheritDoc} */
@Override public long position() throws IOException {
return delegate.position();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
index 16d74c3..a8fae08 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
@@ -27,8 +27,8 @@ import java.nio.file.Files;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.internal.pagemem.PageIdUtils;
@@ -71,7 +71,7 @@ public class FilePageStore implements PageStore {
private final FileIOFactory ioFactory;
/** I/O interface for read/write operations with file */
- private volatile FileIO fileIO;
+ protected volatile FileIO fileIO;
/** */
private final AtomicLong allocated;
@@ -80,7 +80,7 @@ public class FilePageStore implements PageStore {
private final AllocatedPageTracker allocatedTracker;
/** */
- private final int pageSize;
+ protected final int pageSize;
/** */
private volatile boolean inited;
@@ -105,7 +105,8 @@ public class FilePageStore implements PageStore {
File file,
FileIOFactory factory,
DataStorageConfiguration cfg,
- AllocatedPageTracker allocatedTracker) {
+ AllocatedPageTracker allocatedTracker
+ ) {
this.type = type;
this.cfgFile = file;
this.dbCfg = cfg;
@@ -116,6 +117,38 @@ public class FilePageStore implements PageStore {
}
/** {@inheritDoc} */
+ @Override public int getPageSize() {
+ return pageSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getBlockSize() {
+ return -1; // Header is unaligned in this version.
+ }
+
+ /** {@inheritDoc} */
+ @Override public long size() {
+ try {
+ FileIO io = fileIO;
+
+ return io == null ? 0 : io.size();
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSparseSize() {
+ return -1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void punchHole(long pageId, int usefulBytes) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
@Override public boolean exists() {
return cfgFile.exists() && cfgFile.length() > headerSize();
}
@@ -228,10 +261,8 @@ public class FilePageStore implements PageStore {
if (fileSize == headerSize()) // Every file has a special meta page.
fileSize = pageSize + headerSize();
- if ((fileSize - headerSize()) % pageSize != 0)
- throw new IOException(prefix + "(invalid file size)" +
- " [fileSize=" + U.hexLong(fileSize) +
- ", pageSize=" + U.hexLong(pageSize) + ']');
+ if (fileSize % pageSize != 0) // In the case of compressed pages we can miss the tail of the page.
+ fileSize = (fileSize / pageSize + 1) * pageSize;
return fileSize;
}
@@ -333,6 +364,26 @@ public class FilePageStore implements PageStore {
}
}
+ /**
+ * @param pageId Page ID, used only in the error message.
+ * @param pageBuf Page buffer from which the compressed size is read.
+ * @return Page size if the compressed size is {@code 0}, otherwise the compressed size (must be within the page size).
+ */
+ private int getCrcSize(long pageId, ByteBuffer pageBuf) throws IOException {
+ int compressedSize = PageIO.getCompressedSize(pageBuf);
+
+ if (compressedSize == 0)
+ return pageSize; // Page is not compressed.
+
+ if (compressedSize < 0 || compressedSize > pageSize) {
+ throw new IgniteDataIntegrityViolationException("Failed to read page (CRC validation failed) " +
+ "[id=" + U.hexLong(pageId) + ", file=" + cfgFile.getAbsolutePath() + ", fileSize=" + fileIO.size() +
+ ", page=" + U.toHexString(pageBuf) + "]");
+ }
+
+ return compressedSize;
+ }
+
/** {@inheritDoc} */
@Override public void read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteCheckedException {
init();
@@ -363,7 +414,7 @@ public class FilePageStore implements PageStore {
pageBuf.position(0);
if (!skipCrc) {
- int curCrc32 = FastCrc.calcCrc(pageBuf, pageSize);
+ int curCrc32 = FastCrc.calcCrc(pageBuf, getCrcSize(pageId, pageBuf));
if ((savedCrc32 ^ curCrc32) != 0)
throw new IgniteDataIntegrityViolationException("Failed to read page (CRC validation failed) " +
@@ -549,7 +600,6 @@ public class FilePageStore implements PageStore {
"off=" + U.hexLong(off) + ", allocated=" + U.hexLong(allocated.get()) +
", pageId=" + U.hexLong(pageId) + ", file=" + cfgFile.getPath();
- assert pageBuf.capacity() == pageSize;
assert pageBuf.position() == 0;
assert pageBuf.order() == ByteOrder.nativeOrder() : "Page buffer order " + pageBuf.order()
+ " should be same with " + ByteOrder.nativeOrder();
@@ -559,7 +609,7 @@ public class FilePageStore implements PageStore {
if (calculateCrc && !skipCrc) {
assert PageIO.getCrc(pageBuf) == 0 : U.hexLong(pageId);
- PageIO.setCrc(pageBuf, calcCrc32(pageBuf, pageSize));
+ PageIO.setCrc(pageBuf, calcCrc32(pageBuf, getCrcSize(pageId, pageBuf)));
}
// Check whether crc was calculated somewhere above the stack if it is forcibly skipped.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index 4a14c6b..86560ba 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -30,6 +30,7 @@ import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
+import java.util.AbstractList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -54,6 +55,7 @@ import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
import org.apache.ignite.internal.processors.cache.persistence.AllocatedPageTracker;
@@ -62,6 +64,7 @@ import org.apache.ignite.internal.processors.cache.persistence.StorageException;
import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -73,6 +76,7 @@ import org.jetbrains.annotations.Nullable;
import static java.nio.file.Files.delete;
import static java.nio.file.Files.newDirectoryStream;
+import static java.util.Objects.requireNonNull;
/**
* File page store manager.
@@ -461,6 +465,10 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
try {
store.read(pageId, pageBuf, keepCrc);
+
+ assert keepCrc || PageIO.getCrc(pageBuf) == 0: store.size() - store.pageOffset(pageId);
+
+ cctx.kernalContext().compress().decompressPage(pageBuf, store.getPageSize());
}
catch (StorageException e) {
cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
@@ -511,13 +519,40 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
* @return PageStore to which the page has been written.
* @throws IgniteCheckedException If IO error occurred.
*/
- public PageStore writeInternal(int cacheId, long pageId, ByteBuffer pageBuf, int tag, boolean calculateCrc) throws IgniteCheckedException {
+ public PageStore writeInternal(int cacheId, long pageId, ByteBuffer pageBuf, int tag, boolean calculateCrc)
+ throws IgniteCheckedException {
int partId = PageIdUtils.partId(pageId);
PageStore store = getStore(cacheId, partId);
try {
+ int pageSize = store.getPageSize();
+ int compressedPageSize = pageSize;
+
+ GridCacheContext cctx0 = cctx.cacheContext(cacheId);
+
+ if (cctx0 != null) {
+ assert pageBuf.position() == 0 && pageBuf.limit() == pageSize: pageBuf;
+
+ ByteBuffer compressedPageBuf = cctx0.compress().compressPage(pageBuf, store);
+
+ if (compressedPageBuf != pageBuf) {
+ compressedPageSize = PageIO.getCompressedSize(compressedPageBuf);
+
+ if (!calculateCrc) {
+ calculateCrc = true;
+ PageIO.setCrc(compressedPageBuf, 0); // It will be recalculated over compressed data further.
+ }
+
+ PageIO.setCrc(pageBuf, 0); // It is expected to be reset to 0 after each write.
+ pageBuf = compressedPageBuf;
+ }
+ }
+
store.write(pageId, pageBuf, tag, calculateCrc);
+
+ if (pageSize > compressedPageSize)
+ store.punchHole(pageId, compressedPageSize); // TODO maybe add async punch mode?
}
catch (StorageException e) {
cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
@@ -1048,6 +1083,15 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
/**
* @param grpId Cache group ID.
+ * @return Collection of related page stores.
+ * @throws IgniteCheckedException If failed.
+ */
+ public Collection<PageStore> getStores(int grpId) throws IgniteCheckedException {
+ return getHolder(grpId); // The holder object itself is returned; no copy is made here.
+ }
+
+ /**
+ * @param grpId Cache group ID.
* @param partId Partition ID.
* @return Page store for the corresponding parameters.
* @throws IgniteCheckedException If cache or partition with the given ID was not created.
@@ -1125,7 +1169,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
/**
*
*/
- private static class CacheStoreHolder {
+ private static class CacheStoreHolder extends AbstractList<PageStore> {
/** Index store. */
private final PageStore idxStore;
@@ -1133,11 +1177,20 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
private final PageStore[] partStores;
/**
- *
*/
- public CacheStoreHolder(PageStore idxStore, PageStore[] partStores) {
- this.idxStore = idxStore;
- this.partStores = partStores;
+ CacheStoreHolder(PageStore idxStore, PageStore[] partStores) {
+ this.idxStore = requireNonNull(idxStore); // Rejects a null index store at construction time.
+ this.partStores = requireNonNull(partStores); // Checks only the array reference, not its elements.
+ }
+
+ /** {@inheritDoc} */
+ @Override public PageStore get(int idx) {
+ return requireNonNull(idx != partStores.length ? partStores[idx] : idxStore); // Index partStores.length addresses the index store.
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size() {
+ return partStores.length + 1; // One slot per partStores entry plus one for idxStore.
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreV2.java
index d8c800d..de078eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreV2.java
@@ -57,4 +57,25 @@ public class FilePageStoreV2 extends FilePageStore {
@Override public int version() {
return VERSION;
}
+
+ /** {@inheritDoc} */
+ @Override public int getBlockSize() {
+ return fileIO.getFileSystemBlockSize(); // NOTE(review): fileIO is dereferenced without the null check that getSparseSize() performs — confirm it cannot be null here.
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSparseSize() {
+ FileIO io = fileIO; // Read the field once.
+ // Reports 0 when there is no file I/O; otherwise calls the same instance that was null-checked, not a second read of the field.
+ return io == null ? 0 : io.getSparseSize();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void punchHole(long pageId, int usefulBytes) {
+ assert 0 <= usefulBytes && usefulBytes < pageSize: usefulBytes;
+ // The hole starts right after the useful bytes of the page and runs to the end of the page.
+ long holeOff = pageOffset(pageId) + usefulBytes;
+ int holeLen = pageSize - usefulBytes;
+ fileIO.punchHole(holeOff, holeLen);
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
index ef4a3df..c6922bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
@@ -18,11 +18,14 @@
package org.apache.ignite.internal.processors.cache.persistence.file;
import java.io.File;
+import java.io.FileDescriptor;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.OpenOption;
+import org.apache.ignite.internal.processors.compress.FileSystemUtils;
+import org.apache.ignite.internal.util.typedef.internal.U;
/**
* File I/O implementation based on {@link FileChannel}.
@@ -33,6 +36,12 @@ public class RandomAccessFileIO extends AbstractFileIO {
*/
private final FileChannel ch;
+ /** Native file descriptor. */
+ private final int fd;
+
+ /** */
+ private final int fsBlockSize;
+
/**
* Creates I/O implementation for specified {@code file}
*
@@ -41,6 +50,32 @@ public class RandomAccessFileIO extends AbstractFileIO {
*/
public RandomAccessFileIO(File file, OpenOption... modes) throws IOException {
ch = FileChannel.open(file.toPath(), modes);
+ fd = getNativeFileDescriptor(ch); // NOTE(review): if this or the next call throws, ch is not closed in this constructor — confirm.
+ fsBlockSize = FileSystemUtils.getFileSystemBlockSize(fd); // Looked up once here and kept in a final field.
+ }
+
+ /**
+ * @param ch File channel.
+ * @return Native file descriptor.
+ */
+ private static int getNativeFileDescriptor(FileChannel ch) {
+ FileDescriptor fd = U.field(ch, "fd"); // Fetches the channel's "fd" member via U.field (presumably reflective access — confirm).
+ return U.field(fd, "fd"); // Then the "fd" member of that FileDescriptor, returned as an int.
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getFileSystemBlockSize() {
+ return fsBlockSize; // Final field; not re-queried on each call.
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSparseSize() {
+ return FileSystemUtils.getSparseFileSize(fd); // Delegates to FileSystemUtils on every call, passing the stored descriptor.
+ }
+
+ /** {@inheritDoc} */
+ @Override public int punchHole(long position, int len) {
+ return (int)FileSystemUtils.punchHole(fd, position, len, fsBlockSize); // Helper result is cast to int to fit this method's return type.
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java
index 6345b1f..5300d83 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java
@@ -54,6 +54,21 @@ public class UnzipFileIO extends AbstractFileIO {
}
/** {@inheritDoc} */
+ @Override public int getFileSystemBlockSize() {
+ return -1; // Constant for this implementation; presumably means "not available" — confirm against the FileIO contract.
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSparseSize() {
+ return -1; // Constant for this implementation; presumably means "not available" — confirm against the FileIO contract.
+ }
+
+ /** {@inheritDoc} */
+ @Override public int punchHole(long position, int len) {
+ throw new UnsupportedOperationException(); // Hole punching is not implemented by this FileIO; every call throws.
+ }
+
+ /** {@inheritDoc} */
@Override public long position() throws IOException {
return totalBytesRead;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
index b64b294..03f66c0 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
@@ -530,7 +530,7 @@ public class PageMemoryImpl implements PageMemoryEx {
PageHeader.writeTimestamp(absPtr, U.currentTimeMillis());
rwLock.init(absPtr + PAGE_LOCK_OFFSET, PageIdUtils.tag(pageId));
- assert GridUnsafe.getInt(absPtr + PAGE_OVERHEAD + 4) == 0; //TODO GG-11480
+ assert PageIO.getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO GG-11480
assert !PageHeader.isAcquired(absPtr) :
"Pin counter must be 0 for a new page [relPtr=" + U.hexLong(relPtr) +
@@ -808,9 +808,9 @@ public class PageMemoryImpl implements PageMemoryEx {
memMetrics.onPageRead();
}
- catch (IgniteDataIntegrityViolationException ignore) {
+ catch (IgniteDataIntegrityViolationException e) {
U.warn(log, "Failed to read page (data integrity violation encountered, will try to " +
- "restore using existing WAL) [fullPageId=" + fullId + ']');
+ "restore using existing WAL) [fullPageId=" + fullId + ']', e);
buf.rewind();
@@ -1252,8 +1252,8 @@ public class PageMemoryImpl implements PageMemoryEx {
GridUnsafe.copyMemory(absPtr + PAGE_OVERHEAD, tmpPtr, pageSize());
- assert GridUnsafe.getInt(absPtr + PAGE_OVERHEAD + 4) == 0; //TODO GG-11480
- assert GridUnsafe.getInt(tmpPtr + 4) == 0; //TODO GG-11480
+ assert PageIO.getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO GG-11480
+ assert PageIO.getCrc(tmpPtr) == 0; //TODO GG-11480
}
else {
byte[] arr = buf.array();
@@ -1402,7 +1402,7 @@ public class PageMemoryImpl implements PageMemoryEx {
if (touch)
PageHeader.writeTimestamp(absPtr, U.currentTimeMillis());
- assert GridUnsafe.getInt(absPtr + PAGE_OVERHEAD + 4) == 0; //TODO GG-11480
+ assert PageIO.getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO GG-11480
return absPtr + PAGE_OVERHEAD;
}
@@ -1490,11 +1490,11 @@ public class PageMemoryImpl implements PageMemoryEx {
PageHeader.dirty(absPtr, false);
PageHeader.tempBufferPointer(absPtr, tmpRelPtr);
- assert GridUnsafe.getInt(absPtr + PAGE_OVERHEAD + 4) == 0; //TODO GG-11480
- assert GridUnsafe.getInt(tmpAbsPtr + PAGE_OVERHEAD + 4) == 0; //TODO GG-11480
+ assert PageIO.getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO GG-11480
+ assert PageIO.getCrc(tmpAbsPtr + PAGE_OVERHEAD) == 0; //TODO GG-11480
}
- assert GridUnsafe.getInt(absPtr + PAGE_OVERHEAD + 4) == 0; //TODO GG-11480
+ assert PageIO.getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO GG-11480
return absPtr + PAGE_OVERHEAD;
}
@@ -1522,7 +1522,7 @@ public class PageMemoryImpl implements PageMemoryEx {
boolean pageWalRec = markDirty && walPlc != FALSE && (walPlc == TRUE || !wasDirty);
- assert GridUnsafe.getInt(page + PAGE_OVERHEAD + 4) == 0; //TODO GG-11480
+ assert PageIO.getCrc(page + PAGE_OVERHEAD) == 0; //TODO GG-11480
if (markDirty)
setDirty(fullId, page, markDirty, false);