You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/03/31 12:24:44 UTC
[ignite-3] branch main updated: IGNITE-16280 Basic B+Tree-based partition storage support (#697)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 99b544f IGNITE-16280 Basic B+Tree-based partition storage support (#697)
99b544f is described below
commit 99b544f22e41abe60cdcad292e557af152aee3ce
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Thu Mar 31 15:24:39 2022 +0300
IGNITE-16280 Basic B+Tree-based partition storage support (#697)
---
.../internal/pagememory/freelist/PagesList.java | 2 +-
.../internal/pagememory/tree/IgniteTree.java | 6 +-
.../ignite/internal/pagememory/util/PageUtils.java | 35 ++
.../internal/pagememory/util/PageUtilsTest.java | 115 +++++
.../apache/ignite/internal/storage/DataRow.java | 4 -
.../ignite/internal/storage/PartitionStorage.java | 13 +-
.../apache/ignite/internal/storage/SearchRow.java | 4 -
.../ignite/internal/storage/StorageUtils.java | 63 +++
.../storage/AbstractPartitionStorageTest.java | 11 +-
modules/storage-page-memory/pom.xml | 110 +++++
.../storage/pagememory/FragmentedByteArray.java | 94 ++++
.../storage/pagememory/PageMemoryDataRegion.java | 70 +++
.../pagememory/PageMemoryPartitionStorage.java | 512 +++++++++++++++++++++
.../pagememory/PageMemoryStorageEngine.java | 67 +++
.../pagememory/PageMemoryStorageIoModule.java | 43 ++
.../storage/pagememory/PageMemoryTableStorage.java | 188 ++++++++
.../internal/storage/pagememory/TableDataRow.java | 111 +++++
.../storage/pagememory/TableDataRowAdapter.java | 63 +++
.../internal/storage/pagememory/TableFreeList.java | 107 +++++
.../storage/pagememory/TableSearchRow.java} | 47 +-
.../internal/storage/pagememory/TableTree.java | 340 ++++++++++++++
.../pagememory/VolatilePageMemoryDataRegion.java | 102 ++++
.../pagememory/VolatilePageMemoryTableStorage.java | 42 ++
.../internal/storage/pagememory/io/RowIo.java} | 23 +-
.../storage/pagememory/io/TableDataIo.java | 174 +++++++
.../storage/pagememory/io/TableInnerIo.java | 113 +++++
.../storage/pagememory/io/TableLeafIo.java | 112 +++++
.../storage/pagememory/io/TableMetaIo.java} | 30 +-
...ache.ignite.internal.pagememory.io.PageIoModule | 17 +
.../pagememory/PageMemoryPartitionStorageTest.java | 156 +++++++
.../storage/rocksdb/RocksDbPartitionStorage.java | 14 +-
parent/pom.xml | 6 +
pom.xml | 1 +
33 files changed, 2724 insertions(+), 71 deletions(-)
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/PagesList.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/PagesList.java
index da529d8..e6b706b 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/PagesList.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/PagesList.java
@@ -1930,7 +1930,7 @@ public abstract class PagesList extends DataStructure {
private int emptyFlushCnt;
/** Global (per data region) limit of caches for page lists. */
- private final AtomicLong pagesCacheLimit;
+ private final @Nullable AtomicLong pagesCacheLimit;
/**
* Constructor.
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/IgniteTree.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/IgniteTree.java
index 9f54f85..ecb20e5 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/IgniteTree.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/IgniteTree.java
@@ -123,11 +123,9 @@ public interface IgniteTree<L, T> {
@Nullable T newRow();
/**
- * Returns operation type for this closure or {@code null} if it is unknown.
- *
- * <p>NOTE: After method {@link #call} has been called, operation type must be known and this method can not return {@code null}.
+ * Returns operation type for this closure.
*/
- @Nullable OperationType operationType();
+ OperationType operationType();
}
/**
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageUtils.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageUtils.java
index a5878f4..928e150 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageUtils.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageUtils.java
@@ -279,4 +279,39 @@ public class PageUtils {
public static void copyMemory(long srcAddr, long srcOff, long dstAddr, long dstOff, long cnt) {
GridUnsafe.copyMemory(null, srcAddr + srcOff, null, dstAddr + dstOff, cnt);
}
+
+ /**
+ * Writes a {@link ByteBuffer} into the memory.
+ *
+ * <p>NOTE: Will be written from {@link ByteBuffer#position()} to {@link ByteBuffer#limit()}.
+ *
+ * @param addr Address.
+ * @param off Offset.
+ * @param buf Byte buffer.
+ */
+ public static void putByteBuffer(long addr, int off, ByteBuffer buf) {
+ assert addr > 0 : addr;
+ assert off >= 0 : off;
+ assert buf != null;
+
+ Object srcBase;
+
+ long srcOff;
+
+ if (buf.isDirect()) {
+ srcBase = null;
+
+ srcOff = GridUnsafe.bufferAddress(buf);
+ } else {
+ assert !buf.isReadOnly();
+
+ byte[] arr = buf.array();
+
+ srcBase = arr;
+
+ srcOff = GridUnsafe.BYTE_ARR_OFF + buf.arrayOffset();
+ }
+
+ GridUnsafe.copyMemory(srcBase, srcOff + buf.position(), null, addr + off, buf.limit() - buf.position());
+ }
}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/util/PageUtilsTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/util/PageUtilsTest.java
new file mode 100644
index 0000000..c3509e6
--- /dev/null
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/util/PageUtilsTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.util;
+
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getBytes;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putByteBuffer;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.LongConsumer;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/**
+ * For {@link PageUtils} testing.
+ */
+public class PageUtilsTest {
+ @ParameterizedTest(name = "position = {0}")
+ @ValueSource(ints = {0, 128})
+ void testPutByteBufferIndirect(int position) {
+ byte[] bytes = randomBytes(1024);
+
+ ByteBuffer buf = ByteBuffer.wrap(bytes).position(position);
+
+ assertFalse(buf.isDirect());
+
+ allocateNativeMemory(2048, addr -> {
+ putByteBuffer(addr, 256, buf);
+
+ assertArrayEquals(
+ Arrays.copyOfRange(bytes, position, bytes.length),
+ getBytes(addr, 256, bytes.length - position)
+ );
+ });
+ }
+
+ @ParameterizedTest(name = "newPosition = {0}")
+ @ValueSource(ints = {0, 128})
+ void testPutByteBufferIndirectSlice(int newPosition) {
+ byte[] bytes = randomBytes(1024);
+
+ int pos = 256;
+ int len = 512;
+
+ ByteBuffer buf = ByteBuffer.wrap(bytes).position(pos).limit(pos + len).slice().position(newPosition);
+
+ assertFalse(buf.isDirect());
+
+ allocateNativeMemory(2048, addr -> {
+ putByteBuffer(addr, 256, buf);
+
+ assertArrayEquals(
+ Arrays.copyOfRange(bytes, pos + newPosition, pos + len),
+ getBytes(addr, 256, len - newPosition)
+ );
+ });
+ }
+
+ @ParameterizedTest(name = "position = {0}")
+ @ValueSource(ints = {0, 128})
+ void testPutByteBufferDirect(int position) {
+ final byte[] bytes = randomBytes(1024);
+
+ ByteBuffer buf = ByteBuffer.allocateDirect(1024).put(bytes).position(position);
+
+ assertTrue(buf.isDirect());
+
+ allocateNativeMemory(2048, addr -> {
+ putByteBuffer(addr, 256, buf);
+
+ assertArrayEquals(
+ Arrays.copyOfRange(bytes, position, bytes.length),
+ getBytes(addr, 256, bytes.length - position)
+ );
+ });
+ }
+
+ private byte[] randomBytes(int len) {
+ byte[] bytes = new byte[len];
+
+ ThreadLocalRandom.current().nextBytes(bytes);
+
+ return bytes;
+ }
+
+ private void allocateNativeMemory(int size, LongConsumer adderConsumer) {
+ long addr = GridUnsafe.allocateMemory(size);
+
+ try {
+ adderConsumer.accept(addr);
+ } finally {
+ GridUnsafe.freeMemory(addr);
+ }
+ }
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/DataRow.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/DataRow.java
index f63eedc..6411428 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/DataRow.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/DataRow.java
@@ -25,15 +25,11 @@ import java.nio.ByteBuffer;
public interface DataRow extends SearchRow {
/**
* Returns value bytes.
- *
- * @return Value bytes.
*/
byte[] valueBytes();
/**
* Returns value object as a byte buffer. Allows more effective memory management in certain cases.
- *
- * @return Value object as a byte buffer. Allows more effective memory management in certain cases.
*/
ByteBuffer value();
}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/PartitionStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/PartitionStorage.java
index 81f5eec..6f0feb9 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/PartitionStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/PartitionStorage.java
@@ -54,7 +54,7 @@ public interface PartitionStorage extends AutoCloseable {
* @return Data rows.
* @throws StorageException If failed to read the data or the storage is already stopped.
*/
- Collection<DataRow> readAll(List<? extends SearchRow> keys);
+ Collection<DataRow> readAll(List<? extends SearchRow> keys) throws StorageException;
/**
* Writes a DataRow into the storage.
@@ -97,7 +97,7 @@ public interface PartitionStorage extends AutoCloseable {
* @return List of skipped data rows.
* @throws StorageException If failed to remove the data or the storage is already stopped.
*/
- Collection<SearchRow> removeAll(List<? extends SearchRow> keys);
+ Collection<SearchRow> removeAll(List<? extends SearchRow> keys) throws StorageException;
/**
* Removes {@link DataRow}s mapped by given keys and containing given values.
@@ -106,7 +106,7 @@ public interface PartitionStorage extends AutoCloseable {
* @return List of skipped data rows.
* @throws StorageException If failed to remove the data or the storage is already stopped.
*/
- Collection<DataRow> removeAllExact(List<? extends DataRow> keyValues);
+ Collection<DataRow> removeAllExact(List<? extends DataRow> keyValues) throws StorageException;
/**
* Executes an update with custom logic implemented by storage.UpdateClosure interface.
@@ -116,8 +116,7 @@ public interface PartitionStorage extends AutoCloseable {
* @param <T> Closure invocation's result type.
* @throws StorageException If failed to read data or storage is already stopped.
*/
- @Nullable
- <T> T invoke(SearchRow key, InvokeClosure<T> clo) throws StorageException;
+ @Nullable <T> T invoke(SearchRow key, InvokeClosure<T> clo) throws StorageException;
/**
* Creates cursor over the storage data.
@@ -145,6 +144,8 @@ public interface PartitionStorage extends AutoCloseable {
/**
* Removes all data from this storage and frees all associated resources.
+ *
+ * @throws StorageException If failed to destroy the data or storage is already stopped.
*/
- void destroy();
+ void destroy() throws StorageException;
}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java
index e1d6ead..d5df325 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java
@@ -25,16 +25,12 @@ import java.nio.ByteBuffer;
public interface SearchRow {
/**
* Returns key bytes.
- *
- * @return Key bytes.
*/
byte[] keyBytes();
/**
* Returns key object as a byte buffer. Allows more effective memory management in certain cases. Position of the resulting buffer
* must be {@code 0}.
- *
- * @return Key object as a byte buffer. Allows more effective memory management in certain cases.
*/
ByteBuffer key();
}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageUtils.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageUtils.java
new file mode 100644
index 0000000..f460e49
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.configuration.schemas.table.TableView;
+
+/**
+ * Utility class for storages.
+ */
+public class StorageUtils {
+ /**
+ * Returns byte buffer hash that matches corresponding array hash.
+ *
+ * @param buf Byte buffer.
+ */
+ public static int hashCode(ByteBuffer buf) {
+ int result = 1;
+ for (int i = buf.position(); i < buf.limit(); i++) {
+ result = 31 * result + buf.get(i);
+ }
+ return result;
+ }
+
+ /**
+ * Returns the group ID.
+ *
+ * @param tableView Table configuration view.
+ */
+ // TODO: IGNITE-16665 Move the group ID to the configuration.
+ public static int groupId(TableView tableView) {
+ return tableView.name().hashCode();
+ }
+
+ /**
+ * Converts to an array of bytes.
+ *
+ * @param buf Byte buffer.
+ */
+ // TODO: IGNITE-16350 Get rid of copying byte arrays.
+ public static byte[] toByteArray(ByteBuffer buf) {
+ byte[] arr = new byte[buf.limit()];
+
+ buf.get(arr);
+
+ return arr;
+ }
+}
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractPartitionStorageTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractPartitionStorageTest.java
index 674b3ce..db339bf 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractPartitionStorageTest.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractPartitionStorageTest.java
@@ -467,7 +467,12 @@ public abstract class AbstractPartitionStorageTest {
public void testReadAll() {
List<DataRow> rows = insertBulk(100);
- assertThat(storage.readAll(rows), containsInAnyOrder(rows.toArray()));
+ Collection<DataRow> readRows = storage.readAll(rows);
+
+ assertThat(
+ readRows.stream().map(DataRow::value).collect(Collectors.toList()),
+ containsInAnyOrder(rows.stream().map(DataRow::value).toArray(ByteBuffer[]::new))
+ );
}
/**
@@ -661,7 +666,7 @@ public abstract class AbstractPartitionStorageTest {
* @param key String key.
* @return Search row.
*/
- private static SearchRow searchRow(String key) {
+ protected static SearchRow searchRow(String key) {
return new SearchRow() {
@Override
public byte[] keyBytes() {
@@ -682,7 +687,7 @@ public abstract class AbstractPartitionStorageTest {
* @param value String value.
* @return Data row.
*/
- private static DataRow dataRow(String key, String value) {
+ protected static DataRow dataRow(String key, String value) {
return new SimpleDataRow(
key.getBytes(StandardCharsets.UTF_8),
value.getBytes(StandardCharsets.UTF_8)
diff --git a/modules/storage-page-memory/pom.xml b/modules/storage-page-memory/pom.xml
new file mode 100644
index 0000000..9fd45cc
--- /dev/null
+++ b/modules/storage-page-memory/pom.xml
@@ -0,0 +1,110 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-parent</artifactId>
+ <version>1</version>
+ <relativePath>../../parent/pom.xml</relativePath>
+ </parent>
+
+ <artifactId>ignite-storage-page-memory</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-storage-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-page-memory</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-schema</artifactId>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-params</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-configuration</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-configuration</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-storage-api</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-schema</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/FragmentedByteArray.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/FragmentedByteArray.java
new file mode 100644
index 0000000..bee429b
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/FragmentedByteArray.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Helper class for reading an array of bytes in fragments.
+ *
+ * <p>Structure: array length(int) + byte array(array length).
+ */
+class FragmentedByteArray {
+ private int arrLen = -1;
+
+ private byte[] arr = null;
+
+ private int off;
+
+ /**
+ * Reads data from the buffer.
+ *
+ * @param buf Byte buffer from which to read.
+ */
+ void readData(ByteBuffer buf) {
+ if (buf.remaining() == 0) {
+ return;
+ }
+
+ if (arrLen == -1) {
+ if (buf.remaining() >= 4) {
+ arrLen = buf.getInt();
+ } else {
+ if (arr == null) {
+ arr = new byte[4];
+ }
+
+ int len = Math.min(buf.remaining(), 4 - off);
+
+ buf.get(arr, off, len);
+ off += len;
+
+ if (off == 4) {
+ ByteBuffer tmpBuf = ByteBuffer.wrap(arr);
+
+ tmpBuf.order(buf.order());
+
+ arrLen = tmpBuf.getInt();
+ arr = null;
+ off = 0;
+ }
+ }
+ }
+
+ if (arrLen != -1) {
+ if (arr == null) {
+ arr = new byte[arrLen];
+ }
+
+ int len = Math.min(buf.remaining(), arrLen - off);
+
+ buf.get(arr, off, len);
+ off += len;
+ }
+ }
+
+ /**
+ * Returns true if the array has been read completely.
+ */
+ boolean ready() {
+ return arrLen != -1 && off == arrLen;
+ }
+
+ /**
+ * Returns byte array.
+ */
+ byte[] array() {
+ return arr;
+ }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryDataRegion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryDataRegion.java
new file mode 100644
index 0000000..c8e5784
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryDataRegion.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionView;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.storage.engine.DataRegion;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Abstract data region for {@link PageMemoryStorageEngine}. Based on a {@link PageMemory}.
+ */
+// TODO: IGNITE-16641 Add support for persistent case.
+abstract class PageMemoryDataRegion implements DataRegion {
+ protected final PageMemoryDataRegionConfiguration cfg;
+
+ protected final PageIoRegistry ioRegistry;
+
+ protected PageMemory pageMemory;
+
+ /**
+ * Constructor.
+ *
+ * @param cfg Data region configuration.
+ * @param ioRegistry IO registry.
+ */
+ public PageMemoryDataRegion(PageMemoryDataRegionConfiguration cfg, PageIoRegistry ioRegistry) {
+ this.cfg = cfg;
+ this.ioRegistry = ioRegistry;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void stop() {
+ if (pageMemory != null) {
+ pageMemory.stop(true);
+ }
+ }
+
+ /**
+ * Returns {@link true} if the date region is persistent.
+ */
+ public boolean persistent() {
+ return ((PageMemoryDataRegionView) cfg.value()).persistent();
+ }
+
+ /**
+ * Returns page memory, {@code null} if not {@link #start started}.
+ */
+ public @Nullable PageMemory pageMemory() {
+ return pageMemory;
+ }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryPartitionStorage.java
new file mode 100644
index 0000000..b0ae4e6
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryPartitionStorage.java
@@ -0,0 +1,512 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.MAX_PARTITION_ID;
+import static org.apache.ignite.internal.storage.StorageUtils.groupId;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree;
+import org.apache.ignite.internal.pagememory.util.PageLockListenerNoOp;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.OperationType;
+import org.apache.ignite.internal.storage.PartitionStorage;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageUtils;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteCursor;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Storage implementation based on a {@link BplusTree}.
+ */
+// TODO: IGNITE-16644 Support snapshots.
+class PageMemoryPartitionStorage implements PartitionStorage {
+ private final int partId;
+
+ private final TableTree tree;
+
+ private final TableFreeList freeList;
+
+ /**
+ * Constructor.
+ *
+ * @param partId Partition id.
+ * @param tableCfg – Table configuration.
+ * @param dataRegion – Data region for the table.
+ * @param freeList Table free list.
+ * @throws StorageException If there is an error while creating the partition storage.
+ */
+ public PageMemoryPartitionStorage(
+ int partId,
+ TableConfiguration tableCfg,
+ PageMemoryDataRegion dataRegion,
+ TableFreeList freeList
+ ) throws StorageException {
+ assert partId >= 0 && partId < MAX_PARTITION_ID : partId;
+
+ this.partId = partId;
+
+ this.freeList = freeList;
+
+ TableView tableView = tableCfg.value();
+
+ int grpId = groupId(tableView);
+
+ try {
+ // TODO: IGNITE-16641 It is necessary to do getting the tree root for the persistent case.
+ long metaPageId = dataRegion.pageMemory().allocatePage(grpId, partId, FLAG_AUX);
+
+ // TODO: IGNITE-16641 It is necessary to take into account the persistent case.
+ boolean initNew = true;
+
+ tree = new TableTree(
+ grpId,
+ tableView.name(),
+ dataRegion.pageMemory(),
+ PageLockListenerNoOp.INSTANCE,
+ new AtomicLong(),
+ metaPageId,
+ freeList,
+ partId,
+ initNew
+ );
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Error occurred while creating the partition storage", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int partitionId() {
+ return partId;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public @Nullable DataRow read(SearchRow key) throws StorageException {
+ try {
+ return wrap(tree.findOne(wrap(key)));
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Error reading row", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Collection<DataRow> readAll(List<? extends SearchRow> keys) throws StorageException {
+ Collection<DataRow> res = new ArrayList<>(keys.size());
+
+ try {
+ for (SearchRow key : keys) {
+ res.add(wrap(tree.findOne(wrap(key))));
+ }
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Error reading rows", e);
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void write(DataRow row) throws StorageException {
+ try {
+ TableDataRow dataRow = wrap(row);
+
+ freeList.insertDataRow(dataRow);
+
+ tree.put(dataRow);
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Error writing row", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void writeAll(List<? extends DataRow> rows) throws StorageException {
+ try {
+ for (DataRow row : rows) {
+ TableDataRow dataRow = wrap(row);
+
+ freeList.insertDataRow(dataRow);
+
+ tree.put(dataRow);
+ }
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Error writing rows", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Collection<DataRow> insertAll(List<? extends DataRow> rows) throws StorageException {
+ Collection<DataRow> cantInsert = new ArrayList<>();
+
+ try {
+ InsertClosure insertClosure = new InsertClosure(freeList);
+
+ for (DataRow row : rows) {
+ TableDataRow dataRow = wrap(row);
+
+ insertClosure.reset();
+
+ insertClosure.newRow = dataRow;
+
+ tree.invoke(dataRow, null, insertClosure);
+
+ if (insertClosure.oldRow != null) {
+ cantInsert.add(row);
+ }
+ }
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Error inserting rows", e);
+ }
+
+ return cantInsert;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void remove(SearchRow key) throws StorageException {
+ try {
+ TableSearchRow searchRow = wrap(key);
+
+ TableDataRow removed = tree.remove(searchRow);
+
+ if (removed != null) {
+ freeList.removeDataRowByLink(removed.link());
+ }
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Error removing row", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Collection<SearchRow> removeAll(List<? extends SearchRow> keys) throws StorageException {
+ Collection<SearchRow> skippedRows = new ArrayList<>();
+
+ try {
+ for (SearchRow key : keys) {
+ TableDataRow removed = tree.remove(wrap(key));
+
+ if (removed != null) {
+ freeList.removeDataRowByLink(removed.link());
+ } else {
+ skippedRows.add(key);
+ }
+ }
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Error removing rows", e);
+ }
+
+ return skippedRows;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Collection<DataRow> removeAllExact(List<? extends DataRow> keyValues) throws StorageException {
+ Collection<DataRow> skipped = new ArrayList<>();
+
+ try {
+ RemoveExactClosure removeExactClosure = new RemoveExactClosure();
+
+ for (DataRow keyValue : keyValues) {
+ TableDataRow dataRow = wrap(keyValue);
+
+ removeExactClosure.reset();
+
+ removeExactClosure.forRemoveRow = dataRow;
+
+ tree.invoke(dataRow, null, removeExactClosure);
+
+ if (removeExactClosure.foundRow == null) {
+ skipped.add(keyValue);
+ } else {
+ freeList.removeDataRowByLink(removeExactClosure.foundRow.link());
+ }
+ }
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Error while removing exact rows", e);
+ }
+
+ return skipped;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <T> @Nullable T invoke(SearchRow key, InvokeClosure<T> clo) throws StorageException {
+ IgniteTree.InvokeClosure<TableDataRow> treeClosure = new IgniteTree.InvokeClosure<>() {
+ /** {@inheritDoc} */
+ @Override
+ public void call(@Nullable TableDataRow oldRow) {
+ clo.call(wrap(oldRow));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public @Nullable TableDataRow newRow() {
+ DataRow newRow = clo.newRow();
+
+ if (newRow == null) {
+ return null;
+ }
+
+ TableDataRow dataRow = wrap(newRow);
+
+ try {
+ freeList.insertDataRow(dataRow);
+ } catch (IgniteInternalCheckedException e) {
+ throw new IgniteInternalException(e);
+ }
+
+ return dataRow;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public IgniteTree.OperationType operationType() {
+ OperationType operationType = clo.operationType();
+
+ switch (operationType) {
+ case WRITE:
+ return IgniteTree.OperationType.PUT;
+
+ case REMOVE:
+ return IgniteTree.OperationType.REMOVE;
+
+ case NOOP:
+ return IgniteTree.OperationType.NOOP;
+
+ default:
+ throw new UnsupportedOperationException(String.valueOf(clo.operationType()));
+ }
+ }
+ };
+
+ try {
+ tree.invoke(wrap(key), null, treeClosure);
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Error invoking a closure for a row", e);
+ }
+
+ return clo.result();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
+ try {
+ IgniteCursor<TableDataRow> treeCursor = tree.find(null, null);
+
+ return new Cursor<DataRow>() {
+ @Nullable TableDataRow cur = advance();
+
+ /** {@inheritDoc} */
+ @Override
+ public void close() {
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Iterator<DataRow> iterator() {
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean hasNext() {
+ return cur != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public DataRow next() {
+ DataRow next = wrap(cur);
+
+ if (next == null) {
+ throw new NoSuchElementException();
+ }
+
+ try {
+ cur = advance();
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Error getting next row", e);
+ }
+
+ return next;
+ }
+
+ @Nullable TableDataRow advance() throws IgniteInternalCheckedException {
+ while (treeCursor.next()) {
+ TableDataRow dataRow = treeCursor.get();
+
+ if (filter.test(wrap(dataRow))) {
+ return dataRow;
+ }
+ }
+
+ return null;
+ }
+ };
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Error while scanning rows", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<Void> snapshot(Path snapshotPath) {
+ throw new UnsupportedOperationException("Snapshots are not supported yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void restoreSnapshot(Path snapshotPath) {
+ throw new UnsupportedOperationException("Snapshots are not supported yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void destroy() throws StorageException {
+ try {
+ tree.destroy();
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Error while destroying data", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close() {
+ tree.close();
+ }
+
+ private static TableSearchRow wrap(SearchRow searchRow) {
+ ByteBuffer key = searchRow.key();
+
+ return new TableSearchRow(StorageUtils.hashCode(key), key);
+ }
+
+ private static TableDataRow wrap(DataRow dataRow) {
+ ByteBuffer key = dataRow.key();
+ ByteBuffer value = dataRow.value();
+
+ return new TableDataRow(StorageUtils.hashCode(key), key, value);
+ }
+
+ private static @Nullable DataRow wrap(TableDataRow tableDataRow) {
+ return tableDataRow == null ? null : new TableDataRowAdapter(tableDataRow);
+ }
+
+ private static class InsertClosure implements IgniteTree.InvokeClosure<TableDataRow> {
+ final TableFreeList freeList;
+
+ TableDataRow newRow;
+
+ @Nullable TableDataRow oldRow;
+
+ InsertClosure(TableFreeList freeList) {
+ this.freeList = freeList;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void call(@Nullable TableDataRow oldRow) {
+ this.oldRow = oldRow;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public @Nullable TableDataRow newRow() {
+ assert newRow != null;
+
+ try {
+ freeList.insertDataRow(newRow);
+ } catch (IgniteInternalCheckedException e) {
+ throw new IgniteInternalException(e);
+ }
+
+ return newRow;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public IgniteTree.OperationType operationType() {
+ return oldRow == null ? IgniteTree.OperationType.PUT : IgniteTree.OperationType.NOOP;
+ }
+
+ void reset() {
+ newRow = null;
+
+ oldRow = null;
+ }
+ }
+
+ private static class RemoveExactClosure implements IgniteTree.InvokeClosure<TableDataRow> {
+ TableDataRow forRemoveRow;
+
+ @Nullable TableDataRow foundRow;
+
+ /** {@inheritDoc} */
+ @Override
+ public void call(@Nullable TableDataRow oldRow) {
+ assert forRemoveRow != null;
+
+ if (oldRow != null && oldRow.value().equals(forRemoveRow.value())) {
+ foundRow = oldRow;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public @Nullable TableDataRow newRow() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public IgniteTree.OperationType operationType() {
+ return foundRow == null ? IgniteTree.OperationType.NOOP : IgniteTree.OperationType.REMOVE;
+ }
+
+ void reset() {
+ forRemoveRow = null;
+
+ foundRow = null;
+ }
+ }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryStorageEngine.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryStorageEngine.java
new file mode 100644
index 0000000..96fae4b
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryStorageEngine.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import java.nio.file.Path;
+import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.engine.DataRegion;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import org.apache.ignite.internal.storage.engine.TableStorage;
+
+/**
+ * Storage engine implementation based on {@link PageMemory}.
+ */
+public class PageMemoryStorageEngine implements StorageEngine {
+ private final PageIoRegistry ioRegistry;
+
+ /**
+ * Constructor.
+ *
+ * @param ioRegistry IO registry.
+ */
+ public PageMemoryStorageEngine(PageIoRegistry ioRegistry) {
+ this.ioRegistry = ioRegistry;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void start() {
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void stop() throws StorageException {
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public DataRegion createDataRegion(DataRegionConfiguration regionCfg) {
+ return new VolatilePageMemoryDataRegion((PageMemoryDataRegionConfiguration) regionCfg, ioRegistry);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public TableStorage createTable(Path tablePath, TableConfiguration tableCfg, DataRegion dataRegion) {
+ return new VolatilePageMemoryTableStorage(tableCfg, (VolatilePageMemoryDataRegion) dataRegion);
+ }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryStorageIoModule.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryStorageIoModule.java
new file mode 100644
index 0000000..bc6ddd8
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryStorageIoModule.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.pagememory.io.PageIoModule;
+import org.apache.ignite.internal.storage.pagememory.io.TableDataIo;
+import org.apache.ignite.internal.storage.pagememory.io.TableInnerIo;
+import org.apache.ignite.internal.storage.pagememory.io.TableLeafIo;
+import org.apache.ignite.internal.storage.pagememory.io.TableMetaIo;
+
+/**
+ * {@link PageIoModule} implementation in storage-page-memory module.
+ */
+public class PageMemoryStorageIoModule implements PageIoModule {
+ /** {@inheritDoc} */
+ @Override
+ public Collection<IoVersions<?>> ioVersions() {
+ return List.of(
+ TableMetaIo.VERSIONS,
+ TableInnerIo.VERSIONS,
+ TableLeafIo.VERSIONS,
+ TableDataIo.VERSIONS
+ );
+ }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryTableStorage.java
new file mode 100644
index 0000000..aca18a0
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryTableStorage.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.storage.PartitionStorage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.engine.DataRegion;
+import org.apache.ignite.internal.storage.engine.TableStorage;
+import org.apache.ignite.internal.storage.index.SortedIndexStorage;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Abstract table storage implementation based on {@link PageMemory}.
+ */
+// TODO: IGNITE-16641 Add support for persistent case.
+// TODO: IGNITE-16642 Support indexes.
+abstract class PageMemoryTableStorage implements TableStorage {
+ protected final PageMemoryDataRegion dataRegion;
+
+ protected final TableConfiguration tableCfg;
+
+ /** List of objects to be closed on the {@link #stop}. */
+ protected final List<AutoCloseable> autoCloseables = new CopyOnWriteArrayList<>();
+
+ protected volatile boolean started;
+
+ protected volatile AtomicReferenceArray<PartitionStorage> partitions;
+
+ /**
+ * Constructor.
+ *
+ * @param tableCfg – Table configuration.
+ * @param dataRegion – Data region for the table.
+ */
+ public PageMemoryTableStorage(TableConfiguration tableCfg, PageMemoryDataRegion dataRegion) {
+ this.dataRegion = dataRegion;
+ this.tableCfg = tableCfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public TableConfiguration configuration() {
+ return tableCfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public DataRegion dataRegion() {
+ return dataRegion;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void start() throws StorageException {
+ TableView tableView = tableCfg.value();
+
+ partitions = new AtomicReferenceArray<>(tableView.partitions());
+
+ started = true;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void stop() throws StorageException {
+ started = false;
+
+ List<AutoCloseable> autoCloseables = new ArrayList<>(this.autoCloseables);
+
+ for (int i = 0; i < partitions.length(); i++) {
+ PartitionStorage partition = partitions.getAndUpdate(i, p -> null);
+
+ if (partition != null) {
+ autoCloseables.add(partition);
+ }
+
+ autoCloseables.add(partition);
+ }
+
+ Collections.reverse(autoCloseables);
+
+ try {
+ IgniteUtils.closeAll(autoCloseables);
+ } catch (Exception e) {
+ throw new StorageException("Failed to stop PageMemory table storage.", e);
+ }
+
+ this.autoCloseables.clear();
+ partitions = null;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void destroy() throws StorageException {
+ stop();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public PartitionStorage getOrCreatePartition(int partId) throws StorageException {
+ PartitionStorage partition = getPartition(partId);
+
+ if (partition != null) {
+ return partition;
+ }
+
+ partition = createPartitionStorage(partId);
+
+ partitions.set(partId, partition);
+
+ return partition;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public @Nullable PartitionStorage getPartition(int partId) {
+ assert started : "Storage has not started yet";
+
+ if (partId < 0 || partId >= partitions.length()) {
+ throw new IllegalArgumentException(S.toString(
+ "Unable to access partition with id outside of configured range",
+ "table", tableCfg.name().value(), false,
+ "partitionId", partId, false,
+ "partitions", partitions.length(), false
+ ));
+ }
+
+ return partitions.get(partId);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void dropPartition(int partId) throws StorageException {
+ assert started : "Storage has not started yet";
+
+ PartitionStorage partition = getPartition(partId);
+
+ if (partition != null) {
+ partitions.set(partId, null);
+
+ partition.destroy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public SortedIndexStorage getOrCreateSortedIndex(String indexName) {
+ throw new UnsupportedOperationException("Indexes are not supported yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void dropIndex(String indexName) {
+ throw new UnsupportedOperationException("Indexes are not supported yet.");
+ }
+
+ /**
+ * Returns a new instance of {@link PageMemoryPartitionStorage}.
+ *
+ * @param partId Partition id.
+ * @throws StorageException If there is an error while creating the partition storage.
+ */
+ protected abstract PageMemoryPartitionStorage createPartitionStorage(int partId) throws StorageException;
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableDataRow.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableDataRow.java
new file mode 100644
index 0000000..18b6dc6
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableDataRow.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.pagememory.Storable;
+import org.apache.ignite.internal.pagememory.io.AbstractDataPageIo;
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.pagememory.io.TableDataIo;
+
+/**
+ * {@link DataRow} implementation.
+ */
+public class TableDataRow extends TableSearchRow implements Storable {
+ private long link;
+
+ private final ByteBuffer value;
+
+ /**
+ * Constructor.
+ *
+ * @param link Row link.
+ * @param hash Row hash.
+ * @param key Key byte buffer.
+ * @param value Value byte buffer.
+ */
+ public TableDataRow(long link, int hash, ByteBuffer key, ByteBuffer value) {
+ super(hash, key);
+
+ assert !value.isReadOnly();
+ assert value.position() == 0;
+
+ this.link = link;
+
+ this.value = value;
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param hash Row hash.
+ * @param key Key byte buffer.
+ * @param value Value byte buffer.
+ */
+ public TableDataRow(int hash, ByteBuffer key, ByteBuffer value) {
+ this(0, hash, key, value);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void link(long link) {
+ this.link = link;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long link() {
+ return link;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int partition() {
+ return partitionId(pageId(link));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int size() {
+ return 4 + key.limit() + 4 + value.limit();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int headerSize() {
+ // Key size (int).
+ return 4;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public IoVersions<? extends AbstractDataPageIo> ioVersions() {
+ return TableDataIo.VERSIONS;
+ }
+
+ /**
+ * Returns value object as a byte buffer.
+ */
+ public ByteBuffer value() {
+ return value.rewind();
+ }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableDataRowAdapter.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableDataRowAdapter.java
new file mode 100644
index 0000000..3fba62e
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableDataRowAdapter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import static org.apache.ignite.internal.storage.StorageUtils.toByteArray;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.storage.DataRow;
+
+/**
+ * Delegating implementation of {@link DataRow}.
+ */
+class TableDataRowAdapter implements DataRow {
+ private final TableDataRow tableDataRow;
+
+ /**
+ * Constructor.
+ *
+ * @param tableDataRow Table data row.
+ */
+ TableDataRowAdapter(TableDataRow tableDataRow) {
+ this.tableDataRow = tableDataRow;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public byte[] valueBytes() {
+ return toByteArray(value());
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ByteBuffer value() {
+ return tableDataRow.value();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public byte[] keyBytes() {
+ return toByteArray(key());
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ByteBuffer key() {
+ return tableDataRow.key();
+ }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableFreeList.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableFreeList.java
new file mode 100644
index 0000000..98dcdad
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableFreeList.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.INDEX_PARTITION;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.evict.PageEvictionTracker;
+import org.apache.ignite.internal.pagememory.freelist.AbstractFreeList;
+import org.apache.ignite.internal.pagememory.freelist.FreeList;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
+import org.apache.ignite.internal.pagememory.util.PageLockListener;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link FreeList} implementation for storage-page-memory module.
+ */
+public class TableFreeList extends AbstractFreeList<TableDataRow> {
+ private static final IgniteLogger LOG = IgniteLogger.forClass(TableFreeList.class);
+
+ private final IoStatisticsHolder statHolder;
+
+ /**
+ * Constructor.
+ *
+ * @param grpId Group ID.
+ * @param pageMem Page memory.
+ * @param lockLsnr Page lock listener.
+ * @param metaPageId Metadata page ID.
+ * @param initNew {@code True} if new metadata should be initialized.
+ * @param pageListCacheLimit Page list cache limit.
+ * @param evictionTracker Page eviction tracker.
+ * @param statHolder Statistics holder to track IO operations.
+ * @throws IgniteInternalCheckedException If failed.
+ */
+ public TableFreeList(
+ int grpId,
+ PageMemory pageMem,
+ PageLockListener lockLsnr,
+ long metaPageId,
+ boolean initNew,
+ @Nullable AtomicLong pageListCacheLimit,
+ PageEvictionTracker evictionTracker,
+ IoStatisticsHolder statHolder
+ ) throws IgniteInternalCheckedException {
+ super(
+ grpId,
+ "TableFreeList_" + grpId,
+ pageMem,
+ null,
+ lockLsnr,
+ FLAG_AUX,
+ LOG,
+ metaPageId,
+ initNew,
+ pageListCacheLimit,
+ evictionTracker
+ );
+
+ this.statHolder = statHolder;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected long allocatePageNoReuse() throws IgniteInternalCheckedException {
+ return pageMem.allocatePage(grpId, INDEX_PARTITION, defaultPageFlag);
+ }
+
+ /**
+ * Inserts a row.
+ *
+ * @param row Row.
+ * @throws IgniteInternalCheckedException If failed.
+ */
+ public void insertDataRow(TableDataRow row) throws IgniteInternalCheckedException {
+ super.insertDataRow(row, statHolder);
+ }
+
+ /**
+ * Removes a row by link.
+ *
+ * @param link Row link.
+ * @throws IgniteInternalCheckedException If failed.
+ */
+ public void removeDataRowByLink(long link) throws IgniteInternalCheckedException {
+ super.removeDataRowByLink(link, statHolder);
+ }
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableSearchRow.java
similarity index 52%
copy from modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java
copy to modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableSearchRow.java
index e1d6ead..17bc257 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableSearchRow.java
@@ -15,26 +15,51 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.storage;
+package org.apache.ignite.internal.storage.pagememory;
import java.nio.ByteBuffer;
+import org.apache.ignite.internal.storage.SearchRow;
/**
- * Interface to be used as a key representation to search data in storage.
+ * {@link SearchRow} implementation.
*/
-public interface SearchRow {
+public class TableSearchRow {
+ protected final int hash;
+
+ protected final ByteBuffer key;
+
/**
- * Returns key bytes.
+ * Constructor.
*
- * @return Key bytes.
+ * @param hash Key hash.
+ * @param key Key byte buffer.
*/
- byte[] keyBytes();
+ public TableSearchRow(int hash, ByteBuffer key) {
+ assert !key.isReadOnly();
+ assert key.position() == 0;
+
+ this.hash = hash;
+ this.key = key;
+ }
/**
- * Returns key object as a byte buffer. Allows more effective memory management in certain cases. Position of the resulting buffer
- * must be {@code 0}.
- *
- * @return Key object as a byte buffer. Allows more effective memory management in certain cases.
+ * Returns key object as a byte buffer.
+ */
+ public ByteBuffer key() {
+ return key.rewind();
+ }
+
+ /**
+ * Returns hash of row.
+ */
+ public int hash() {
+ return hash;
+ }
+
+ /**
+ * Returns a row link.
*/
- ByteBuffer key();
+ public long link() {
+ return 0;
+ }
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableTree.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableTree.java
new file mode 100644
index 0000000..11303ca
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableTree.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.itemId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getBytes;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getInt;
+import static org.apache.ignite.internal.storage.pagememory.TableTree.RowData.FULL;
+import static org.apache.ignite.internal.storage.pagememory.TableTree.RowData.KEY_ONLY;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
+import static org.apache.ignite.internal.util.GridUnsafe.wrapPointer;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.io.DataPagePayload;
+import org.apache.ignite.internal.pagememory.reuse.ReuseList;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.io.BplusIo;
+import org.apache.ignite.internal.pagememory.util.PageLockListener;
+import org.apache.ignite.internal.storage.pagememory.io.RowIo;
+import org.apache.ignite.internal.storage.pagememory.io.TableDataIo;
+import org.apache.ignite.internal.storage.pagememory.io.TableInnerIo;
+import org.apache.ignite.internal.storage.pagememory.io.TableLeafIo;
+import org.apache.ignite.internal.storage.pagememory.io.TableMetaIo;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link BplusTree} implementation for storage-page-memory module.
+ */
+public class TableTree extends BplusTree<TableSearchRow, TableDataRow> {
+ private final int partId;
+
+ /**
+ * Constructor.
+ *
+ * @param grpId Group ID.
+ * @param grpName Group name.
+ * @param pageMem Page memory.
+ * @param lockLsnr Page lock listener.
+ * @param globalRmvId Global remove ID.
+ * @param metaPageId Meta page ID.
+ * @param reuseList Reuse list.
+ * @param partId Partition id.
+ * @param initNew {@code True} if new tree should be created.
+ */
+ public TableTree(
+ int grpId,
+ String grpName,
+ PageMemory pageMem,
+ PageLockListener lockLsnr,
+ AtomicLong globalRmvId,
+ long metaPageId,
+ @Nullable ReuseList reuseList,
+ int partId,
+ boolean initNew
+ ) throws IgniteInternalCheckedException {
+ super(
+ "TableTree_" + grpId,
+ grpId,
+ grpName,
+ pageMem,
+ lockLsnr,
+ FLAG_AUX,
+ globalRmvId,
+ metaPageId,
+ reuseList
+ );
+
+ this.partId = partId;
+
+ setIos(TableInnerIo.VERSIONS, TableLeafIo.VERSIONS, TableMetaIo.VERSIONS);
+
+ initTree(initNew);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected long allocatePageNoReuse() throws IgniteInternalCheckedException {
+ return pageMem.allocatePage(grpId, partId, defaultPageFlag);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected int compare(BplusIo<TableSearchRow> io, long pageAddr, int idx, TableSearchRow row) throws IgniteInternalCheckedException {
+ RowIo rowIo = (RowIo) io;
+
+ int cmp = Integer.compare(rowIo.hash(pageAddr, idx), row.hash());
+
+ return cmp != 0 ? cmp : compareRows(rowIo.link(pageAddr, idx), row);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public TableDataRow getRow(BplusIo<TableSearchRow> io, long pageAddr, int idx, Object x) throws IgniteInternalCheckedException {
+ RowIo rowIo = (RowIo) io;
+
+ int hash = rowIo.hash(pageAddr, idx);
+ long link = rowIo.link(pageAddr, idx);
+
+ return getRowByLink(link, hash, FULL);
+ }
+
+ /**
+ * Returns a row by link.
+ *
+ * @param link Row link.
+ * @param hash Row hash.
+ * @param rowData Specifies what data to lookup.
+ * @throws IgniteInternalCheckedException If failed.
+ */
+ public TableDataRow getRowByLink(final long link, int hash, RowData rowData) throws IgniteInternalCheckedException {
+ assert link != 0;
+
+ FragmentedByteArray keyBytes = null;
+ FragmentedByteArray valueBytes = null;
+
+ long nextLink = link;
+
+ do {
+ final long pageId = pageId(nextLink);
+
+ final long page = pageMem.acquirePage(grpId, pageId, statisticsHolder());
+
+ try {
+ long pageAddr = pageMem.readLock(grpId, pageId, page);
+
+ assert pageAddr != 0L : nextLink;
+
+ try {
+ TableDataIo dataIo = pageMem.ioRegistry().resolve(pageAddr);
+
+ int itemId = itemId(nextLink);
+
+ int pageSize = pageMem.realPageSize(grpId);
+
+ DataPagePayload data = dataIo.readPayload(pageAddr, itemId, pageSize);
+
+ if (data.nextLink() == 0 && nextLink == link) {
+ // Good luck: we can read the row without fragments.
+ return readFullRow(link, hash, rowData, pageAddr + data.offset());
+ }
+
+ ByteBuffer dataBuf = wrapPointer(pageAddr, pageSize);
+
+ dataBuf.position(data.offset());
+ dataBuf.limit(data.offset() + data.payloadSize());
+
+ if (keyBytes == null) {
+ keyBytes = new FragmentedByteArray();
+ }
+
+ keyBytes.readData(dataBuf);
+
+ if (keyBytes.ready()) {
+ if (rowData == KEY_ONLY) {
+ nextLink = 0;
+ continue;
+ }
+
+ if (valueBytes == null) {
+ valueBytes = new FragmentedByteArray();
+ }
+
+ valueBytes.readData(dataBuf);
+
+ if (valueBytes.ready()) {
+ nextLink = 0;
+ continue;
+ }
+ }
+
+ nextLink = data.nextLink();
+ } finally {
+ pageMem.readUnlock(grpId, pageId, page);
+ }
+ } finally {
+ pageMem.releasePage(grpId, pageId, page);
+ }
+ } while (nextLink != 0);
+
+ ByteBuffer key = ByteBuffer.wrap(keyBytes.array());
+ ByteBuffer value = ByteBuffer.wrap(valueBytes == null ? BYTE_EMPTY_ARRAY : valueBytes.array());
+
+ return new TableDataRow(link, hash, key, value);
+ }
+
+ private TableDataRow readFullRow(long link, int hash, RowData rowData, long pageAddr) {
+ int off = 0;
+
+ int keyBytesLen = getInt(pageAddr, off);
+ off += 4;
+
+ byte[] keyBytes = getBytes(pageAddr, off, keyBytesLen);
+ off += keyBytesLen;
+
+ if (rowData == KEY_ONLY) {
+ return new TableDataRow(link, hash, ByteBuffer.wrap(keyBytes), ByteBuffer.wrap(BYTE_EMPTY_ARRAY));
+ }
+
+ int valueBytesLen = getInt(pageAddr, off);
+ off += 4;
+
+ byte[] valueBytes = getBytes(pageAddr, off, valueBytesLen);
+
+ return new TableDataRow(link, hash, ByteBuffer.wrap(keyBytes), ByteBuffer.wrap(valueBytes));
+ }
+
+ private int compareRows(final long link, TableSearchRow row) throws IgniteInternalCheckedException {
+ assert link != 0;
+
+ long nextLink = link;
+
+ int keyBytesLen = -1;
+ int keyBytesOff = 0;
+
+ do {
+ final long pageId = pageId(nextLink);
+
+ final long page = pageMem.acquirePage(grpId, pageId, statisticsHolder());
+
+ try {
+ final long pageAddr = pageMem.readLock(grpId, pageId, page);
+
+ assert pageAddr != 0L : nextLink;
+
+ try {
+ TableDataIo dataIo = pageMem.ioRegistry().resolve(pageAddr);
+
+ int itemId = itemId(nextLink);
+
+ int pageSize = pageMem.realPageSize(grpId);
+
+ DataPagePayload data = dataIo.readPayload(pageAddr, itemId, pageSize);
+
+ if (data.nextLink() == 0 && nextLink == link) {
+ // Good luck: we can compare the rows without fragments.
+ return compareRowsFull(pageAddr + data.offset(), row);
+ }
+
+ ByteBuffer dataBuf = wrapPointer(pageAddr, pageSize);
+
+ dataBuf.position(data.offset());
+ dataBuf.limit(data.offset() + data.payloadSize());
+
+ ByteBuffer keyBuf = row.key();
+
+ if (keyBytesLen == -1) {
+ // Guaranteed to read because we store it in the header.
+ keyBytesLen = dataBuf.getInt();
+
+ int cmp = Integer.compare(keyBytesLen, keyBuf.limit());
+
+ if (cmp != 0) {
+ return cmp;
+ }
+ }
+
+ if (dataBuf.remaining() > 0) {
+ int len = Math.min(dataBuf.remaining(), keyBytesLen - keyBytesOff);
+
+ int dataBufPos = dataBuf.position();
+
+ dataBuf.position(dataBufPos);
+ dataBuf.limit(dataBufPos + len);
+
+ int oldKeyBufLimit = keyBuf.limit();
+
+ keyBuf.position(keyBytesOff);
+ keyBuf.limit(keyBytesOff + len);
+
+ int cmp = dataBuf.compareTo(keyBuf);
+
+ keyBytesOff += len;
+
+ keyBuf.limit(oldKeyBufLimit);
+
+ if (cmp != 0 || keyBytesOff == keyBytesLen) {
+ return cmp;
+ }
+ }
+
+ nextLink = data.nextLink();
+ } finally {
+ pageMem.readUnlock(grpId, pageId, page);
+ }
+ } finally {
+ pageMem.releasePage(grpId, pageId, page);
+ }
+ } while (nextLink != 0);
+
+ throw new IgniteInternalCheckedException("Row comparison error [link=" + link + ", row=" + row + "]");
+ }
+
+ private int compareRowsFull(final long pageAddr, TableSearchRow row) {
+ int off = 0;
+
+ int keyBytesLen = getInt(pageAddr, off);
+ off += 4;
+
+ ByteBuffer key = row.key();
+
+ int cmp = Integer.compare(keyBytesLen, key.limit());
+
+ if (cmp != 0) {
+ return cmp;
+ }
+
+ return wrapPointer(pageAddr + off, keyBytesLen).compareTo(key);
+ }
+
+ /**
+ * Row data.
+ */
+ public enum RowData {
+ /** Only {@link TableDataRow#key()} key}. */
+ KEY_ONLY,
+
+ /** All: {@link TableDataRow#key()} key} and {@link TableDataRow#value()} value}. */
+ FULL
+ }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java
new file mode 100644
index 0000000..1b81821
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.INDEX_PARTITION;
+
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.UnsafeMemoryAllocatorConfiguration;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.evict.PageEvictionTrackerNoOp;
+import org.apache.ignite.internal.pagememory.impl.PageMemoryNoStoreImpl;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.pagememory.mem.unsafe.UnsafeMemoryProvider;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
+import org.apache.ignite.internal.pagememory.util.PageLockListenerNoOp;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * Implementation of {@link PageMemoryDataRegion} for in-memory case.
+ */
+class VolatilePageMemoryDataRegion extends PageMemoryDataRegion {
+ private TableFreeList freeList;
+
+ /**
+ * Constructor.
+ *
+ * @param cfg Data region configuration.
+ * @param ioRegistry IO registry.
+ */
+ public VolatilePageMemoryDataRegion(PageMemoryDataRegionConfiguration cfg, PageIoRegistry ioRegistry) {
+ super(cfg, ioRegistry);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void start() {
+ assert !persistent() : cfg.value().name();
+
+ assert cfg.memoryAllocator() instanceof UnsafeMemoryAllocatorConfiguration : cfg.memoryAllocator();
+
+ PageMemory pageMemory = new PageMemoryNoStoreImpl(new UnsafeMemoryProvider(null), cfg, ioRegistry);
+
+ pageMemory.start();
+
+ this.pageMemory = pageMemory;
+
+ try {
+ int grpId = 0;
+
+ long metaPageId = pageMemory.allocatePage(grpId, INDEX_PARTITION, FLAG_AUX);
+
+ this.freeList = new TableFreeList(
+ grpId,
+ pageMemory,
+ PageLockListenerNoOp.INSTANCE,
+ metaPageId,
+ true,
+ null,
+ PageEvictionTrackerNoOp.INSTANCE,
+ IoStatisticsHolderNoOp.INSTANCE
+ );
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Error creating a freeList", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void stop() {
+ super.stop();
+
+ if (freeList != null) {
+ freeList.close();
+ }
+ }
+
+ /**
+ * Returns free list.
+ *
+ * <p>NOTE: Free list must be one for the in-memory data region.
+ */
+ public TableFreeList freeList() {
+ return freeList;
+ }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
new file mode 100644
index 0000000..8eaf09d
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.internal.storage.StorageException;
+
+/**
+ * Implementation of {@link PageMemoryTableStorage} for in-memory case.
+ */
+class VolatilePageMemoryTableStorage extends PageMemoryTableStorage {
+ /**
+ * Constructor.
+ *
+ * @param tableCfg – Table configuration.
+ * @param dataRegion – Data region for the table.
+ */
+ public VolatilePageMemoryTableStorage(TableConfiguration tableCfg, VolatilePageMemoryDataRegion dataRegion) {
+ super(tableCfg, dataRegion);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected PageMemoryPartitionStorage createPartitionStorage(int partId) throws StorageException {
+ return new PageMemoryPartitionStorage(partId, tableCfg, dataRegion, ((VolatilePageMemoryDataRegion) dataRegion).freeList());
+ }
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/RowIo.java
similarity index 61%
copy from modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java
copy to modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/RowIo.java
index e1d6ead..cffff78 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/RowIo.java
@@ -15,26 +15,25 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.storage;
-
-import java.nio.ByteBuffer;
+package org.apache.ignite.internal.storage.pagememory.io;
/**
- * Interface to be used as a key representation to search data in storage.
+ * Interface for row IO.
*/
-public interface SearchRow {
+public interface RowIo {
/**
- * Returns key bytes.
+ * Returns the link for the row in the page by index.
*
- * @return Key bytes.
+ * @param pageAddr Page address.
+ * @param idx Index.
*/
- byte[] keyBytes();
+ long link(long pageAddr, int idx);
/**
- * Returns key object as a byte buffer. Allows more effective memory management in certain cases. Position of the resulting buffer
- * must be {@code 0}.
+ * Returns the hash for the row in the page by index.
*
- * @return Key object as a byte buffer. Allows more effective memory management in certain cases.
+ * @param pageAddr Page address.
+ * @param idx Index.
*/
- ByteBuffer key();
+ int hash(long pageAddr, int idx);
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/TableDataIo.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/TableDataIo.java
new file mode 100644
index 0000000..c82cc77
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/TableDataIo.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.io;
+
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putByteBuffer;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putInt;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putShort;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.pagememory.io.AbstractDataPageIo;
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.storage.pagememory.TableDataRow;
+import org.apache.ignite.internal.storage.pagememory.TableTree;
+import org.apache.ignite.lang.IgniteStringBuilder;
+
+/**
+ * Data pages IO for {@link TableTree}.
+ */
+public class TableDataIo extends AbstractDataPageIo<TableDataRow> {
+ /** Page IO type. */
+ public static final short T_TABLE_DATA_IO = 6;
+
+ /** I/O versions. */
+ public static final IoVersions<TableDataIo> VERSIONS = new IoVersions<>(new TableDataIo(1));
+
+ /**
+ * Constructor.
+ *
+ * @param ver Page format version.
+ */
+ protected TableDataIo(int ver) {
+ super(T_TABLE_DATA_IO, ver);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void writeRowData(long pageAddr, int dataOff, int payloadSize, TableDataRow row, boolean newRow) {
+ assertPageType(pageAddr);
+
+ long addr = pageAddr + dataOff;
+
+ if (newRow) {
+ putShort(addr, 0, (short) payloadSize);
+ addr += 2;
+
+ ByteBuffer key = row.key();
+
+ putInt(addr, 0, key.limit());
+ addr += 4;
+
+ putByteBuffer(addr, 0, key);
+ addr += key.limit();
+ } else {
+ addr += 2 + 4 + row.key().limit();
+ }
+
+ ByteBuffer value = row.value();
+
+ putInt(addr, 0, value.limit());
+ addr += 4;
+
+ putByteBuffer(addr, 0, value);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void writeFragmentData(TableDataRow row, ByteBuffer buf, int rowOff, int payloadSize) {
+ assertPageType(buf);
+
+ ByteBuffer key = row.key();
+
+ int written = writeFragmentByteBuffer(buf, rowOff, 0, payloadSize, key);
+
+ written += writeFragmentByteBuffer(buf, rowOff + written, 4 + key.limit(), payloadSize - written, row.value());
+
+ assert written == payloadSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected void printPage(long addr, int pageSize, IgniteStringBuilder sb) {
+ sb.app("TableDataIo [\n");
+ printPageLayout(addr, pageSize, sb);
+ sb.app("\n]");
+ }
+
+ private int writeFragmentByteBuffer(
+ ByteBuffer bufWriteTo,
+ int rowOff,
+ int expOff,
+ int payloadSize,
+ ByteBuffer bufReadFrom
+ ) {
+ if (payloadSize == 0) {
+ // No space left to write.
+ return 0;
+ }
+
+ if (rowOff >= expOff + 4 + bufReadFrom.limit()) {
+ // Already fully written to the buffer.
+ return 0;
+ }
+
+ int len = Math.min(payloadSize, expOff + 4 + bufReadFrom.limit() - rowOff);
+
+ putValue(bufWriteTo, rowOff - expOff, len, bufReadFrom);
+
+ return len;
+ }
+
+ private void putValue(
+ ByteBuffer bufWriteTo,
+ int off,
+ int len,
+ ByteBuffer bufReadFrom
+ ) {
+ if (off == 0 && len >= 4) {
+ bufWriteTo.putInt(bufReadFrom.limit());
+
+ len -= 4;
+ } else if (off >= 4) {
+ off -= 4;
+ } else {
+ // Partial length write.
+ ByteBuffer tmp = ByteBuffer.allocate(4);
+
+ tmp.order(bufWriteTo.order());
+
+ tmp.putInt(bufReadFrom.limit());
+
+ tmp.position(off);
+
+ if (len < tmp.capacity()) {
+ tmp.limit(off + Math.min(len, tmp.capacity() - off));
+ }
+
+ bufWriteTo.put(tmp);
+
+ if (tmp.limit() < 4) {
+ return;
+ }
+
+ len -= 4 - off;
+ off = 0;
+ }
+
+ int oldBufLimit = bufReadFrom.limit();
+
+ bufReadFrom.position(off);
+
+ if (len < bufReadFrom.capacity()) {
+ bufReadFrom.limit(off + Math.min(len, bufReadFrom.capacity() - off));
+ }
+
+ bufWriteTo.put(bufReadFrom);
+
+ bufReadFrom.limit(oldBufLimit);
+ }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/TableInnerIo.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/TableInnerIo.java
new file mode 100644
index 0000000..1ad48f0
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/TableInnerIo.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.io;
+
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getInt;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putInt;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
+import static org.apache.ignite.internal.storage.pagememory.TableTree.RowData.KEY_ONLY;
+
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.io.BplusInnerIo;
+import org.apache.ignite.internal.pagememory.tree.io.BplusIo;
+import org.apache.ignite.internal.storage.pagememory.TableSearchRow;
+import org.apache.ignite.internal.storage.pagememory.TableTree;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * IO routines for {@link TableTree} inner pages.
+ *
+ * <p>Structure: hash(int) + link(long).
+ */
+public class TableInnerIo extends BplusInnerIo<TableSearchRow> implements RowIo {
+ private static final int LINK_OFFSET = 4;
+
+ /** Page IO type. */
+ public static final short T_TABLE_INNER_IO = 4;
+
+ /** I/O versions. */
+ public static final IoVersions<TableInnerIo> VERSIONS = new IoVersions<>(new TableInnerIo(1));
+
+ /**
+ * Constructor.
+ *
+ * @param ver Page format version.
+ */
+ protected TableInnerIo(int ver) {
+ super(
+ T_TABLE_INNER_IO,
+ ver,
+ true,
+ Integer.BYTES + Long.BYTES // hash(int) + link(long);
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void store(long dstPageAddr, int dstIdx, BplusIo<TableSearchRow> srcIo, long srcPageAddr, int srcIdx) {
+ assertPageType(dstPageAddr);
+
+ int srcHash = hash(srcPageAddr, srcIdx);
+ long srcLink = link(srcPageAddr, srcIdx);
+
+ int dstOff = offset(dstIdx);
+
+ putInt(dstPageAddr, dstOff, srcHash);
+ dstOff += LINK_OFFSET;
+
+ putLong(dstPageAddr, dstOff, srcLink);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void storeByOffset(long pageAddr, int off, TableSearchRow row) {
+ assertPageType(pageAddr);
+
+ putInt(pageAddr, off, row.hash());
+ off += LINK_OFFSET;
+
+ putLong(pageAddr, off, row.link());
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public TableSearchRow getLookupRow(BplusTree<TableSearchRow, ?> tree, long pageAddr, int idx) throws IgniteInternalCheckedException {
+ int hash = hash(pageAddr, idx);
+ long link = link(pageAddr, idx);
+
+ return ((TableTree) tree).getRowByLink(link, hash, KEY_ONLY);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long link(long pageAddr, int idx) {
+ assert idx < getCount(pageAddr) : idx;
+
+ return getLong(pageAddr, offset(idx) + LINK_OFFSET);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int hash(long pageAddr, int idx) {
+ assert idx < getCount(pageAddr) : idx;
+
+ return getInt(pageAddr, offset(idx));
+ }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/TableLeafIo.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/TableLeafIo.java
new file mode 100644
index 0000000..91e5237
--- /dev/null
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/TableLeafIo.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.io;
+
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getInt;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putInt;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
+import static org.apache.ignite.internal.storage.pagememory.TableTree.RowData.KEY_ONLY;
+
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.io.BplusIo;
+import org.apache.ignite.internal.pagememory.tree.io.BplusLeafIo;
+import org.apache.ignite.internal.storage.pagememory.TableSearchRow;
+import org.apache.ignite.internal.storage.pagememory.TableTree;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * IO routines for {@link TableTree} leaf pages.
+ *
+ * <p>Structure: hash(int) + link(long).
+ */
+public class TableLeafIo extends BplusLeafIo<TableSearchRow> implements RowIo {
+ private static final int LINK_OFFSET = 4;
+
+ /** Page IO type. */
+ public static final short T_TABLE_LEAF_IO = 5;
+
+ /** I/O versions. */
+ public static final IoVersions<TableLeafIo> VERSIONS = new IoVersions<>(new TableLeafIo(1));
+
+ /**
+ * Constructor.
+ *
+ * @param ver Page format version.
+ */
+ protected TableLeafIo(int ver) {
+ super(
+ T_TABLE_LEAF_IO,
+ ver,
+ Integer.BYTES + Long.BYTES // hash(int) + link(long);
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void store(long dstPageAddr, int dstIdx, BplusIo<TableSearchRow> srcIo, long srcPageAddr, int srcIdx) {
+ assertPageType(dstPageAddr);
+
+ int srcHash = hash(srcPageAddr, srcIdx);
+ long srcLink = link(srcPageAddr, srcIdx);
+
+ int dstOff = offset(dstIdx);
+
+ putInt(dstPageAddr, dstOff, srcHash);
+ dstOff += LINK_OFFSET;
+
+ putLong(dstPageAddr, dstOff, srcLink);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void storeByOffset(long pageAddr, int off, TableSearchRow row) {
+ assertPageType(pageAddr);
+
+ putInt(pageAddr, off, row.hash());
+ off += LINK_OFFSET;
+
+ putLong(pageAddr, off, row.link());
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public TableSearchRow getLookupRow(BplusTree<TableSearchRow, ?> tree, long pageAddr, int idx) throws IgniteInternalCheckedException {
+ int hash = hash(pageAddr, idx);
+ long link = link(pageAddr, idx);
+
+ return ((TableTree) tree).getRowByLink(link, hash, KEY_ONLY);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long link(long pageAddr, int idx) {
+ assert idx < getCount(pageAddr) : idx;
+
+ return getLong(pageAddr, offset(idx) + LINK_OFFSET);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int hash(long pageAddr, int idx) {
+ assert idx < getCount(pageAddr) : idx;
+
+ return getInt(pageAddr, offset(idx));
+ }
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/TableMetaIo.java
similarity index 55%
copy from modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java
copy to modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/TableMetaIo.java
index e1d6ead..00687a9 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/TableMetaIo.java
@@ -15,26 +15,28 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.storage;
+package org.apache.ignite.internal.storage.pagememory.io;
-import java.nio.ByteBuffer;
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.pagememory.tree.io.BplusMetaIo;
+import org.apache.ignite.internal.storage.pagememory.TableTree;
/**
- * Interface to be used as a key representation to search data in storage.
+ * IO routines for {@link TableTree} meta pages.
*/
-public interface SearchRow {
- /**
- * Returns key bytes.
- *
- * @return Key bytes.
- */
- byte[] keyBytes();
+public class TableMetaIo extends BplusMetaIo {
+ /** Page IO type. */
+ public static final short T_TABLE_META_IO = 3;
+
+ /** I/O versions. */
+ public static final IoVersions<TableMetaIo> VERSIONS = new IoVersions<>(new TableMetaIo(1));
/**
- * Returns key object as a byte buffer. Allows more effective memory management in certain cases. Position of the resulting buffer
- * must be {@code 0}.
+ * Constructor.
*
- * @return Key object as a byte buffer. Allows more effective memory management in certain cases.
+ * @param ver Page format version.
*/
- ByteBuffer key();
+ protected TableMetaIo(int ver) {
+ super(T_TABLE_META_IO, ver);
+ }
}
diff --git a/modules/storage-page-memory/src/main/resources/META-INF/services/org.apache.ignite.internal.pagememory.io.PageIoModule b/modules/storage-page-memory/src/main/resources/META-INF/services/org.apache.ignite.internal.pagememory.io.PageIoModule
new file mode 100644
index 0000000..2622249
--- /dev/null
+++ b/modules/storage-page-memory/src/main/resources/META-INF/services/org.apache.ignite.internal.pagememory.io.PageIoModule
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.ignite.internal.storage.pagememory.PageMemoryStorageIoModule
diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PageMemoryPartitionStorageTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PageMemoryPartitionStorageTest.java
new file mode 100644
index 0000000..df68202
--- /dev/null
+++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PageMemoryPartitionStorageTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import static java.util.stream.Collectors.joining;
+import static org.apache.ignite.internal.configuration.ConfigurationTestUtils.fixConfiguration;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+
+import java.nio.file.Path;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema;
+import org.apache.ignite.configuration.schemas.store.UnsafeMemoryAllocatorConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.storage.AbstractPartitionStorageTest;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.engine.DataRegion;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import org.apache.ignite.internal.storage.engine.TableStorage;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Storage test implementation for {@link PageMemoryPartitionStorage}.
+ */
+// TODO: IGNITE-16641 Add test for persistent case.
+@ExtendWith(ConfigurationExtension.class)
+@ExtendWith(WorkDirectoryExtension.class)
+public class PageMemoryPartitionStorageTest extends AbstractPartitionStorageTest {
+ private static PageIoRegistry ioRegistry;
+
+ @InjectConfiguration(
+ value = "mock.type = pagemem",
+ polymorphicExtensions = {
+ PageMemoryDataRegionConfigurationSchema.class,
+ UnsafeMemoryAllocatorConfigurationSchema.class
+ })
+ private DataRegionConfiguration dataRegionCfg;
+
+ @InjectConfiguration(
+ value = "mock.name = default",
+ polymorphicExtensions = HashIndexConfigurationSchema.class
+ )
+ private TableConfiguration tableCfg;
+
+ private StorageEngine engine;
+
+ private TableStorage table;
+
+ private DataRegion dataRegion;
+
+ @BeforeAll
+ static void beforeAll() {
+ ioRegistry = new PageIoRegistry();
+
+ ioRegistry.loadFromServiceLoader();
+ }
+
+ @BeforeEach
+ void setUp() {
+ engine = new PageMemoryStorageEngine(ioRegistry);
+
+ engine.start();
+
+ dataRegion = engine.createDataRegion(fixConfiguration(dataRegionCfg));
+
+ assertThat(dataRegion, is(instanceOf(PageMemoryDataRegion.class)));
+
+ dataRegion.start();
+
+ table = engine.createTable(null, tableCfg, dataRegion);
+
+ assertThat(table, is(instanceOf(PageMemoryTableStorage.class)));
+
+ table.start();
+
+ storage = table.getOrCreatePartition(0);
+
+ assertThat(storage, is(instanceOf(PageMemoryPartitionStorage.class)));
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ IgniteUtils.closeAll(
+ storage,
+ table == null ? null : table::stop,
+ dataRegion == null ? null : dataRegion::stop,
+ engine == null ? null : engine::stop
+ );
+ }
+
+ @AfterAll
+ static void afterAll() {
+ ioRegistry = null;
+ }
+
+ /** {@inheritDoc} */
+ @Test
+ @Override
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-16644")
+ public void testSnapshot(@WorkDirectory Path workDir) throws Exception {
+ super.testSnapshot(workDir);
+ }
+
+ /**
+ * Checks that fragments are written and read correctly.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ void testFragments() {
+ int pageSize = ((PageMemoryDataRegion) dataRegion).pageMemory().pageSize();
+
+ DataRow dataRow = dataRow(createRandomString(pageSize), createRandomString(pageSize));
+
+ storage.write(dataRow);
+
+ DataRow read = storage.read(dataRow);
+
+ assertArrayEquals(dataRow.valueBytes(), read.valueBytes());
+ }
+
+ private String createRandomString(int len) {
+ return ThreadLocalRandom.current().ints(len).mapToObj(i -> String.valueOf(Math.abs(i % 10))).collect(joining(""));
+ }
+}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbPartitionStorage.java
index d76bbde..017d868 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbPartitionStorage.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.storage.InvokeClosure;
import org.apache.ignite.internal.storage.PartitionStorage;
import org.apache.ignite.internal.storage.SearchRow;
import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageUtils;
import org.apache.ignite.internal.storage.basic.DelegatingDataRow;
import org.apache.ignite.internal.storage.basic.SimpleDataRow;
import org.apache.ignite.internal.util.Cursor;
@@ -449,23 +450,12 @@ class RocksDbPartitionStorage implements PartitionStorage {
.order(ByteOrder.BIG_ENDIAN)
.putShort((short) partId)
// TODO: use precomputed hash, see https://issues.apache.org/jira/browse/IGNITE-16370
- .putInt(hashCode(keyBuffer))
+ .putInt(StorageUtils.hashCode(keyBuffer))
.put(keyBuffer)
.array();
}
/**
- * Returns byte buffer hash that matches corresponding array hash.
- */
- private static int hashCode(ByteBuffer buf) {
- int result = 1;
- for (int i = buf.position(); i < buf.limit(); i++) {
- result = 31 * result + buf.get(i);
- }
- return result;
- }
-
- /**
* Gets a list of key byte arrays.
*
* @param keyValues Key rows.
diff --git a/parent/pom.xml b/parent/pom.xml
index 911c381..84b9808 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -269,6 +269,12 @@
<dependency>
<groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-page-memory</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
<artifactId>ignite-schema</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/pom.xml b/pom.xml
index 233be44..b57f243 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,6 +71,7 @@
<module>modules/schema</module>
<module>modules/sql-engine</module>
<module>modules/storage-api</module>
+ <module>modules/storage-page-memory</module>
<module>modules/storage-rocksdb</module>
<module>modules/table</module>
<module>modules/transactions</module>