You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2018/11/29 12:16:05 UTC
[09/50] [abbrv] ignite git commit: IGNITE-10330: Disk page
compression. - Fixes #5200.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java
index 4a12045..6176eeb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java
@@ -33,10 +33,12 @@ import org.apache.ignite.internal.util.GridStringBuilder;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
+
/**
* Data pages IO.
*/
-public abstract class AbstractDataPageIO<T extends Storable> extends PageIO {
+public abstract class AbstractDataPageIO<T extends Storable> extends PageIO implements CompactablePageIO {
/** */
private static final int SHOW_ITEM = 0b0001;
@@ -228,7 +230,7 @@ public abstract class AbstractDataPageIO<T extends Storable> extends PageIO {
* @param pageAddr Page address.
* @return Free space.
*/
- private int getRealFreeSpace(long pageAddr) {
+ public int getRealFreeSpace(long pageAddr) {
return PageUtils.getShort(pageAddr, FREE_SPACE_OFF);
}
@@ -822,9 +824,10 @@ public abstract class AbstractDataPageIO<T extends Storable> extends PageIO {
* @param pageAddr Page address.
* @param payload Payload.
* @param pageSize Page size.
+ * @return Item ID.
* @throws IgniteCheckedException If failed.
*/
- public void addRow(
+ public int addRow(
long pageAddr,
byte[] payload,
int pageSize
@@ -840,7 +843,7 @@ public abstract class AbstractDataPageIO<T extends Storable> extends PageIO {
writeRowData(pageAddr, dataOff, payload);
- addItem(pageAddr, fullEntrySize, directCnt, indirectCnt, dataOff, pageSize);
+ return addItem(pageAddr, fullEntrySize, directCnt, indirectCnt, dataOff, pageSize);
}
/**
@@ -1106,6 +1109,62 @@ public abstract class AbstractDataPageIO<T extends Storable> extends PageIO {
return directCnt; // Previous directCnt will be our itemId.
}
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Copies the page to {@code out} and, if the header reports free space, shifts the data entries
+ * towards the header so that the free space ends up past the new limit of {@code out}.
+ */
+ @Override public void compactPage(ByteBuffer page, ByteBuffer out, int pageSize) {
+ // TODO May we compactDataEntries in-place and then copy compacted data to out?
+ // All modifications below are applied to the copy in out, addressed through pageAddr.
+ copyPage(page, out, pageSize);
+
+ long pageAddr = bufferAddress(out);
+
+ int freeSpace = getRealFreeSpace(pageAddr);
+
+ if (freeSpace == 0)
+ return; // No garbage: nothing to compact here.
+
+ int directCnt = getDirectCount(pageAddr);
+
+ if (directCnt != 0) {
+ int firstOff = getFirstEntryOffset(pageAddr);
+
+ // If the gap between the header (with items) and the first entry is not exactly freeSpace,
+ // the free space is not one contiguous region yet, so the entries are compacted first.
+ if (firstOff - freeSpace != getHeaderSizeWithItems(pageAddr, directCnt)) {
+ firstOff = compactDataEntries(pageAddr, directCnt, pageSize);
+ setFirstEntryOffset(pageAddr, firstOff, pageSize);
+ }
+
+ // Move all the data entries from page end to the page header to close the gap.
+ moveBytes(pageAddr, firstOff, pageSize - firstOff, -freeSpace, pageSize);
+ }
+
+ out.limit(pageSize - freeSpace); // Here we have only meaningful data of this page.
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Moves the data entries found right before the buffer limit forward by the free space size
+ * recorded in the page header and then sets the limit to {@code pageSize}.
+ */
+ @Override public void restorePage(ByteBuffer page, int pageSize) {
+ assert page.isDirect();
+ assert page.position() == 0;
+ assert page.limit() <= pageSize;
+
+ long pageAddr = bufferAddress(page);
+
+ int freeSpace = getRealFreeSpace(pageAddr);
+
+ if (freeSpace != 0) {
+ int firstOff = getFirstEntryOffset(pageAddr);
+ // Size of the entries region in the full-size layout: from the first entry to the page end.
+ int cnt = pageSize - firstOff;
+
+ if (cnt != 0) {
+ // In the compacted buffer the same cnt bytes are located right before the limit.
+ int off = page.limit() - cnt;
+
+ assert off > PageIO.COMMON_HEADER_END: off;
+ assert cnt > 0 : cnt;
+
+ // Shift the entries forward by freeSpace, reopening the gap after the header.
+ moveBytes(pageAddr, off, cnt, freeSpace, pageSize);
+ }
+ }
+
+ page.limit(pageSize);
+ }
+
/**
* @param pageAddr Page address.
* @param directCnt Direct items count.
@@ -1203,7 +1262,16 @@ public abstract class AbstractDataPageIO<T extends Storable> extends PageIO {
entriesSize += entrySize;
}
- return pageSize - ITEMS_OFF - entriesSize - (directCnt + getIndirectCount(pageAddr)) * ITEM_SIZE;
+ return pageSize - entriesSize - getHeaderSizeWithItems(pageAddr, directCnt);
+ }
+
+ /**
+ * Computes the number of bytes taken by the page header together with direct and indirect items.
+ *
+ * @param pageAddr Page address.
+ * @param directCnt Direct items count.
+ * @return Size of the page header including all items.
+ */
+ private int getHeaderSizeWithItems(long pageAddr, int directCnt) {
+ int itemsCnt = directCnt + getIndirectCount(pageAddr);
+
+ return ITEMS_OFF + itemsCnt * ITEM_SIZE;
}
/**
@@ -1214,6 +1282,7 @@ public abstract class AbstractDataPageIO<T extends Storable> extends PageIO {
* @param pageSize Page size.
*/
private void moveBytes(long addr, int off, int cnt, int step, int pageSize) {
+ assert cnt >= 0: cnt;
assert step != 0 : step;
assert off + step >= 0;
assert off + step + cnt <= pageSize : "[off=" + off + ", step=" + step + ", cnt=" + cnt +
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/BPlusIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/BPlusIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/BPlusIO.java
index 349e877..4c656eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/BPlusIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/BPlusIO.java
@@ -17,16 +17,18 @@
package org.apache.ignite.internal.processors.cache.persistence.tree.io;
+import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
import org.apache.ignite.internal.util.GridStringBuilder;
+import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.lang.IgniteInClosure;
/**
* Abstract IO routines for B+Tree pages.
*/
-public abstract class BPlusIO<L> extends PageIO {
+public abstract class BPlusIO<L> extends PageIO implements CompactablePageIO {
/** */
private static final int CNT_OFF = COMMON_HEADER_END;
@@ -412,4 +414,32 @@ public abstract class BPlusIO<L> extends PageIO {
.a(",\n\tremoveId=").appendHex(getRemoveId(addr))
.a("\n]");
}
+
+ /**
+ * Resolves the offset that follows the last item, based on the item count stored in the page.
+ *
+ * @param pageAddr Page address.
+ * @return Offset after the last item.
+ */
+ public int getItemsEnd(long pageAddr) {
+ return offset(getCount(pageAddr));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void compactPage(ByteBuffer page, ByteBuffer out, int pageSize) {
+ copyPage(page, out, pageSize);
+
+ // Everything past the last item is garbage, so the output is cut right after it.
+ int itemsEnd = getItemsEnd(GridUnsafe.bufferAddress(out));
+
+ out.limit(itemsEnd);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * No bytes are moved here: only the buffer limit is widened back to {@code pageSize}.
+ */
+ @Override public void restorePage(ByteBuffer compactPage, int pageSize) {
+ assert compactPage.isDirect();
+ assert compactPage.position() == 0;
+ assert compactPage.limit() <= pageSize;
+
+ compactPage.limit(pageSize); // Just add garbage to the end.
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/CompactablePageIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/CompactablePageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/CompactablePageIO.java
new file mode 100644
index 0000000..775a1f8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/CompactablePageIO.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.tree.io;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Page IO that supports compaction.
+ * Compaction produces a shorter representation of a page; restoring brings the buffer back to full page size.
+ */
+public interface CompactablePageIO {
+ /**
+ * Compacts page contents to the output buffer.
+ * Implementation must not change contents, position and limit of the original page buffer.
+ * The implementations in this change report the compacted size through the limit of {@code out}.
+ *
+ * @param page Page buffer.
+ * @param out Output buffer.
+ * @param pageSize Page size.
+ */
+ void compactPage(ByteBuffer page, ByteBuffer out, int pageSize);
+
+ /**
+ * Restores the original page in place.
+ * The implementations in this change leave the buffer limit equal to {@code pageSize}.
+ *
+ * @param compactPage Compact page.
+ * @param pageSize Page size.
+ */
+ void restorePage(ByteBuffer compactPage, int pageSize);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPagePayload.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPagePayload.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPagePayload.java
index 49eed88..e58aad9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPagePayload.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPagePayload.java
@@ -17,6 +17,9 @@
package org.apache.ignite.internal.processors.cache.persistence.tree.io;
+import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
/**
*
*/
@@ -61,4 +64,17 @@ public class DataPagePayload {
public long nextLink() {
return nextLink;
}
+
+ /**
+ * Reads {@code payloadSize} bytes starting at offset {@code off} of the given page.
+ *
+ * @param pageAddr Page address.
+ * @return Payload bytes.
+ */
+ public byte[] getBytes(long pageAddr) {
+ return PageUtils.getBytes(pageAddr, off, payloadSize);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ // String form is built by the S.toString helper for this class.
+ return S.toString(DataPagePayload.class, this);
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
index ee61e25..85a1e8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
@@ -83,6 +83,9 @@ import org.apache.ignite.internal.util.GridStringBuilder;
*/
public abstract class PageIO {
/** */
+ private static PageIO testIO;
+
+ /** */
private static BPlusInnerIO<?> innerTestIO;
/** */
@@ -131,16 +134,19 @@ public abstract class PageIO {
public static final int ROTATED_ID_PART_OFF = PAGE_ID_OFF + 8;
/** */
- private static final int RESERVED_BYTE_OFF = ROTATED_ID_PART_OFF + 1;
+ private static final int COMPRESSION_TYPE_OFF = ROTATED_ID_PART_OFF + 1;
+
+ /** */
+ private static final int COMPRESSED_SIZE_OFF = COMPRESSION_TYPE_OFF + 1;
/** */
- private static final int RESERVED_SHORT_OFF = RESERVED_BYTE_OFF + 1;
+ private static final int COMPACTED_SIZE_OFF = COMPRESSED_SIZE_OFF + 2;
/** */
- private static final int RESERVED_INT_OFF = RESERVED_SHORT_OFF + 2;
+ private static final int RESERVED_SHORT_OFF = COMPACTED_SIZE_OFF + 2;
/** */
- private static final int RESERVED_2_OFF = RESERVED_INT_OFF + 4;
+ private static final int RESERVED_2_OFF = RESERVED_SHORT_OFF + 2;
/** */
private static final int RESERVED_3_OFF = RESERVED_2_OFF + 8;
@@ -382,6 +388,54 @@ public abstract class PageIO {
}
/**
+ * Stores the compression type byte at {@code COMPRESSION_TYPE_OFF} of the page buffer.
+ * An absolute put is used, so the buffer position is not changed.
+ *
+ * @param page Page buffer.
+ * @param compressType Compression type.
+ */
+ public static void setCompressionType(ByteBuffer page, byte compressType) {
+ page.put(COMPRESSION_TYPE_OFF, compressType);
+ }
+
+ /**
+ * Reads the compression type byte at {@code COMPRESSION_TYPE_OFF} of the page buffer.
+ * An absolute get is used, so the buffer position is not changed.
+ *
+ * @param page Page buffer.
+ * @return Compression type.
+ */
+ public static byte getCompressionType(ByteBuffer page) {
+ return page.get(COMPRESSION_TYPE_OFF);
+ }
+
+ /**
+ * Stores the compressed size as a short at {@code COMPRESSED_SIZE_OFF} of the page buffer.
+ * An absolute put is used, so the buffer position is not changed.
+ *
+ * @param page Page buffer.
+ * @param compressedSize Compressed size.
+ */
+ public static void setCompressedSize(ByteBuffer page, short compressedSize) {
+ page.putShort(COMPRESSED_SIZE_OFF, compressedSize);
+ }
+
+ /**
+ * Reads the compressed size stored as a short at {@code COMPRESSED_SIZE_OFF} of the page buffer.
+ * An absolute get is used, so the buffer position is not changed.
+ *
+ * @param page Page buffer.
+ * @return Compressed size.
+ */
+ public static short getCompressedSize(ByteBuffer page) {
+ return page.getShort(COMPRESSED_SIZE_OFF);
+ }
+
+ /**
+ * Stores the compacted size as a short at {@code COMPACTED_SIZE_OFF} of the page buffer.
+ * An absolute put is used, so the buffer position is not changed.
+ *
+ * @param page Page buffer.
+ * @param compactedSize Compacted size.
+ */
+ public static void setCompactedSize(ByteBuffer page, short compactedSize) {
+ page.putShort(COMPACTED_SIZE_OFF, compactedSize);
+ }
+
+ /**
+ * Reads the compacted size stored as a short at {@code COMPACTED_SIZE_OFF} of the page buffer.
+ * An absolute get is used, so the buffer position is not changed.
+ *
+ * @param page Page buffer.
+ * @return Compacted size.
+ */
+ public static short getCompactedSize(ByteBuffer page) {
+ return page.getShort(COMPACTED_SIZE_OFF);
+ }
+
+ /**
* @param pageAddr Page address.
* @return Checksum.
*/
@@ -487,6 +541,15 @@ public abstract class PageIO {
}
/**
+ * Registers IO for testing.
+ * The IO is kept in a static field, replacing any previously registered one, and is returned by
+ * the type/version lookup when no built-in type matches and its type and version are equal to the requested ones.
+ *
+ * @param io Page IO.
+ */
+ public static void registerTest(PageIO io) {
+ testIO = io;
+ }
+
+ /**
* @return Type.
*/
public final int getType() {
@@ -513,7 +576,8 @@ public abstract class PageIO {
setPageId(pageAddr, pageId);
setCrc(pageAddr, 0);
- PageUtils.putLong(pageAddr, ROTATED_ID_PART_OFF, 0L); // 1 + reserved(1+2+4)
+ // rotated(1) + compress_type(1) + compressed_size(2) + compacted_size(2) + reserved(2)
+ PageUtils.putLong(pageAddr, ROTATED_ID_PART_OFF, 0L);
PageUtils.putLong(pageAddr, RESERVED_2_OFF, 0L);
PageUtils.putLong(pageAddr, RESERVED_3_OFF, 0L);
}
@@ -536,6 +600,15 @@ public abstract class PageIO {
}
/**
+ * Resolves page IO using the type and version read from the given page buffer.
+ *
+ * @param page Page.
+ * @return Page IO.
+ * @throws IgniteCheckedException If failed.
+ */
+ public static <Q extends PageIO> Q getPageIO(ByteBuffer page) throws IgniteCheckedException {
+ int type = getType(page);
+ int ver = getVersion(page);
+
+ return getPageIO(type, ver);
+ }
+
+ /**
* @param type IO Type.
* @param ver IO Version.
* @return Page IO.
@@ -572,6 +645,11 @@ public abstract class PageIO {
return (Q)SimpleDataPageIO.VERSIONS.forVersion(ver);
default:
+ if (testIO != null) {
+ if (testIO.type == type && testIO.ver == ver)
+ return (Q)testIO;
+ }
+
return (Q)getBPlusIO(type, ver);
}
}
@@ -715,6 +793,21 @@ public abstract class PageIO {
protected abstract void printPage(long addr, int pageSize, GridStringBuilder sb) throws IgniteCheckedException;
/**
+ * Copies the remaining bytes of {@code page} into {@code out} and flips {@code out} for reading.
+ * The position of {@code page} is brought back to where it was before the copy.
+ *
+ * @param page Page.
+ * @param out Output buffer.
+ * @param pageSize Page size.
+ */
+ protected final void copyPage(ByteBuffer page, ByteBuffer out, int pageSize) {
+ assert out.position() == 0;
+ assert pageSize <= out.remaining();
+ assert pageSize == page.remaining();
+
+ // Remember the current position: the bulk put below advances it to the limit.
+ // NOTE: this overwrites any mark previously set on the page buffer.
+ page.mark();
+ out.put(page).flip();
+ page.reset();
+ }
+
+ /**
* @param addr Address.
*/
public static String printPage(long addr, int pageSize) throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index e70a027..03bac2b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProces
import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
import org.apache.ignite.internal.processors.cluster.ClusterProcessor;
import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
+import org.apache.ignite.internal.processors.compress.CompressionProcessor;
import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor;
import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
@@ -671,4 +672,9 @@ public class StandaloneGridKernalContext implements GridKernalContext {
@NotNull @Override public Iterator<GridComponent> iterator() {
return null;
}
+
+ /** {@inheritDoc} */
+ @Override public CompressionProcessor compress() {
+ // This context does not provide a compression processor.
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java
new file mode 100644
index 0000000..8b917b3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compress;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.DiskPageCompression;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteComponentType;
+import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+
+/**
+ * Compression processor.
+ * <p>
+ * This base implementation does not compress anything: the compression related methods fail with
+ * a hint to add the {@code ignite-compress} module to the classpath.
+ *
+ * @see IgniteComponentType#COMPRESSION
+ */
+public class CompressionProcessor extends GridProcessorAdapter {
+    /** Minimal allowed LZ4 compression level. */
+    public static final int LZ4_MIN_LEVEL = 0;
+
+    /** Maximal allowed LZ4 compression level. */
+    public static final int LZ4_MAX_LEVEL = 17;
+
+    /** Default LZ4 compression level. */
+    public static final int LZ4_DEFAULT_LEVEL = 0;
+
+    /** Minimal allowed ZSTD compression level. */
+    public static final int ZSTD_MIN_LEVEL = -131072;
+
+    /** Maximal allowed ZSTD compression level. */
+    public static final int ZSTD_MAX_LEVEL = 22;
+
+    /** Default ZSTD compression level. */
+    public static final int ZSTD_DEFAULT_LEVEL = 3;
+
+    /** Compression type value of a page that is stored as is. */
+    protected static final byte UNCOMPRESSED_PAGE = 0;
+
+    /** Compression type value of a compacted page. */
+    protected static final byte COMPACTED_PAGE = 1;
+
+    /** Compression type value of a ZSTD compressed page. */
+    protected static final byte ZSTD_COMPRESSED_PAGE = 2;
+
+    /** Compression type value of an LZ4 compressed page. */
+    protected static final byte LZ4_COMPRESSED_PAGE = 3;
+
+    /** Compression type value of a Snappy compressed page. */
+    protected static final byte SNAPPY_COMPRESSED_PAGE = 4;
+
+    /**
+     * @param ctx Kernal context.
+     */
+    public CompressionProcessor(GridKernalContext ctx) {
+        super(ctx);
+    }
+
+    /**
+     * @param compression Compression algorithm.
+     * @return Default compression level.
+     * @throws IllegalArgumentException If the algorithm is not handled here.
+     */
+    public static int getDefaultCompressionLevel(DiskPageCompression compression) {
+        switch (compression) {
+            case ZSTD:
+                return ZSTD_DEFAULT_LEVEL;
+
+            case LZ4:
+                return LZ4_DEFAULT_LEVEL;
+
+            case SNAPPY:
+            case SKIP_GARBAGE:
+                return 0;
+        }
+
+        throw new IllegalArgumentException("Compression: " + compression);
+    }
+
+    /**
+     * Validates the compression level against the bounds of the given algorithm.
+     *
+     * @param compressLevel Compression level.
+     * @param compression Compression algorithm.
+     * @return Compression level.
+     * @throws IllegalArgumentException If the level is out of bounds or the algorithm has no levels.
+     */
+    public static int checkCompressionLevelBounds(int compressLevel, DiskPageCompression compression) {
+        switch (compression) {
+            case ZSTD:
+                checkCompressionLevelBounds(compressLevel, ZSTD_MIN_LEVEL, ZSTD_MAX_LEVEL, compression);
+                break;
+
+            case LZ4:
+                checkCompressionLevelBounds(compressLevel, LZ4_MIN_LEVEL, LZ4_MAX_LEVEL, compression);
+                break;
+
+            default:
+                throw new IllegalArgumentException("Compression level for " + compression + " is not supported.");
+        }
+
+        return compressLevel;
+    }
+
+    /**
+     * @param compressLevel Compression level.
+     * @param min Min level.
+     * @param max Max level.
+     * @param compression Compression algorithm, used for the error message only.
+     * @throws IllegalArgumentException If the level is outside of {@code [min, max]}.
+     */
+    private static void checkCompressionLevelBounds(
+        int compressLevel,
+        int min,
+        int max,
+        DiskPageCompression compression
+    ) {
+        if (compressLevel < min || compressLevel > max) {
+            // Name the actual algorithm: this check is shared by LZ4 and ZSTD.
+            throw new IllegalArgumentException("Compression level for " + compression + " must be between " + min +
+                " and " + max + ".");
+        }
+    }
+
+    /**
+     * @return Nothing, this method never returns normally.
+     * @throws IgniteCheckedException Always.
+     */
+    private static <T> T fail() throws IgniteCheckedException {
+        throw new IgniteCheckedException("Make sure that ignite-compress module is in classpath.");
+    }
+
+    /**
+     * This implementation always fails.
+     *
+     * @param storagePath Storage path.
+     * @param pageSize Page size.
+     * @throws IgniteCheckedException If compression is not supported.
+     */
+    public void checkPageCompressionSupported(Path storagePath, int pageSize) throws IgniteCheckedException {
+        fail();
+    }
+
+    /**
+     * This implementation always fails.
+     *
+     * @param page Page buffer.
+     * @param pageSize Page size.
+     * @param storeBlockSize Store block size.
+     * @param compression Compression algorithm.
+     * @param compressLevel Compression level.
+     * @return Possibly compressed buffer.
+     * @throws IgniteCheckedException If failed.
+     */
+    public ByteBuffer compressPage(
+        ByteBuffer page,
+        int pageSize,
+        int storeBlockSize,
+        DiskPageCompression compression,
+        int compressLevel
+    ) throws IgniteCheckedException {
+        return fail();
+    }
+
+    /**
+     * This implementation leaves uncompressed pages untouched and fails for any other compression type.
+     *
+     * @param page Possibly compressed page buffer.
+     * @param pageSize Page size.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void decompressPage(ByteBuffer page, int pageSize) throws IgniteCheckedException {
+        if (PageIO.getCompressionType(page) != UNCOMPRESSED_PAGE)
+            fail();
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/FileSystemUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/FileSystemUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/FileSystemUtils.java
new file mode 100644
index 0000000..1877640
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/FileSystemUtils.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compress;
+
+import java.nio.file.Path;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteComponentType;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Native file system API.
+ * <p>
+ * Static facade over an optional {@link NativeFileSystem} implementation that is loaded once in
+ * the static initializer.
+ */
+public final class FileSystemUtils {
+    /** Class name of the Linux implementation, instantiated by name. */
+    private static final String NATIVE_FS_LINUX_CLASS =
+        "org.apache.ignite.internal.processors.compress.NativeFileSystemLinux";
+
+    /** Native file system implementation or {@code null} if it was not loaded. */
+    private static final NativeFileSystem fs;
+
+    /** Error caught while loading the native implementation, if any. */
+    private static volatile Throwable err;
+
+    /* Loads the native implementation; a failure is remembered in err instead of being thrown. */
+    static {
+        NativeFileSystem x = null;
+
+        try {
+            if (IgniteComponentType.COMPRESSION.inClassPath()) {
+                if (U.isLinux())
+                    x = U.newInstance(NATIVE_FS_LINUX_CLASS);
+            }
+        }
+        catch (Throwable e) {
+            err = e;
+        }
+
+        fs = x;
+    }
+
+    /**
+     * Utility class with static methods only: not meant to be instantiated.
+     */
+    private FileSystemUtils() {
+        // No-op.
+    }
+
+    /**
+     * Makes sure that the native file system implementation was loaded.
+     *
+     * @throws IgniteException If loading failed or no implementation is available.
+     */
+    public static void checkSupported() {
+        Throwable e = err;
+
+        if (e != null || fs == null)
+            throw new IgniteException("Native file system API is not supported on " + U.osString(), e);
+    }
+
+    /**
+     * @param path File system path.
+     * @return File system block size or negative value if not supported.
+     */
+    public static int getFileSystemBlockSize(Path path) {
+        return fs == null ? -1 : fs.getFileSystemBlockSize(path);
+    }
+
+    /**
+     * @param fd Native file descriptor.
+     * @return File system block size or negative value if not supported.
+     */
+    public static int getFileSystemBlockSize(int fd) {
+        return fs == null ? -1 : fs.getFileSystemBlockSize(fd);
+    }
+
+    /**
+     * !!! Use with caution. May produce unexpected results.
+     *
+     * Known to work correctly on Linux EXT4 and Btrfs,
+     * while on XFS it returns meaningful result only after
+     * file reopening.
+     *
+     * @param fd Native file descriptor.
+     * @return Approximate system dependent size of the sparse file or negative
+     *     value if not supported.
+     */
+    public static long getSparseFileSize(int fd) {
+        return fs == null ? -1 : fs.getSparseFileSize(fd);
+    }
+
+    /**
+     * Punches a hole covering only the whole file system blocks that lie inside the given range.
+     *
+     * @param fd Native file descriptor.
+     * @param off Offset of the hole.
+     * @param len Length of the hole.
+     * @param fsBlockSize File system block size.
+     * @return Actual punched hole size, {@code 0} if the range does not contain a whole block.
+     * @throws IgniteException If the native file system API is not supported.
+     */
+    public static long punchHole(int fd, long off, long len, int fsBlockSize) {
+        assert off >= 0;
+        assert len > 0;
+
+        checkSupported();
+
+        if (len < fsBlockSize)
+            return 0;
+
+        // TODO maybe optimize for power of 2
+        if (off % fsBlockSize != 0) {
+            // Round the start up to the next block boundary, keeping the end of the range in place.
+            long end = off + len;
+            off = (off / fsBlockSize + 1) * fsBlockSize;
+            len = end - off;
+
+            if (len <= 0)
+                return 0;
+        }
+
+        // Round the length down to a whole number of blocks.
+        len = len / fsBlockSize * fsBlockSize;
+
+        if (len > 0)
+            fs.punchHole(fd, off, len);
+
+        return len;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystem.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystem.java
new file mode 100644
index 0000000..673d1bc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystem.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compress;
+
+import java.nio.file.Path;
+
+/**
+ * Native file system API.
+ * An implementation of this interface is what {@code FileSystemUtils} delegates its calls to.
+ */
+public interface NativeFileSystem {
+ /**
+ * @param path Path.
+ * @return File system block size in bytes.
+ */
+ int getFileSystemBlockSize(Path path);
+
+ /**
+ * @param fd Native file descriptor.
+ * @return File system block size in bytes.
+ */
+ int getFileSystemBlockSize(int fd);
+
+ /**
+ * Punches a hole in the file.
+ * {@code FileSystemUtils.punchHole} calls this method with the offset and the length already
+ * aligned to the file system block size.
+ *
+ * @param fd Native file descriptor.
+ * @param off Offset of the hole.
+ * @param len Length of the hole.
+ */
+ void punchHole(int fd, long off, long len);
+
+ /**
+ * @param fd Native file descriptor.
+ * @return Approximate system dependent size of the sparse file.
+ */
+ long getSparseFileSize(int fd);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
index 2cc0ae3..6b8e2b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
@@ -1339,6 +1339,7 @@ public abstract class GridUnsafe {
* @return Buffer memory address.
*/
public static long bufferAddress(ByteBuffer buf) {
+ assert buf.isDirect();
return UNSAFE.getLong(buf, DIRECT_BUF_ADDR_OFF);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 2d6b584..946378d 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -3933,15 +3933,23 @@ public abstract class IgniteUtils {
* @return Hex string.
*/
public static String byteArray2HexString(byte[] arr) {
- SB sb = new SB(arr.length << 1);
+ StringBuilder sb = new StringBuilder(arr.length << 1);
for (byte b : arr)
- sb.a(Integer.toHexString(MASK & b >>> 4)).a(Integer.toHexString(MASK & b));
+ addByteAsHex(sb, b);
return sb.toString().toUpperCase();
}
/**
+ * Appends the hexadecimal form of the upper part of the byte ({@code b >>> 4}) followed by
+ * the hexadecimal form of its lower part, each masked with {@code MASK}.
+ *
+ * @param sb String builder.
+ * @param b Byte to add in hexadecimal format.
+ */
+ private static void addByteAsHex(StringBuilder sb, byte b) {
+ int hi = MASK & (b >>> 4);
+ int lo = MASK & b;
+
+ sb.append(Integer.toHexString(hi));
+ sb.append(Integer.toHexString(lo));
+ }
+
+ /**
* Checks for containment of the value in the array.
* Both array cells and value may be {@code null}. Two {@code null}s are considered equal.
*
@@ -10552,12 +10560,10 @@ public abstract class IgniteUtils {
* @return hex representation of memory region
*/
public static String toHexString(long addr, int len) {
- assert (len & 0b111) == 0 && len > 0;
-
StringBuilder sb = new StringBuilder(len * 2);
- for (int i = 0; i < len; i += 8)
- sb.append(U.hexLong(GridUnsafe.getLong(addr + i)));
+ for (int i = 0; i < len; i++) // Cannot use getLong: with little-endian byte order the bytes of each long come out reversed.
+ addByteAsHex(sb, GridUnsafe.getByte(addr + i));
return sb.toString();
}
@@ -10568,12 +10574,10 @@ public abstract class IgniteUtils {
* @return hex representation of memory region
*/
public static String toHexString(ByteBuffer buf) {
- assert (buf.capacity() & 0b111) == 0;
-
StringBuilder sb = new StringBuilder(buf.capacity() * 2);
- for (int i = 0; i < buf.capacity(); i += 8)
- sb.append(U.hexLong(buf.getLong(i)));
+ for (int i = 0; i < buf.capacity(); i++)
+ addByteAsHex(sb, buf.get(i)); // Cannot use getLong: with little-endian byte order the bytes of each long come out reversed.
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/mxbean/CacheGroupMetricsMXBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/CacheGroupMetricsMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/CacheGroupMetricsMXBean.java
index 2612a41..cf8aa76 100644
--- a/modules/core/src/main/java/org/apache/ignite/mxbean/CacheGroupMetricsMXBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/mxbean/CacheGroupMetricsMXBean.java
@@ -179,4 +179,16 @@ public interface CacheGroupMetricsMXBean {
*/
@MXBeanDescription("Total size of memory allocated for group, in bytes.")
public long getTotalAllocatedSize();
+
+ /**
+ * Storage space allocated for group, in bytes.
+ */
+ @MXBeanDescription("Storage space allocated for group, in bytes.")
+ public long getStorageSize();
+
+ /**
+ * Storage space allocated for group adjusted for possible sparsity, in bytes.
+ */
+ @MXBeanDescription("Storage space allocated for group adjusted for possible sparsity, in bytes.")
+ public long getSparseStorageSize();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java
index 2069099..4689f15 100644
--- a/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java
@@ -174,4 +174,12 @@ public interface DataStorageMetricsMXBean extends DataStorageMetrics {
"Number of subintervals to set."
)
public void subIntervals(int subInts);
+
+ /** {@inheritDoc} */
+ @MXBeanDescription("Storage space allocated, in bytes.")
+ @Override long getStorageSize();
+
+ /** {@inheritDoc} */
+ @MXBeanDescription("Storage space allocated adjusted for possible sparsity, in bytes.")
+ @Override long getSparseStorageSize();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
index 36d8b41..ed6eb86 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
@@ -120,7 +120,6 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
}
DataStorageConfiguration memCfg = new DataStorageConfiguration();
- memCfg.setPageSize(4 * 1024);
memCfg.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setMaxSize(150L * 1024 * 1024)
.setPersistenceEnabled(persistenceEnabled()));
@@ -1330,6 +1329,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
ccfgs[4] = cacheConfiguration(CACHE_NAME_PREFIX + 4, TRANSACTIONAL);
ccfgs[4].setDataRegionName(NO_PERSISTENCE_REGION);
+ ccfgs[4].setDiskPageCompression(null);
return ccfgs;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DummyPageIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DummyPageIO.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DummyPageIO.java
index 1b36ac1..c402ad7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DummyPageIO.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DummyPageIO.java
@@ -17,25 +17,40 @@
package org.apache.ignite.internal.processors.cache.persistence;
+import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.CompactablePageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.util.GridStringBuilder;
/**
* Dummy PageIO implementation. For test purposes only.
*/
-public class DummyPageIO extends PageIO {
+public class DummyPageIO extends PageIO implements CompactablePageIO {
/** */
public DummyPageIO() {
super(2 * Short.MAX_VALUE, 1);
+
+ PageIO.registerTest(this);
}
/** {@inheritDoc} */
- @Override
- protected void printPage(long addr, int pageSize, GridStringBuilder sb) throws IgniteCheckedException {
+ @Override protected void printPage(long addr, int pageSize, GridStringBuilder sb) throws IgniteCheckedException {
sb.a("DummyPageIO [\n");
sb.a("addr=").a(addr).a(", ");
sb.a("pageSize=").a(addr);
sb.a("\n]");
}
+
+ /** {@inheritDoc} */
+ @Override public void compactPage(ByteBuffer page, ByteBuffer out, int pageSize) {
+ copyPage(page, out, pageSize);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void restorePage(ByteBuffer p, int pageSize) {
+ assert p.isDirect();
+ assert p.position() == 0;
+ assert p.limit() == pageSize;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java
index 4db1de9..0f5aef9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java
@@ -55,6 +55,9 @@ public class IgniteDataStorageMetricsSelfTest extends GridCommonAbstractTest {
/** */
private static final String GROUP1 = "grp1";
+ /** */
+ private static final String NO_PERSISTENCE = "no-persistence";
+
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
cleanPersistenceDir();
@@ -73,19 +76,20 @@ public class IgniteDataStorageMetricsSelfTest extends GridCommonAbstractTest {
cfg.setConsistentId(gridName);
+ long maxRegionSize = 20L * 1024 * 1024;
+
DataStorageConfiguration memCfg = new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
- .setMaxSize(10L * 1024 * 1024)
+ .setMaxSize(maxRegionSize)
.setPersistenceEnabled(true)
.setMetricsEnabled(true)
.setName("dflt-plc"))
.setDataRegionConfigurations(new DataRegionConfiguration()
- .setMaxSize(10L * 1024 * 1024)
+ .setMaxSize(maxRegionSize)
.setPersistenceEnabled(false)
.setMetricsEnabled(true)
- .setName("no-persistence"))
+ .setName(NO_PERSISTENCE))
.setWalMode(WALMode.LOG_ONLY)
- .setPageSize(4 * 1024)
.setMetricsEnabled(true);
cfg.setDataStorageConfiguration(memCfg);
@@ -95,7 +99,7 @@ public class IgniteDataStorageMetricsSelfTest extends GridCommonAbstractTest {
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
cfg.setCacheConfiguration(cacheConfiguration(GROUP1, "cache", PARTITIONED, ATOMIC, 1, null),
- cacheConfiguration(null, "cache-np", PARTITIONED, ATOMIC, 1, "no-persistence"));
+ cacheConfiguration(null, "cache-np", PARTITIONED, ATOMIC, 1, NO_PERSISTENCE));
return cfg;
}
@@ -135,6 +139,9 @@ public class IgniteDataStorageMetricsSelfTest extends GridCommonAbstractTest {
ccfg.setWriteSynchronizationMode(FULL_SYNC);
ccfg.setDataRegionName(dataRegName);
+ if (NO_PERSISTENCE.equals(dataRegName))
+ ccfg.setDiskPageCompression(null);
+
return ccfg;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDynamicCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDynamicCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDynamicCacheTest.java
index 713d4cc..be3ed0b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDynamicCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDynamicCacheTest.java
@@ -45,8 +45,7 @@ public class IgnitePdsDynamicCacheTest extends IgniteDbDynamicCacheSelfTest {
DataStorageConfiguration memCfg = new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(
new DataRegionConfiguration().setMaxSize(200L * 1024 * 1024).setPersistenceEnabled(true))
- .setWalMode(WALMode.LOG_ONLY)
- .setPageSize(4 * 1024);
+ .setWalMode(WALMode.LOG_ONLY);
cfg.setDataStorageConfiguration(memCfg);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsExchangeDuringCheckpointTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsExchangeDuringCheckpointTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsExchangeDuringCheckpointTest.java
index ecc6e02..d6d5c4f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsExchangeDuringCheckpointTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsExchangeDuringCheckpointTest.java
@@ -105,6 +105,7 @@ public class IgnitePdsExchangeDuringCheckpointTest extends GridCommonAbstractTes
CacheConfiguration ccfgNp = new CacheConfiguration("nonPersistentCache");
ccfgNp.setDataRegionName(NO_PERSISTENCE_REGION);
+ ccfgNp.setDiskPageCompression(null);
ccfg.setAffinity(new RendezvousAffinityFunction(false, 4096));
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPageSizesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPageSizesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPageSizesTest.java
index 353bc50..d907239 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPageSizesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPageSizesTest.java
@@ -21,16 +21,20 @@ import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.DiskPageCompression;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFAULT_DISK_PAGE_COMPRESSION;
+
/**
*
*/
@@ -112,6 +116,10 @@ public class IgnitePdsPageSizesTest extends GridCommonAbstractTest {
* @throws Exception if failed.
*/
private void checkPageSize(int pageSize) throws Exception {
+ if (pageSize <= 4 * 1024 &&
+ IgniteSystemProperties.getEnum(DiskPageCompression.class, IGNITE_DEFAULT_DISK_PAGE_COMPRESSION) != null)
+ return; // Small pages do not work with compression.
+
this.pageSize = pageSize;
IgniteEx ignite = startGrid(0);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
index 48b60d4..56426f3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
@@ -566,6 +566,21 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr
}
/** {@inheritDoc} */
+ @Override public int getFileSystemBlockSize() {
+ return delegate.getFileSystemBlockSize();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSparseSize() {
+ return delegate.getSparseSize();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int punchHole(long position, int len) {
+ return delegate.punchHole(position, len);
+ }
+
+ /** {@inheritDoc} */
@Override public long position() throws IOException {
return delegate.position();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java
index b8cb047..4787143 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java
@@ -57,14 +57,15 @@ public class IgnitePdsCacheRestoreTest extends GridCommonAbstractTest {
ccfgs = null;
}
+ long regionMaxSize = 20L * 1024 * 1024;
+
DataStorageConfiguration memCfg = new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(
- new DataRegionConfiguration().setMaxSize(10L * 1024 * 1024).setPersistenceEnabled(true))
- .setPageSize(4 * 1024)
+ new DataRegionConfiguration().setMaxSize(regionMaxSize).setPersistenceEnabled(true))
.setWalMode(WALMode.LOG_ONLY);
memCfg.setDataRegionConfigurations(new DataRegionConfiguration()
- .setMaxSize(10L * 1024 * 1024)
+ .setMaxSize(regionMaxSize)
.setName(NO_PERSISTENCE_REGION)
.setPersistenceEnabled(false));
@@ -210,6 +211,7 @@ public class IgnitePdsCacheRestoreTest extends GridCommonAbstractTest {
ccfgs[2] = cacheConfiguration("c3");
ccfgs[2].setDataRegionName(NO_PERSISTENCE_REGION);
+ ccfgs[2].setDiskPageCompression(null);
return ccfgs;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsDataRegionMetricsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsDataRegionMetricsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsDataRegionMetricsTest.java
index c05f65c..0154c14 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsDataRegionMetricsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsDataRegionMetricsTest.java
@@ -60,7 +60,7 @@ public class IgnitePdsDataRegionMetricsTest extends GridCommonAbstractTest {
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
/** */
- private static final long INIT_REGION_SIZE = 10 << 20;
+ private static final long INIT_REGION_SIZE = 20 << 20;
/** */
private static final long MAX_REGION_SIZE = INIT_REGION_SIZE * 10;
@@ -312,7 +312,13 @@ public class IgnitePdsDataRegionMetricsTest extends GridCommonAbstractTest {
FilePageStore store = (FilePageStore)pageStoreMgr.getStore(CU.cacheId(cacheName), partId(file));
- totalPersistenceSize += path.toFile().length() - store.headerSize();
+ int pageSize = store.getPageSize();
+ long storeSize = path.toFile().length() - store.headerSize();
+
+ if (storeSize % pageSize != 0)
+ storeSize = (storeSize / pageSize + 1) * pageSize; // Adjust for possible page compression.
+
+ totalPersistenceSize += storeSize;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsReserveWalSegmentsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsReserveWalSegmentsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsReserveWalSegmentsTest.java
index 5593e44..cfbb8d8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsReserveWalSegmentsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsReserveWalSegmentsTest.java
@@ -61,8 +61,6 @@ public class IgnitePdsReserveWalSegmentsTest extends GridCommonAbstractTest {
DataStorageConfiguration dbCfg = new DataStorageConfiguration();
- dbCfg.setPageSize(4 * 1024);
-
cfg.setDataStorageConfiguration(dbCfg);
dbCfg.setWalSegmentSize(1024 * 1024)
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java
index 8baa1c3..c9174ba 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java
@@ -68,7 +68,6 @@ public class IgniteCheckpointDirtyPagesForLowLoadTest extends GridCommonAbstract
cfg.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()]));
DataStorageConfiguration dsCfg = new DataStorageConfiguration();
- dsCfg.setPageSize(1024); //smaller page to reduce overhead to short values
dsCfg.setDefaultDataRegionConfiguration(
new DataRegionConfiguration()
.setPersistenceEnabled(true)
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/DefaultPageSizeBackwardsCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/DefaultPageSizeBackwardsCompatibilityTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/DefaultPageSizeBackwardsCompatibilityTest.java
index 29e113c..70d003d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/DefaultPageSizeBackwardsCompatibilityTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/DefaultPageSizeBackwardsCompatibilityTest.java
@@ -38,7 +38,7 @@ public class DefaultPageSizeBackwardsCompatibilityTest extends GridCommonAbstrac
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
/** Client mode. */
- private boolean set2kPageSize = true;
+ private boolean set16kPageSize = true;
/** Entries count. */
public static final int ENTRIES_COUNT = 300;
@@ -55,8 +55,10 @@ public class DefaultPageSizeBackwardsCompatibilityTest extends GridCommonAbstrac
DataStorageConfiguration memCfg = new DataStorageConfiguration();
- if (set2kPageSize)
- memCfg.setPageSize(2048);
+ if (set16kPageSize)
+ memCfg.setPageSize(16 * 1024);
+ else
+ memCfg.setPageSize(0); // Enforce default.
DataRegionConfiguration memPlcCfg = new DataRegionConfiguration();
memPlcCfg.setMaxSize(100L * 1000 * 1000);
@@ -64,7 +66,7 @@ public class DefaultPageSizeBackwardsCompatibilityTest extends GridCommonAbstrac
memPlcCfg.setPersistenceEnabled(true);
memCfg.setDefaultDataRegionConfiguration(memPlcCfg);
- memCfg.setCheckpointFrequency(3_000);
+ memCfg.setCheckpointFrequency(500);
cfg.setDataStorageConfiguration(memCfg);
@@ -75,6 +77,9 @@ public class DefaultPageSizeBackwardsCompatibilityTest extends GridCommonAbstrac
ccfg1.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
ccfg1.setAffinity(new RendezvousAffinityFunction(false, 32));
+ if (!set16kPageSize)
+ ccfg1.setDiskPageCompression(null);
+
cfg.setCacheConfiguration(ccfg1);
cfg.setConsistentId(gridName);
@@ -99,7 +104,7 @@ public class DefaultPageSizeBackwardsCompatibilityTest extends GridCommonAbstrac
/**
* @throws Exception If failed.
*/
- public void testStartFrom2kDefaultStore() throws Exception {
+ public void testStartFrom16kDefaultStore() throws Exception {
startGrids(2);
Ignite ig = ignite(0);
@@ -113,11 +118,11 @@ public class DefaultPageSizeBackwardsCompatibilityTest extends GridCommonAbstrac
for (int i = 0; i < ENTRIES_COUNT; i++)
cache.put(i, i);
- Thread.sleep(5_000); // Await for checkpoint to happen.
+ Thread.sleep(1500); // Await for checkpoint to happen.
stopAllGrids();
- set2kPageSize = false;
+ set16kPageSize = false;
startGrids(2);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
index 5bf7e7f..14525f0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
@@ -42,8 +42,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_PAGE_SIZE;
-
/**
*
*/
@@ -83,7 +81,7 @@ public class WalCompactionTest extends GridCommonAbstractTest {
CacheConfiguration ccfg = new CacheConfiguration();
- ccfg.setName("cache");
+ ccfg.setName(CACHE_NAME);
ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
ccfg.setAffinity(new RendezvousAffinityFunction(false, 16));
@@ -139,7 +137,9 @@ public class WalCompactionTest extends GridCommonAbstractTest {
IgniteEx ig = (IgniteEx)startGrids(3);
ig.cluster().active(true);
- IgniteCache<Integer, byte[]> cache = ig.cache("cache");
+ IgniteCache<Integer, byte[]> cache = ig.cache(CACHE_NAME);
+
+ final int pageSize = ig.cachex(CACHE_NAME).context().dataRegion().pageMemory().pageSize();
for (int i = 0; i < ENTRIES; i++) { // At least 20MB of raw data in total.
final byte[] val = new byte[20000];
@@ -150,9 +150,9 @@ public class WalCompactionTest extends GridCommonAbstractTest {
}
// Spam WAL to move all data records to compressible WAL zone.
- for (int i = 0; i < WAL_SEGMENT_SIZE / DFLT_PAGE_SIZE * 2; i++) {
- ig.context().cache().context().wal().log(new PageSnapshot(new FullPageId(-1, -1), new byte[DFLT_PAGE_SIZE],
- DFLT_PAGE_SIZE));
+ for (int i = 0; i < WAL_SEGMENT_SIZE / pageSize * 2; i++) {
+ ig.context().cache().context().wal().log(new PageSnapshot(new FullPageId(-1, -1), new byte[pageSize],
+ pageSize));
}
// WAL archive segment is allowed to be compressed when it's at least one checkpoint away from current WAL head.
@@ -262,7 +262,7 @@ public class WalCompactionTest extends GridCommonAbstractTest {
IgniteEx ig = startGrid(0);
ig.cluster().active(true);
- IgniteCache<Integer, byte[]> cache = ig.cache("cache");
+ IgniteCache<Integer, byte[]> cache = ig.cache(CACHE_NAME);
for (int i = 0; i < 2500; i++) { // At least 50MB of raw data in total.
final byte[] val = new byte[20000];
@@ -325,7 +325,9 @@ public class WalCompactionTest extends GridCommonAbstractTest {
IgniteEx ig = (IgniteEx)startGrids(3);
ig.cluster().active(true);
- IgniteCache<Integer, byte[]> cache = ig.cache("cache");
+ IgniteCache<Integer, byte[]> cache = ig.cache(CACHE_NAME);
+
+ final int pageSize = ig.cachex(CACHE_NAME).context().dataRegion().pageMemory().pageSize();
for (int i = 0; i < 100; i++) {
final byte[] val = new byte[20000];
@@ -364,9 +366,9 @@ public class WalCompactionTest extends GridCommonAbstractTest {
}
// Spam WAL to move all data records to compressible WAL zone.
- for (int i = 0; i < WAL_SEGMENT_SIZE / DFLT_PAGE_SIZE * 2; i++) {
- ig.context().cache().context().wal().log(new PageSnapshot(new FullPageId(-1, -1), new byte[DFLT_PAGE_SIZE],
- DFLT_PAGE_SIZE));
+ for (int i = 0; i < WAL_SEGMENT_SIZE / pageSize * 2; i++) {
+ ig.context().cache().context().wal().log(new PageSnapshot(new FullPageId(-1, -1), new byte[pageSize],
+ pageSize));
}
// WAL archive segment is allowed to be compressed when it's at least one checkpoint away from current WAL head.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalDeletionArchiveAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalDeletionArchiveAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalDeletionArchiveAbstractTest.java
index 300b752..3a99236 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalDeletionArchiveAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalDeletionArchiveAbstractTest.java
@@ -55,7 +55,6 @@ public abstract class WalDeletionArchiveAbstractTest extends GridCommonAbstractT
dbCfg.setWalMode(walMode());
dbCfg.setWalSegmentSize(512 * 1024);
dbCfg.setCheckpointFrequency(60 * 1000);//too high value for turn off frequency checkpoint.
- dbCfg.setPageSize(4 * 1024);
dbCfg.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setMaxSize(100 * 1024 * 1024)
.setPersistenceEnabled(true));
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java
index 9a23502..14013b1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java
@@ -73,8 +73,6 @@ public abstract class IgniteDbAbstractTest extends GridCommonAbstractTest {
if (isLargePage())
dbCfg.setPageSize(16 * 1024);
- else
- dbCfg.setPageSize(4 * 1024);
dbCfg.setWalMode(WALMode.LOG_ONLY);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index 8ac72f7..02857ae 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager;
+import org.apache.ignite.internal.processors.cache.CacheCompressionManager;
import org.apache.ignite.internal.processors.cache.CacheOsConflictResolutionManager;
import org.apache.ignite.internal.processors.cache.CacheType;
import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
@@ -90,6 +91,7 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
true,
true,
false,
+ new CacheCompressionManager(),
new GridCacheEventManager(),
new CacheOsStoreManager(null, new CacheConfiguration()),
new GridCacheEvictionManager(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/direct-io/pom.xml
----------------------------------------------------------------------
diff --git a/modules/direct-io/pom.xml b/modules/direct-io/pom.xml
index e460e67..a5e2841 100644
--- a/modules/direct-io/pom.xml
+++ b/modules/direct-io/pom.xml
@@ -59,6 +59,21 @@
</dependency>
<dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-compress</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-compress</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>${spring.version}</version>
@@ -75,7 +90,7 @@
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
- <version>4.5.0</version>
+ <version>${jna.version}</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java
----------------------------------------------------------------------
diff --git a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java
index 98fd99b..a37dcdb 100644
--- a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java
+++ b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReferenceArray;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.internal.processors.compress.FileSystemUtils;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.NotNull;
@@ -46,19 +47,22 @@ public class AlignedBuffersDirectFileIO extends AbstractFileIO {
/** Negative value for file offset: read/write starting from current file position */
private static final int FILE_POS_USE_CURRENT = -1;
- /** File system & linux block size. Minimal amount of data can be written using DirectIO. */
- private final int fsBlockSize;
+ /** Minimal amount of data that can be written using DirectIO. */
+ private final int ioBlockSize;
- /** Durable memory Page size. Can have greater value than {@link #fsBlockSize}. */
+ /** Durable memory Page size. Can have greater value than {@link #ioBlockSize}. */
private final int pageSize;
+ /** File system block size. */
+ private final int fsBlockSize;
+
/** File. */
private final File file;
/** Logger. */
private final IgniteLogger log;
- /** Thread local with buffers with capacity = one page {@link #pageSize} and aligned using {@link #fsBlockSize}. */
+ /** Thread local with buffers with capacity = one page {@link #pageSize} and aligned using {@link #ioBlockSize}. */
private ThreadLocal<ByteBuffer> tlbOnePageAligned;
/** Managed aligned buffers. Used to check if buffer is applicable for direct IO our data should be copied. */
@@ -79,18 +83,18 @@ public class AlignedBuffersDirectFileIO extends AbstractFileIO {
/**
* Creates Direct File IO.
*
- * @param fsBlockSize FS/OS block size.
+ * @param ioBlockSize Direct IO block size (minimal amount of data that can be written using DirectIO).
* @param pageSize Durable memory Page size.
* @param file File to open.
* @param modes Open options (flags).
* @param tlbOnePageAligned Thread local with buffers with capacity = one page {@code pageSize} and aligned using
- * {@code fsBlockSize}.
+ * {@code ioBlockSize}.
* @param managedAlignedBuffers Managed aligned buffers map, used to check if buffer is known.
* @param log Logger.
* @throws IOException if file open failed.
*/
AlignedBuffersDirectFileIO(
- int fsBlockSize,
+ int ioBlockSize,
int pageSize,
File file,
OpenOption[] modes,
@@ -99,7 +103,7 @@ public class AlignedBuffersDirectFileIO extends AbstractFileIO {
IgniteLogger log)
throws IOException {
this.log = log;
- this.fsBlockSize = fsBlockSize;
+ this.ioBlockSize = ioBlockSize;
this.pageSize = pageSize;
this.file = file;
this.tlbOnePageAligned = tlbOnePageAligned;
@@ -126,6 +130,7 @@ public class AlignedBuffersDirectFileIO extends AbstractFileIO {
"(probably incompatible file system selected, for example, tmpfs): " + msg);
this.fd = fd;
+ fsBlockSize = FileSystemUtils.getFileSystemBlockSize(fd);
return;
}
@@ -135,6 +140,7 @@ public class AlignedBuffersDirectFileIO extends AbstractFileIO {
}
this.fd = fd;
+ fsBlockSize = FileSystemUtils.getFileSystemBlockSize(fd);
}
/**
@@ -174,6 +180,21 @@ public class AlignedBuffersDirectFileIO extends AbstractFileIO {
}
/** {@inheritDoc} */
+ @Override public int getFileSystemBlockSize() {
+ return fsBlockSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSparseSize() {
+ return FileSystemUtils.getSparseFileSize(fd);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int punchHole(long position, int len) {
+ return (int)FileSystemUtils.punchHole(fd, position, len, fsBlockSize);
+ }
+
+ /** {@inheritDoc} */
@Override public long position() throws IOException {
long position = IgniteNativeIoLib.lseek(fdCheckOpened(), 0, IgniteNativeIoLib.SEEK_CUR);
@@ -202,11 +223,22 @@ public class AlignedBuffersDirectFileIO extends AbstractFileIO {
@Override public int read(ByteBuffer destBuf, long filePosition) throws IOException {
int size = checkSizeIsPadded(destBuf.remaining());
- if (isKnownAligned(destBuf))
- return readIntoAlignedBuffer(destBuf, filePosition);
+ return isKnownAligned(destBuf) ?
+ readIntoAlignedBuffer(destBuf, filePosition) :
+ readIntoUnalignedBuffer(destBuf, filePosition, size);
+ }
+ /**
+ * @param destBuf Destination byte buffer that is not known to be aligned.
+ * @param filePosition File position.
+ * @param size Buffer size to read, should be divisible by {@link #ioBlockSize}.
+ * @return Number of read bytes, possibly zero, or <tt>-1</tt> if the
+ * given position is greater than or equal to the file's current size.
+ * @throws IOException If failed.
+ */
+ private int readIntoUnalignedBuffer(ByteBuffer destBuf, long filePosition, int size) throws IOException {
boolean useTlb = size == pageSize;
- ByteBuffer alignedBuf = useTlb ? tlbOnePageAligned.get() : AlignedBuffers.allocate(fsBlockSize, size);
+ ByteBuffer alignedBuf = useTlb ? tlbOnePageAligned.get() : AlignedBuffers.allocate(ioBlockSize, size);
try {
assert alignedBuf.position() == 0: "Temporary aligned buffer is in incorrect state: position is set incorrectly";
@@ -241,29 +273,46 @@ public class AlignedBuffersDirectFileIO extends AbstractFileIO {
/** {@inheritDoc} */
@Override public int write(ByteBuffer srcBuf, long filePosition) throws IOException {
- int size = checkSizeIsPadded(srcBuf.remaining());
+ return isKnownAligned(srcBuf) ?
+ writeFromAlignedBuffer(srcBuf, filePosition) :
+ writeFromUnalignedBuffer(srcBuf, filePosition);
+ }
- if (isKnownAligned(srcBuf))
- return writeFromAlignedBuffer(srcBuf, filePosition);
+ /**
+ * @param srcBuf Source byte buffer that is not known to be aligned.
+ * @param filePosition File position.
+ * @return Number of written bytes.
+ * @throws IOException If failed.
+ */
+ private int writeFromUnalignedBuffer(ByteBuffer srcBuf, long filePosition) throws IOException {
+ int size = srcBuf.remaining();
- boolean useTlb = size == pageSize;
- ByteBuffer alignedBuf = useTlb ? tlbOnePageAligned.get() : AlignedBuffers.allocate(fsBlockSize, size);
+ boolean useTlb = size <= pageSize;
+ ByteBuffer alignedBuf = useTlb ? tlbOnePageAligned.get() : AlignedBuffers.allocate(ioBlockSize, size);
try {
assert alignedBuf.position() == 0 : "Temporary aligned buffer is in incorrect state: position is set incorrectly";
- assert alignedBuf.limit() == size : "Temporary aligned buffer is in incorrect state: limit is set incorrectly";
+ assert alignedBuf.limit() >= size : "Temporary aligned buffer is in incorrect state: limit is set incorrectly";
int initPos = srcBuf.position();
alignedBuf.put(srcBuf);
alignedBuf.flip();
- srcBuf.position(initPos); // will update later from write results
+ int len = alignedBuf.remaining();
+
+ // A compressed buffer whose size is not a multiple of the IO block size can be passed here.
+ if (len % ioBlockSize != 0)
+ alignBufferLimit(alignedBuf);
int written = writeFromAlignedBuffer(alignedBuf, filePosition);
- if (written > 0)
- srcBuf.position(initPos + written);
+ // Actual written length can be greater than the original buffer,
+ // since we artificially expanded it to have correctly aligned size.
+ if (written > len)
+ written = len;
+
+ srcBuf.position(initPos + written);
return written;
}
@@ -276,6 +325,17 @@ public class AlignedBuffersDirectFileIO extends AbstractFileIO {
}
/**
+ * @param buf Byte buffer whose limit is extended so that its remaining length is a multiple of {@link #ioBlockSize}.
+ */
+ private void alignBufferLimit(ByteBuffer buf) {
+ int len = buf.remaining();
+
+ int alignedLen = (len / ioBlockSize + 1) * ioBlockSize;
+
+ buf.limit(buf.limit() + alignedLen - len);
+ }
+
+ /**
* Checks if we can run fast path: we got well known buffer is already aligned.
*
* @param srcBuf buffer to check if it is known buffer.
@@ -290,16 +350,16 @@ public class AlignedBuffersDirectFileIO extends AbstractFileIO {
/**
* Check if size is appropriate for aligned/direct IO.
*
- * @param size buffer size to write, should be divisible by {@link #fsBlockSize}.
+ * @param size buffer size to write, should be divisible by {@link #ioBlockSize}.
* @return size from parameter.
* @throws IOException if provided size can't be written using direct IO.
*/
private int checkSizeIsPadded(int size) throws IOException {
- if (size % fsBlockSize != 0) {
+ if (size % ioBlockSize != 0) {
throw new IOException(
- String.format("Unable to apply DirectIO for read/write buffer [%d] bytes on file system " +
+ String.format("Unable to apply DirectIO for read/write buffer [%d] bytes on " +
"block size [%d]. Consider setting %s.setPageSize(%d) or disable Direct IO.",
- size, fsBlockSize, DataStorageConfiguration.class.getSimpleName(), fsBlockSize));
+ size, ioBlockSize, DataStorageConfiguration.class.getSimpleName(), ioBlockSize));
}
return size;
@@ -446,9 +506,9 @@ public class AlignedBuffersDirectFileIO extends AbstractFileIO {
if (pos > buf.capacity())
throw new BufferOverflowException();
- if ((alignedPointer + pos) % fsBlockSize != 0) {
+ if ((alignedPointer + pos) % ioBlockSize != 0) {
U.warn(log, String.format("IO Buffer Pointer [%d] and/or offset [%d] seems to be not aligned " +
- "for FS block size [%d]. Direct IO may fail.", alignedPointer, buf.position(), fsBlockSize));
+ "for IO block size [%d]. Direct IO may fail.", alignedPointer, buf.position(), ioBlockSize));
}
return new Pointer(alignedPointer + pos);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIOFactory.java
----------------------------------------------------------------------
diff --git a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIOFactory.java b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIOFactory.java
index 85a3a02..5d919fe 100644
--- a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIOFactory.java
+++ b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIOFactory.java
@@ -49,7 +49,7 @@ public class AlignedBuffersDirectFileIOFactory implements FileIOFactory {
private final FileIOFactory backupFactory;
/** File system/os block size, negative value if library init was failed. */
- private final int fsBlockSize;
+ private final int ioBlockSize;
/** Use backup factory, {@code true} if direct IO setup failed. */
private boolean useBackupFactory;
@@ -81,22 +81,22 @@ public class AlignedBuffersDirectFileIOFactory implements FileIOFactory {
this.backupFactory = backupFactory;
useBackupFactory = true;
- fsBlockSize = IgniteNativeIoLib.getFsBlockSize(storePath.getAbsolutePath(), log);
+ ioBlockSize = IgniteNativeIoLib.getDirectIOBlockSize(storePath.getAbsolutePath(), log);
if(!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_DIRECT_IO_ENABLED, true)) {
if (log.isInfoEnabled())
- log.info("Direct IO is explicitly disabled by system property");
+ log.info("Direct IO is explicitly disabled by system property.");
return;
}
- if (fsBlockSize > 0) {
- int blkSize = fsBlockSize;
+ if (ioBlockSize > 0) {
+ int blkSize = ioBlockSize;
if (pageSize % blkSize != 0) {
U.warn(log, String.format("Unable to setup Direct IO for Ignite [pageSize=%d bytes;" +
" file system block size=%d]. For speeding up Ignite consider setting %s.setPageSize(%d)." +
- " Direct IO is disabled",
+ " Direct IO is disabled.",
pageSize, blkSize, DataStorageConfiguration.class.getSimpleName(), blkSize));
}
else {
@@ -133,7 +133,7 @@ public class AlignedBuffersDirectFileIOFactory implements FileIOFactory {
assert !useBackupFactory : "Direct IO is disabled, aligned managed buffer creation is disabled now";
assert managedAlignedBuffers != null : "Direct buffers not available";
- ByteBuffer allocate = AlignedBuffers.allocate(fsBlockSize, size).order(ByteOrder.nativeOrder());
+ ByteBuffer allocate = AlignedBuffers.allocate(ioBlockSize, size).order(ByteOrder.nativeOrder());
managedAlignedBuffers.put(GridUnsafe.bufferAddress(allocate), Thread.currentThread());
@@ -145,7 +145,7 @@ public class AlignedBuffersDirectFileIOFactory implements FileIOFactory {
if (useBackupFactory)
return backupFactory.create(file, modes);
- return new AlignedBuffersDirectFileIO(fsBlockSize, pageSize, file, modes, tlbOnePageAligned, managedAlignedBuffers, log);
+ return new AlignedBuffersDirectFileIO(ioBlockSize, pageSize, file, modes, tlbOnePageAligned, managedAlignedBuffers, log);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/IgniteNativeIoLib.java
----------------------------------------------------------------------
diff --git a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/IgniteNativeIoLib.java b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/IgniteNativeIoLib.java
index 65ef8d7..2ab4325 100644
--- a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/IgniteNativeIoLib.java
+++ b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/IgniteNativeIoLib.java
@@ -190,7 +190,7 @@ public class IgniteNativeIoLib {
* <li>and <tt>-1</tt> if failed to determine block size.</li>
* <li>and <tt>-1</tt> if JNA is not available or init failed.</li> </ul>
*/
- public static int getFsBlockSize(final String storageDir, final IgniteLogger log) {
+ public static int getDirectIOBlockSize(final String storageDir, final IgniteLogger log) {
if (ex != null) {
U.warn(log, "Failed to initialize O_DIRECT support at current OS: " + ex.getMessage(), ex);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/direct-io/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DiskPageCompressionIntegrationDirectIOTest.java
----------------------------------------------------------------------
diff --git a/modules/direct-io/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DiskPageCompressionIntegrationDirectIOTest.java b/modules/direct-io/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DiskPageCompressionIntegrationDirectIOTest.java
new file mode 100644
index 0000000..89b963a
--- /dev/null
+++ b/modules/direct-io/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DiskPageCompressionIntegrationDirectIOTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import org.apache.ignite.internal.processors.cache.persistence.file.AlignedBuffersDirectFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.compress.DiskPageCompressionIntegrationTest;
+
+/**
+ */
+public class DiskPageCompressionIntegrationDirectIOTest extends DiskPageCompressionIntegrationTest {
+ /** {@inheritDoc} */
+ @Override protected void checkFileIOFactory(FileIOFactory f) {
+ assertTrue(f instanceof AlignedBuffersDirectFileIOFactory);
+ }
+}