You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/06/28 11:57:26 UTC
[ignite-3] branch main updated: IGNITE-14745 Implemented Storage
API for partitions and 2 basic implementations - CHM and RocksDB. (#169)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f5efe45 IGNITE-14745 Implemented Storage API for partitions and 2 basic implementations - CHM and RocksDB. (#169)
f5efe45 is described below
commit f5efe45ed1063e6b9f6dd863ba882335a9cb4291
Author: ibessonov <be...@gmail.com>
AuthorDate: Mon Jun 28 14:57:19 2021 +0300
IGNITE-14745 Implemented Storage API for partitions and 2 basic implementations - CHM and RocksDB. (#169)
---
.../apache/ignite/internal/util/IgniteUtils.java | 37 ++-
.../ignite/internal/network/netty/NettyServer.java | 4 +-
modules/storage-api/pom.xml | 70 ++++++
.../apache/ignite/internal/storage/DataRow.java | 36 +++
.../ignite/internal/storage/InvokeClosure.java | 40 ++++
.../ignite/internal/storage/OperationType.java | 30 +++
.../apache/ignite/internal/storage/SearchRow.java | 36 +++
.../apache/ignite/internal/storage/Storage.java | 70 ++++++
.../ignite/internal/storage/StorageException.java | 45 ++++
.../storage/basic/ConcurrentHashMapStorage.java | 140 +++++++++++
.../internal/storage/basic/SimpleDataRow.java | 59 +++++
.../storage/basic/SimpleReadInvokeClosure.java | 57 +++++
.../storage/basic/SimpleRemoveInvokeClosure.java | 42 ++++
.../storage/basic/SimpleWriteInvokeClosure.java | 52 +++++
.../internal/storage/AbstractStorageTest.java | 205 +++++++++++++++++
.../basic/ConcurrentHashMapStorageTest.java | 31 +++
modules/storage-rocksdb/pom.xml | 67 ++++++
.../internal/storage/rocksdb/RocksDbStorage.java | 255 +++++++++++++++++++++
.../storage/rocksdb/RocksDbStorageTest.java | 53 +++++
parent/pom.xml | 21 +-
pom.xml | 2 +
21 files changed, 1347 insertions(+), 5 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 0b61103..78dbd7b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -17,13 +17,18 @@
package org.apache.ignite.internal.util;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;
-
import org.jetbrains.annotations.Nullable;
/**
@@ -355,6 +360,36 @@ public class IgniteUtils {
}
/**
+ * Deletes a file or a directory with all sub-directories and files.
+ *
+ * @param path File or directory to delete.
+ * @return {@code true} if and only if the file or directory is successfully deleted,
+ * {@code false} otherwise
+ */
+ public static boolean delete(Path path) {
+ try {
+ Files.walkFileTree(path, new SimpleFileVisitor<>() {
+ @Override public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
+ Files.delete(dir);
+
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+ Files.delete(file);
+
+ return FileVisitResult.CONTINUE;
+ }
+ });
+
+ return true;
+ }
+ catch (IOException e) {
+ return false;
+ }
+ }
+
+ /**
* @return {@code True} if assertions enabled.
*/
public static boolean assertionsEnabled() {
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
index 5cc998a..ba961fa 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
@@ -33,8 +33,8 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.stream.ChunkedWriteHandler;
-import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.jetbrains.annotations.Nullable;
@@ -183,8 +183,6 @@ public class NettyServer {
*/
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.SO_REUSEADDR, true)
- .childOption(ChannelOption.SO_LINGER, 0)
- .childOption(ChannelOption.TCP_NODELAY, true)
/*
* When the keepalive option is set for a TCP socket and no data has been exchanged across the socket
* in either direction for 2 hours (NOTE: the actual value is implementation dependent),
diff --git a/modules/storage-api/pom.xml b/modules/storage-api/pom.xml
new file mode 100644
index 0000000..9f9f97e
--- /dev/null
+++ b/modules/storage-api/pom.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-parent</artifactId>
+ <version>1</version>
+ <relativePath>../../parent/pom.xml</relativePath>
+ </parent>
+
+ <artifactId>ignite-storage-api</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-library</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/DataRow.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/DataRow.java
new file mode 100644
index 0000000..4373f98
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/DataRow.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.nio.ByteBuffer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Interface that represents a data row from the storage - a key-value pair. Can be used as a {@link SearchRow}.
+ */
+public interface DataRow extends SearchRow {
+ /**
+ * @return Value bytes.
+ */
+ byte @Nullable [] valueBytes();
+
+ /**
+ * @return Value object as a byte buffer. Allows more effective memory management in certain cases.
+ */
+ ByteBuffer value();
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/InvokeClosure.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/InvokeClosure.java
new file mode 100644
index 0000000..7126069
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/InvokeClosure.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import org.jetbrains.annotations.Nullable;
+
+/** */
+public interface InvokeClosure {
+ /**
+ * @param row Old row or {@code null} if the old row has not been found.
+ */
+ void call(@Nullable DataRow row);
+
+ /**
+ * @return New row for the {@link OperationType#WRITE} operation.
+ */
+ @Nullable DataRow newRow();
+
+ /**
+ * @return Operation type for this closure or {@code null} if it is unknown.
+ * After method {@link #call(DataRow)} has been called, operation type must
+ * be know and this method can not return {@code null}.
+ */
+ @Nullable OperationType operationType();
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/OperationType.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/OperationType.java
new file mode 100644
index 0000000..58fe8a9
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/OperationType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+/** Operation types for {@link InvokeClosure}. */
+public enum OperationType {
+ /** Noop, signifies read operation. */
+ NOOP,
+
+ /** Remove operation. */
+ REMOVE,
+
+ /** Write/insert operation. */
+ WRITE
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java
new file mode 100644
index 0000000..74db8d9
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.nio.ByteBuffer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Interface to be used as a key representation to search data in storage.
+ */
+public interface SearchRow {
+ /**
+ * @return Key bytes.
+ */
+ byte @Nullable [] keyBytes();
+
+ /**
+ * @return Key object as a byte buffer. Allows more effective memory management in certain cases.
+ */
+ @Nullable ByteBuffer key();
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
new file mode 100644
index 0000000..9be5a27
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.util.function.Predicate;
+import org.apache.ignite.internal.util.Cursor;
+
+/**
+ * Interface providing methods to read, remove and update keys in storage.
+ */
+public interface Storage {
+ /**
+ * Reads a DataRow for a given key.
+ *
+ * @param key Search row.
+ * @return Data row.
+ * @throws StorageException If failed to read data or storage is already stopped.
+ */
+ public DataRow read(SearchRow key) throws StorageException;
+
+ /**
+ * Writes a DataRow to the storage.
+ *
+ * @param row Data row.
+ * @throws StorageException If failed to read data or storage is already stopped.
+ */
+ public void write(DataRow row) throws StorageException;
+
+ /**
+ * Removes a DataRow associated with a given Key.
+ *
+ * @param key Search row.
+ * @throws StorageException If failed to read data or storage is already stopped.
+ */
+ public void remove(SearchRow key) throws StorageException;
+
+ /**
+ * Executes an update with custom logic implemented by storage.UpdateClosure interface.
+ *
+ * @param key Search key.
+ * @param clo Invoke closure.
+ * @throws StorageException If failed to read data or storage is already stopped.
+ */
+ public void invoke(SearchRow key, InvokeClosure clo) throws StorageException;
+
+ /**
+ * Creates cursor over the storage data.
+ *
+ * @param filter Filter for the scan query.
+ * @return Cursor with filtered data.
+ * @throws StorageException If failed to read data or storage is already stopped.
+ */
+ public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException;
+}
+
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
new file mode 100644
index 0000000..92a0a13
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+/**
+ * Exception thrown by the storage.
+ */
+public class StorageException extends RuntimeException {
+ /**
+ * @param message Error message.
+ */
+ public StorageException(String message) {
+ super(message);
+ }
+
+ /**
+ * @param message Error message.
+ * @param cause The cause.
+ */
+ public StorageException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * @param cause The cause.
+ */
+ public StorageException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
new file mode 100644
index 0000000..e462653
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.ByteArray;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Storage implementation based on {@link ConcurrentHashMap}.
+ */
+public class ConcurrentHashMapStorage implements Storage {
+ /** Storage content. */
+ private final ConcurrentMap<ByteArray, byte[]> map = new ConcurrentHashMap<>();
+
+ /** RW lock. */
+ private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+
+ /** {@inheritDoc} */
+ @Override public DataRow read(SearchRow key) throws StorageException {
+ byte[] keyBytes = key.keyBytes();
+
+ byte[] valueBytes = map.get(new ByteArray(keyBytes));
+
+ return new SimpleDataRow(keyBytes, valueBytes);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(DataRow row) throws StorageException {
+ rwLock.readLock().lock();
+
+ try {
+ map.put(new ByteArray(row.keyBytes()), row.valueBytes());
+ }
+ finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove(SearchRow key) throws StorageException {
+ rwLock.readLock().lock();
+
+ try {
+ map.remove(new ByteArray(key.keyBytes()));
+ }
+ finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+ byte[] keyBytes = key.keyBytes();
+
+ ByteArray mapKey = new ByteArray(keyBytes);
+
+ rwLock.writeLock().lock();
+
+ try {
+ byte[] existingDataBytes = map.get(mapKey);
+
+ clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+
+ switch (clo.operationType()) {
+ case WRITE:
+ map.put(mapKey, clo.newRow().valueBytes());
+
+ break;
+
+ case REMOVE:
+ map.remove(mapKey);
+
+ break;
+
+ case NOOP:
+ break;
+ }
+ }
+ finally {
+ rwLock.writeLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
+ Iterator<SimpleDataRow> iter = map.entrySet().stream()
+ .map(e -> new SimpleDataRow(e.getKey().bytes(), e.getValue()))
+ .filter(filter)
+ .iterator();
+
+ return new Cursor<>() {
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ /** {@inheritDoc} */
+ @Override public DataRow next() {
+ return iter.next();
+ }
+
+ /** {@inheritDoc} */
+ @NotNull @Override public Iterator<DataRow> iterator() {
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws Exception {
+ // No-op.
+ }
+ };
+ }
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleDataRow.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleDataRow.java
new file mode 100644
index 0000000..f90ea1d
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleDataRow.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.storage.DataRow;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Basic array-based implementation of the {@link DataRow}.
+ */
+public class SimpleDataRow implements DataRow {
+ /** Key array. */
+ private final byte[] key;
+
+ /** Value array. */
+ private final byte @Nullable [] value;
+
+ public SimpleDataRow(byte[] key, byte @Nullable [] value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ByteBuffer key() {
+ return ByteBuffer.wrap(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] keyBytes() {
+ return key;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable
+ @Override public ByteBuffer value() {
+ return value == null ? null : ByteBuffer.wrap(value);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte @Nullable [] valueBytes() {
+ return value;
+ }
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleReadInvokeClosure.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleReadInvokeClosure.java
new file mode 100644
index 0000000..71f3b79
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleReadInvokeClosure.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.Nullable;
+
+/** Invoke closure implementation for read operation. */
+public class SimpleReadInvokeClosure implements InvokeClosure {
+ /** Copy of the row that was passed to {@link #call(DataRow)} method. */
+ @Nullable
+ private DataRow row;
+
+ /** {@inheritDoc} */
+ @Override public void call(@Nullable DataRow row) {
+ this.row = row == null ? null : new SimpleDataRow(row.keyBytes(), row.valueBytes());
+ }
+
+ /** {@inheritDoc} */
+ @Nullable
+ @Override public DataRow newRow() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable
+ @Override public OperationType operationType() {
+ return OperationType.NOOP;
+ }
+
+ /**
+ * Copy of the row that was passed to {@link #call(DataRow)} method.
+ *
+ * @return Copy of data row.
+ */
+ @Nullable
+ public DataRow row() {
+ return row;
+ }
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleRemoveInvokeClosure.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleRemoveInvokeClosure.java
new file mode 100644
index 0000000..abb3db8
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleRemoveInvokeClosure.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.Nullable;
+
+/** Invoke closure implementation for a remove operation. */
+public class SimpleRemoveInvokeClosure implements InvokeClosure {
+ /** {@inheritDoc} */
+ @Override public void call(@Nullable DataRow row) {
+ }
+
+ /** {@inheritDoc} */
+ @Nullable
+ @Override public DataRow newRow() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable
+ @Override public OperationType operationType() {
+ return OperationType.REMOVE;
+ }
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleWriteInvokeClosure.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleWriteInvokeClosure.java
new file mode 100644
index 0000000..4749511
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleWriteInvokeClosure.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.Nullable;
+
+/** Invoke closure implementation for a write operation. */
+public class SimpleWriteInvokeClosure implements InvokeClosure {
+ /** Data row to write into storage. */
+ private final DataRow newRow;
+
+ /**
+ * @param newRow Data row to write into the storage.
+ */
+ public SimpleWriteInvokeClosure(DataRow newRow) {
+ this.newRow = newRow;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void call(@Nullable DataRow row) {
+ }
+
+ /** {@inheritDoc} */
+ @Nullable
+ @Override public DataRow newRow() {
+ return newRow;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable
+ @Override public OperationType operationType() {
+ return OperationType.WRITE;
+ }
+}
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractStorageTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractStorageTest.java
new file mode 100644
index 0000000..f225b58
--- /dev/null
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractStorageTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.storage.basic.SimpleReadInvokeClosure;
+import org.apache.ignite.internal.storage.basic.SimpleRemoveInvokeClosure;
+import org.apache.ignite.internal.storage.basic.SimpleWriteInvokeClosure;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Test;
+
+import static java.util.Collections.emptyList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Abstract test that covers basic scenarios of the storage API.
+ */
+public abstract class AbstractStorageTest {
+ /** Storage instance. */
+ protected Storage storage;
+
+ /**
+ * Wraps string key into a search row.
+ *
+ * @param key String key.
+ * @return Search row.
+ */
+ private SearchRow searchRow(String key) {
+ return new SimpleDataRow(
+ key.getBytes(StandardCharsets.UTF_8),
+ null
+ );
+ }
+
+ /**
+ * Wraps string key/value pair into a data row.
+ *
+ * @param key String key.
+ * @param value String value.
+ * @return Data row.
+ */
+ private DataRow dataRow(String key, String value) {
+ return new SimpleDataRow(
+ key.getBytes(StandardCharsets.UTF_8),
+ value.getBytes(StandardCharsets.UTF_8)
+ );
+ }
+
+ /**
+ * Tests that read / write / remove work consistently on the same key.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void readWriteRemove() throws Exception {
+ SearchRow searchRow = searchRow("key");
+
+ assertNull(storage.read(searchRow).value());
+
+ DataRow dataRow = dataRow("key", "value");
+
+ storage.write(dataRow);
+
+ assertArrayEquals(dataRow.value().array(), storage.read(searchRow).value().array());
+
+ storage.remove(searchRow);
+
+ assertNull(storage.read(searchRow).value());
+ }
+
+ /**
+ * Tests that invoke method works consistently with default read / write / remove closures implementations on the
+ * same key.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void invoke() throws Exception {
+ SearchRow searchRow = searchRow("key");
+
+ SimpleReadInvokeClosure readClosure = new SimpleReadInvokeClosure();
+
+ storage.invoke(searchRow, readClosure);
+
+ assertNull(readClosure.row().value());
+
+ DataRow dataRow = dataRow("key", "value");
+
+ storage.invoke(searchRow, new SimpleWriteInvokeClosure(dataRow));
+
+ storage.invoke(searchRow, readClosure);
+
+ assertArrayEquals(dataRow.value().array(), readClosure.row().value().array());
+
+ storage.invoke(searchRow, new SimpleRemoveInvokeClosure());
+
+ storage.invoke(searchRow, readClosure);
+
+ assertNull(readClosure.row().value());
+ }
+
+ /**
+ * Tests that scan operation works properly without filter.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void scanSimple() throws Exception {
+ List<DataRow> list = toList(storage.scan(key -> true));
+
+ assertEquals(emptyList(), list);
+
+ DataRow dataRow1 = dataRow("key1", "value1");
+
+ storage.write(dataRow1);
+
+ list = toList(storage.scan(key -> true));
+
+ assertThat(list, hasSize(1));
+
+ assertArrayEquals(dataRow1.value().array(), list.get(0).value().array());
+
+ DataRow dataRow2 = dataRow("key2", "value2");
+
+ storage.write(dataRow2);
+
+ list = toList(storage.scan(key -> true));
+
+ assertThat(list, hasSize(2));
+
+ // "key1" and "key2" have the same order both by hash and lexicographically.
+ assertArrayEquals(dataRow1.value().array(), list.get(0).value().array());
+ assertArrayEquals(dataRow2.value().array(), list.get(1).value().array());
+ }
+
+ /**
+ * Tests that scan operation works properly with passed filter.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void scanFiltered() throws Exception {
+ DataRow dataRow1 = dataRow("key1", "value1");
+ DataRow dataRow2 = dataRow("key2", "value2");
+
+ storage.write(dataRow1);
+ storage.write(dataRow2);
+
+ List<DataRow> list = toList(storage.scan(key -> key.keyBytes()[3] == '1'));
+
+ assertThat(list, hasSize(1));
+
+ assertArrayEquals(dataRow1.value().array(), list.get(0).value().array());
+
+ list = toList(storage.scan(key -> key.keyBytes()[3] == '2'));
+
+ assertThat(list, hasSize(1));
+
+ assertArrayEquals(dataRow2.value().array(), list.get(0).value().array());
+
+ list = toList(storage.scan(key -> false));
+
+ assertTrue(list.isEmpty());
+ }
+
+ /**
+ * Converts cursor to list.
+ *
+ * @param cursor Cursor.
+ * @param <T> Type of cursor content.
+ * @return List.
+ * @throws Exception If error occurred during iteration or while closing the cursor.
+ */
+ @NotNull
+ private <T> List<T> toList(Cursor<T> cursor) throws Exception {
+ try (cursor) {
+ return StreamSupport.stream(cursor.spliterator(), false).collect(Collectors.toList());
+ }
+ }
+}
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorageTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorageTest.java
new file mode 100644
index 0000000..eaaed7e
--- /dev/null
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorageTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import org.apache.ignite.internal.storage.AbstractStorageTest;
+import org.junit.jupiter.api.BeforeEach;
+
+/**
+ * Storage test implementation for {@link ConcurrentHashMapStorage}.
+ */
+public class ConcurrentHashMapStorageTest extends AbstractStorageTest {
+ @BeforeEach
+ public void setUp() {
+ storage = new ConcurrentHashMapStorage();
+ }
+}
diff --git a/modules/storage-rocksdb/pom.xml b/modules/storage-rocksdb/pom.xml
new file mode 100644
index 0000000..b7ab8d5
--- /dev/null
+++ b/modules/storage-rocksdb/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-parent</artifactId>
+ <version>1</version>
+ <relativePath>../../parent/pom.xml</relativePath>
+ </parent>
+
+ <artifactId>ignite-storage-rocksdb</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-storage-api</artifactId>
+ </dependency>
+
+ <!-- 3rd party dependencies -->
+ <dependency>
+ <groupId>org.rocksdb</groupId>
+ <artifactId>rocksdbjni</artifactId>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-library</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-storage-api</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
new file mode 100644
index 0000000..e05cc76
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+/**
+ * Storage implementation based on a single RocksDB instance.
+ */
+public class RocksDbStorage implements Storage, AutoCloseable {
+ static {
+ RocksDB.loadLibrary();
+ }
+
+ /** RocksDB comparator options. */
+ private final ComparatorOptions comparatorOptions;
+
+ /** RocksDB comparator. */
+ private final AbstractComparator comparator;
+
+ /** RockDB options. */
+ private final Options options;
+
+ /** RocksDb instance. */
+ private final RocksDB db;
+
+ /** RW lock. */
+ private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+
+ /**
+ * @param dbPath Path to the folder to store data.
+ * @param comparator Keys comparator.
+ * @throws StorageException If failed to create RocksDB instance.
+ */
+ public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws StorageException {
+ try {
+ comparatorOptions = new ComparatorOptions();
+
+ this.comparator = new AbstractComparator(comparatorOptions) {
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return "comparator";
+ }
+
+ /** {@inheritDoc} */
+ @Override public int compare(ByteBuffer a, ByteBuffer b) {
+ return comparator.compare(a, b);
+ }
+ };
+
+ options = new Options();
+
+ options.setCreateIfMissing(true);
+
+ options.setComparator(this.comparator);
+
+ this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString());
+ }
+ catch (RocksDBException e) {
+ close();
+
+ throw new StorageException("Failed to start the storage", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public DataRow read(SearchRow key) throws StorageException {
+ try {
+ byte[] keyBytes = key.keyBytes();
+
+ return new SimpleDataRow(keyBytes, db.get(keyBytes));
+ }
+ catch (RocksDBException e) {
+ throw new StorageException("Failed to read data from the storage", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(DataRow row) throws StorageException {
+ rwLock.readLock().lock();
+
+ try {
+ db.put(row.keyBytes(), row.valueBytes());
+ }
+ catch (RocksDBException e) {
+ throw new StorageException("Filed to write data to the storage", e);
+ }
+ finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove(SearchRow key) throws StorageException {
+ rwLock.readLock().lock();
+
+ try {
+ db.delete(key.keyBytes());
+ }
+ catch (RocksDBException e) {
+ throw new StorageException("Failed to remove data from the storage", e);
+ }
+ finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+ rwLock.writeLock().lock();
+
+ try {
+ byte[] keyBytes = key.keyBytes();
+ byte[] existingDataBytes = db.get(keyBytes);
+
+ clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+
+ switch (clo.operationType()) {
+ case WRITE:
+ db.put(keyBytes, clo.newRow().valueBytes());
+
+ break;
+
+ case REMOVE:
+ db.delete(keyBytes);
+
+ break;
+
+ case NOOP:
+ break;
+ }
+ }
+ catch (RocksDBException e) {
+ throw new StorageException("Failed to access data in the storage", e);
+ }
+ finally {
+ rwLock.writeLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
+ return new ScanCursor(db.newIterator(), filter);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ try (comparatorOptions; comparator; options) {
+ db.close();
+ }
+ }
+
+ /** Cusror wrapper over the RocksIterator object with custom filter. */
+ private static class ScanCursor implements Cursor<DataRow> {
+ /** Iterator from RocksDB. */
+ private final RocksIterator iter;
+
+ /** Custom filter predicate. */
+ private final Predicate<SearchRow> filter;
+
+ private ScanCursor(RocksIterator iter, Predicate<SearchRow> filter) {
+ this.iter = iter;
+ this.filter = filter;
+
+ iter.seekToFirst();
+ }
+
+ /** {@inheritDoc} */
+ @NotNull @Override public Iterator<DataRow> iterator() {
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ while (isValid() && !filter.test(new SimpleDataRow(iter.key(), null)))
+ iter.next();
+
+ return isValid();
+ }
+
+ /**
+ * Checks iterator validity.
+ *
+ * @throws IgniteInternalException If iterator is not valid and {@link RocksIterator#status()} has thrown an
+ * exception.
+ */
+ private boolean isValid() {
+ if (iter.isValid())
+ return true;
+
+ try {
+ iter.status();
+
+ return false;
+ }
+ catch (RocksDBException e) {
+ throw new IgniteInternalException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public DataRow next() {
+ if (!hasNext())
+ throw new NoSuchElementException();
+
+ var row = new SimpleDataRow(iter.key(), iter.value());
+
+ iter.next();
+
+ return row;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws Exception {
+ iter.close();
+ }
+ }
+}
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageTest.java
new file mode 100644
index 0000000..c1ac8f1
--- /dev/null
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import org.apache.ignite.internal.storage.AbstractStorageTest;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+/**
+ * Storage test implementation for {@link RocksDbStorage}.
+ */
+public class RocksDbStorageTest extends AbstractStorageTest {
+ private Path path;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ path = Paths.get("rocksdb_test");
+
+ IgniteUtils.delete(path);
+
+ storage = new RocksDbStorage(path, ByteBuffer::compareTo);
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ try {
+ if (storage != null)
+ ((AutoCloseable)storage).close();
+ }
+ finally {
+ IgniteUtils.delete(path);
+ }
+ }
+}
diff --git a/parent/pom.xml b/parent/pom.xml
index eab9068..91ac3f4 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -80,7 +80,7 @@
<reflections.version>0.9.10</reflections.version>
<javassist.version>3.27.0-GA</javassist.version>
<esri.geometry.version>2.2.3</esri.geometry.version>
- <rocksdb.version>5.18.4</rocksdb.version>
+ <rocksdb.version>6.20.3</rocksdb.version>
<disruptor.version>3.3.7</disruptor.version>
<metrics.version>4.0.2</metrics.version>
<jctools.version>3.3.0</jctools.version>
@@ -244,6 +244,18 @@
<dependency>
<groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-storage-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-storage-rocksdb</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
<artifactId>ignite-table</artifactId>
<version>${project.version}</version>
</dependency>
@@ -421,6 +433,13 @@
</dependency>
<dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-storage-api</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>${junit.jupiter.version}</version>
diff --git a/pom.xml b/pom.xml
index 81852dc..102cf87 100644
--- a/pom.xml
+++ b/pom.xml
@@ -58,6 +58,8 @@
<module>modules/rest</module>
<module>modules/runner</module>
<module>modules/schema</module>
+ <module>modules/storage-api</module>
+ <module>modules/storage-rocksdb</module>
<module>modules/table</module>
<module>modules/vault</module>
</modules>