You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/06/10 09:54:52 UTC

[GitHub] [ignite-3] ibessonov opened a new pull request #169: IGNITE-14745 Storage API for partitions.

ibessonov opened a new pull request #169:
URL: https://github.com/apache/ignite-3/pull/169


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sashapolo commented on a change in pull request #169: IGNITE-14745 Storage API for partitions.

Posted by GitBox <gi...@apache.org>.
sashapolo commented on a change in pull request #169:
URL: https://github.com/apache/ignite-3/pull/169#discussion_r659653430



##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+/**
+ * Exception thrown by storage.

Review comment:
       ```suggestion
    * Exception thrown by the storage.
   ```

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleDataRow.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.storage.DataRow;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Basic array-based implementation of the {@link DataRow}.
+ */
+public class SimpleDataRow implements DataRow {
+    /** Key array. */
+    private final byte[] key;
+
+    /** Value array. */
+    @Nullable private final byte[] value;

Review comment:
       same here (and below) about Nullable and arrays

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleReadInvokeClosure.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.Nullable;
+
+/** Invoke closure implementation for read operation. */
+public class SimpleReadInvokeClosure implements InvokeClosure {
+    /** Copy of the row that was passed to {@link #call(DataRow)} method. */
+    private DataRow row;

Review comment:
       this field should be marked as `@Nullable`

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleReadInvokeClosure.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.Nullable;
+
+/** Invoke closure implementation for read operation. */
+public class SimpleReadInvokeClosure implements InvokeClosure {
+    /** Copy of the row that was passed to {@link #call(DataRow)} method. */
+    private DataRow row;
+
+    /** {@inheritDoc} */
+    @Override public void call(@Nullable DataRow row) {
+        this.row = row == null ? null : new SimpleDataRow(row.keyBytes(), row.valueBytes());
+    }
+
+    /** {@inheritDoc} */
+    @Nullable
+    @Override public DataRow newRow() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable
+    @Override public OperationType operationType() {
+        return OperationType.NOOP;
+    }
+
+    /** Copy of the row that was passed to {@link #call(DataRow)} method. */
+    public DataRow row() {

Review comment:
       and this method too

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.nio.ByteBuffer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Interface to be used as a key representation to search data in storage.
+ */
+public interface SearchRow {
+    /**
+     * @return Key bytes.
+     */
+    @Nullable byte[] keyBytes();

Review comment:
       unfortunately this is not the correct way to use the `@Nullable` annotation on arrays, should be `byte @Nullable []` instead

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleWriteInvokeClosure.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.Nullable;
+
+/** Invoke closure implementation for write operation. */

Review comment:
       ```suggestion
   /** Invoke closure implementation for a write operation. */
   ```

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleRemoveInvokeClosure.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.Nullable;
+
+/** Invoke closure implementation for remove operation. */

Review comment:
       ```suggestion
   /** Invoke closure implementation for a remove operation. */
   ```

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleWriteInvokeClosure.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.Nullable;
+
+/** Invoke closure implementation for write operation. */
+public class SimpleWriteInvokeClosure implements InvokeClosure {
+    /** Data row to write into storage. */
+    private final DataRow newRow;
+
+    /**
+     * @param newRow Data row to write into storage.

Review comment:
       ```suggestion
        * @param newRow Data row to write into the storage.
   ```

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+/**
+ * Storage implementation based on a single RocksDB instance.
+ */
+public class RocksDbStorage implements Storage, AutoCloseable {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /** RocksDB comparator options. */
+    private final ComparatorOptions comparatorOptions;
+
+    /** RocksDB comparator. */
+    private final AbstractComparator comparator;
+
+    /** RockDB options. */
+    private final Options options;
+
+    /** RocksDb instance. */
+    private final RocksDB db;
+
+    /** RW lock. */
+    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+
+    /**
+     * @param dbPath Path to the folder to store data.
+     * @param comparator Keys comparator.
+     * @throws StorageException If failed to create RocksDB instance.
+     */
+    public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws StorageException {
+        try {
+            comparatorOptions = new ComparatorOptions();
+
+            this.comparator = new AbstractComparator(comparatorOptions) {
+                /** {@inheritDoc} */
+                @Override public String name() {
+                    return "comparator";
+                }
+
+                /** {@inheritDoc} */
+                @Override public int compare(ByteBuffer a, ByteBuffer b) {
+                    return comparator.compare(a, b);
+                }
+            };
+
+            options = new Options();
+
+            options.setCreateIfMissing(true);
+
+            options.setComparator(this.comparator);
+
+            this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString());
+        }
+        catch (RocksDBException e) {
+            close();
+
+            throw new StorageException("Failed to start storage", e);

Review comment:
       ```suggestion
               throw new StorageException("Failed to start the storage", e);
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sashapolo commented on a change in pull request #169: IGNITE-14745 Storage API for partitions.

Posted by GitBox <gi...@apache.org>.
sashapolo commented on a change in pull request #169:
URL: https://github.com/apache/ignite-3/pull/169#discussion_r656023649



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
##########
@@ -335,4 +339,46 @@ public static ClassLoader igniteClassLoader() {
 
         return cls;
     }
+
+    /**
+     * Deletes file or directory with all sub-directories and files.
+     *
+     * @param path File or directory to delete.

Review comment:
       ok, let's leave the current notation, though it hurts me to say that




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] ibessonov commented on a change in pull request #169: IGNITE-14745 Storage API for partitions.

Posted by GitBox <gi...@apache.org>.
ibessonov commented on a change in pull request #169:
URL: https://github.com/apache/ignite-3/pull/169#discussion_r655945041



##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/OperationType.java
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+/** Operation types for {@link InvokeClosure}. */
+public enum OperationType {
+    /** Noop, signifies read operation. */
+    NOOP,

Review comment:
       It's more general. We're not sure that implementations will actually want to "read" values, they can just validate it and decide to do nothing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] ibessonov commented on a change in pull request #169: IGNITE-14745 Storage API for partitions.

Posted by GitBox <gi...@apache.org>.
ibessonov commented on a change in pull request #169:
URL: https://github.com/apache/ignite-3/pull/169#discussion_r654198812



##########
File path: modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
##########
@@ -179,8 +179,6 @@ public NettyServer(
                  */
                 .option(ChannelOption.SO_BACKLOG, 128)
                 .option(ChannelOption.SO_REUSEADDR, true)
-                .childOption(ChannelOption.SO_LINGER, 0)

Review comment:
       These are duplicates

##########
File path: modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
##########
@@ -179,8 +179,6 @@ public NettyServer(
                  */
                 .option(ChannelOption.SO_BACKLOG, 128)
                 .option(ChannelOption.SO_REUSEADDR, true)
-                .childOption(ChannelOption.SO_LINGER, 0)

Review comment:
       These are duplicates, or are they? I'll recheck

##########
File path: modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
##########
@@ -179,8 +179,6 @@ public NettyServer(
                  */
                 .option(ChannelOption.SO_BACKLOG, 128)
                 .option(ChannelOption.SO_REUSEADDR, true)
-                .childOption(ChannelOption.SO_LINGER, 0)

Review comment:
       These are duplicates, or are they? I'll recheck
   EDIT: yes they are

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/InvokeClosure.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import org.jetbrains.annotations.Nullable;
+
+/** */
+public interface InvokeClosure {
+    /**
+     * @param row Old row or {@code null} if old row not found.
+     */
+    void call(@Nullable DataRow row);
+
+    /**
+     * @return New row for {@link OperationType#WRITE} operation.
+     */
+    DataRow newRow();

Review comment:
       How else would you provide a new row?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] ibessonov commented on a change in pull request #169: IGNITE-14745 Storage API for partitions.

Posted by GitBox <gi...@apache.org>.
ibessonov commented on a change in pull request #169:
URL: https://github.com/apache/ignite-3/pull/169#discussion_r655333991



##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.ByteArray;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Storage implementation based on {@link ConcurrentHashMap}.
+ */
+public class ConcurrentHashMapStorage implements Storage {
+    /** Storage content. */
+    private final ConcurrentMap<ByteArray, byte[]> map = new ConcurrentHashMap<>();
+
+    /** {@inheritDoc} */
+    @Override public DataRow read(SearchRow key) throws StorageException {
+        byte[] keyBytes = key.keyBytes();
+
+        byte[] valueBytes = map.get(new ByteArray(keyBytes));
+
+        return new SimpleDataRow(keyBytes, valueBytes);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(DataRow row) throws StorageException {
+        map.put(new ByteArray(row.keyBytes()), row.valueBytes());
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void remove(SearchRow key) throws StorageException {
+        map.remove(new ByteArray(key.keyBytes()));
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+        byte[] keyBytes = key.keyBytes();
+
+        ByteArray mapKey = new ByteArray(keyBytes);
+        byte[] existingDataBytes = map.get(mapKey);
+
+        clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+
+        switch (clo.operationType()) {
+            case WRITE:
+                map.put(mapKey, clo.newRow().valueBytes());
+
+                break;
+
+            case REMOVE:
+                map.remove(mapKey);
+
+                break;
+
+            case NOOP:
+                break;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
+        Iterator<Map.Entry<ByteArray, byte[]>> iter = map.entrySet().stream()

Review comment:
       Now I wonder why I didn't write it like this in the first place.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sashapolo commented on a change in pull request #169: IGNITE-14745 Storage API for partitions.

Posted by GitBox <gi...@apache.org>.
sashapolo commented on a change in pull request #169:
URL: https://github.com/apache/ignite-3/pull/169#discussion_r654352768



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
##########
@@ -335,4 +339,46 @@ public static ClassLoader igniteClassLoader() {
 
         return cls;
     }
+
+    /**
+     * Deletes file or directory with all sub-directories and files.
+     *
+     * @param path File or directory to delete.
+     * @return {@code true} if and only if the file or directory is successfully deleted,
+     *      {@code false} otherwise
+     */
+    public static boolean delete(Path path) {
+        if (Files.isDirectory(path)) {

Review comment:
       Can we use the following approach instead?
   ```
   try {
       Files.walkFileTree(path, new SimpleFileVisitor<>() {
           @Override
           public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
               Files.delete(dir);
               return FileVisitResult.CONTINUE;
           }
   
           @Override
           public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
               Files.delete(file);
               return FileVisitResult.CONTINUE;
           }
       });
   
       return true;
   }
   catch (IOException e) {
       return false;
   }
   ```

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
##########
@@ -335,4 +339,46 @@ public static ClassLoader igniteClassLoader() {
 
         return cls;
     }
+
+    /**
+     * Deletes file or directory with all sub-directories and files.
+     *
+     * @param path File or directory to delete.
+     * @return {@code true} if and only if the file or directory is successfully deleted,
+     *      {@code false} otherwise
+     */
+    public static boolean delete(Path path) {
+        if (Files.isDirectory(path)) {
+            try {
+                try (DirectoryStream<Path> stream = Files.newDirectoryStream(path)) {
+                    for (Path innerPath : stream) {
+                        boolean res = delete(innerPath);
+
+                        if (!res)
+                            return false;
+                    }
+                }
+            } catch (IOException e) {
+                return false;
+            }
+        }
+
+        if (path.toFile().getName().endsWith("jar")) {
+            try {
+                // Why do we do this?

Review comment:
       Indeed, why?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
##########
@@ -335,4 +339,46 @@ public static ClassLoader igniteClassLoader() {
 
         return cls;
     }
+
+    /**
+     * Deletes file or directory with all sub-directories and files.

Review comment:
       ```suggestion
        * Deletes a file or a directory with all sub-directories and files.
   ```

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleDataRow.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.apache.ignite.internal.storage.DataRow;
+
+/**
+ * Basic array-based implementation of the {@link DataRow}

Review comment:
       ```suggestion
    * Basic array-based implementation of the {@link DataRow}.
   ```

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.util.function.Predicate;
+import org.apache.ignite.internal.util.Cursor;
+
+/**
+ * Interface providing methods to read, remove and update keys in storage.
+ */
+public interface Storage {
+    /**
+     * Reads a DataRow for a given key.
+     *
+     * @param key Search row.
+     * @return Data row.
+     */
+    public DataRow read(SearchRow key) throws StorageException;
+
+    /**
+     * Writes DataRow to the storage.
+     *
+     * @param row Data row.

Review comment:
       I don't know if Ignite has particular rules, but Oracle Javadoc style declares that such descriptions should be written as `@param row data row` (regular letter, no period)

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.util.function.Predicate;
+import org.apache.ignite.internal.util.Cursor;
+
+/**
+ * Interface providing methods to read, remove and update keys in storage.
+ */
+public interface Storage {
+    /**
+     * Reads a DataRow for a given key.
+     *
+     * @param key Search row.
+     * @return Data row.
+     */
+    public DataRow read(SearchRow key) throws StorageException;
+
+    /**
+     * Writes DataRow to the storage.

Review comment:
       ```suggestion
        * Writes a DataRow to the storage.
   ```

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+/**
+ * Storage implementation based on a single RocksDB instance.
+ */
+public class RocksDbStorage implements Storage, AutoCloseable {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /** RocksDB comparator options. */
+    private final ComparatorOptions comparatorOptions;
+
+    /** RocksDB comparator. */
+    private final AbstractComparator comparator;
+
+    /** RockDB options. */
+    private final Options options;
+
+    /** RocksDb instance. */
+    private final RocksDB db;
+
+    /**
+     * @param dbPath Path to the folder to store data.
+     * @param comparator Keys comparator.
+     * @throws StorageException If failed to create RocksDB instance.
+     */
+    public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws StorageException {
+        try {
+            comparatorOptions = new ComparatorOptions();
+
+            this.comparator = new AbstractComparator(comparatorOptions) {
+                /** {@inheritDoc} */
+                @Override public String name() {
+                    return "comparator";
+                }
+
+                /** {@inheritDoc} */
+                @Override public int compare(ByteBuffer a, ByteBuffer b) {
+                    return comparator.compare(a, b);
+                }
+            };
+
+            options = new Options();
+
+            options.setCreateIfMissing(true);
+
+            options.setComparator(this.comparator);
+
+            this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString());
+        }
+        catch (RocksDBException e) {
+            close();
+
+            throw new StorageException("Failed to start storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public DataRow read(SearchRow key) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+
+            return new SimpleDataRow(keyBytes, db.get(keyBytes));
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to read data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(DataRow row) throws StorageException {
+        try {
+            db.put(row.keyBytes(), row.valueBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Filed to write data to the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void remove(SearchRow key) throws StorageException {
+        try {
+            db.delete(key.keyBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to remove data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+            byte[] existingDataBytes = db.get(keyBytes);
+
+            clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+
+            switch (clo.operationType()) {
+                case WRITE:
+                    db.put(keyBytes, clo.newRow().valueBytes());
+
+                    break;
+
+                case REMOVE:
+                    db.delete(keyBytes);
+
+                    break;
+
+                case NOOP:
+                    break;
+            }
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to access data in the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
+        return new ScanCursor(db.newIterator(), filter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        if (db != null)
+            db.close();
+
+        if (options != null)
+            options.close();
+
+        if (comparator != null)
+            comparator.close();
+
+        if (comparatorOptions != null)
+            comparatorOptions.close();
+    }
+
+    private static class ScanCursor implements Cursor<DataRow> {
+        private final RocksIterator iter;
+
+        private final Predicate<SearchRow> filter;
+
+        private ScanCursor(RocksIterator iter, Predicate<SearchRow> filter) {
+            this.iter = iter;
+            this.filter = filter;
+
+            iter.seekToFirst();
+
+            hasNext();
+        }
+
+        /** {@inheritDoc} */
+        @NotNull @Override public Iterator<DataRow> iterator() {
+            return this;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasNext() {
+            while (iter.isValid() && !filter.test(new SimpleDataRow(iter.key(), null)))

Review comment:
       you should also call the `status` method in case `isValid` returns `false`, see https://github.com/facebook/rocksdb/wiki/Iterator#error-handling

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/DataRow.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Interface that represents data row from the storage - a key-value pair. Can be used as a {@link SearchRow}.

Review comment:
       ```suggestion
    * Interface that represents a data row from the storage - a key-value pair. Can be used as a {@link SearchRow}.
   ```

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Interface to be used as a key representation to search data in storage.
+ */
+public interface SearchRow {
+    /**
+     * @return Hash of the key.
+     */
+    int hash();

Review comment:
       What is this method for?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
##########
@@ -335,4 +339,46 @@ public static ClassLoader igniteClassLoader() {
 
         return cls;
     }
+
+    /**
+     * Deletes file or directory with all sub-directories and files.
+     *
+     * @param path File or directory to delete.

Review comment:
       ```suggestion
        * @param path file or directory to delete
   ```

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+/**
+ * Exception thrown by storage.
+ */
+public class StorageException extends Exception {

Review comment:
       Why does it have to be a checked exception?

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/OperationType.java
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+/** Operation types for {@link InvokeClosure}. */
+public enum OperationType {
+    /** Noop, signifies read operation. */
+    NOOP,

Review comment:
       why not `READ`?

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.ByteArray;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Storage implementation based on {@link ConcurrentHashMap}.
+ */
+public class ConcurrentHashMapStorage implements Storage {
+    /** Storage content. */
+    private final ConcurrentMap<ByteArray, byte[]> map = new ConcurrentHashMap<>();
+
+    /** {@inheritDoc} */
+    @Override public DataRow read(SearchRow key) throws StorageException {
+        byte[] keyBytes = key.keyBytes();
+
+        byte[] valueBytes = map.get(new ByteArray(keyBytes));
+
+        return new SimpleDataRow(keyBytes, valueBytes);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(DataRow row) throws StorageException {
+        map.put(new ByteArray(row.keyBytes()), row.valueBytes());
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void remove(SearchRow key) throws StorageException {
+        map.remove(new ByteArray(key.keyBytes()));
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+        byte[] keyBytes = key.keyBytes();
+
+        ByteArray mapKey = new ByteArray(keyBytes);
+        byte[] existingDataBytes = map.get(mapKey);
+
+        clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+
+        switch (clo.operationType()) {
+            case WRITE:
+                map.put(mapKey, clo.newRow().valueBytes());
+
+                break;
+
+            case REMOVE:
+                map.remove(mapKey);
+
+                break;
+
+            case NOOP:
+                break;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
+        Iterator<Map.Entry<ByteArray, byte[]>> iter = map.entrySet().stream()

Review comment:
       Looks like this code can be simplified a little bit:
   ```
   Iterator<SimpleDataRow> iter = map.entrySet().stream()
       .map(e -> new SimpleDataRow(e.getKey().bytes(), e.getValue()))
       .filter(filter)
       .iterator();
   
   return new Cursor<>() {
       /** {@inheritDoc} */
       @Override public boolean hasNext() {
           return iter.hasNext();
       }
   
       /** {@inheritDoc} */
       @Override public DataRow next() {
           return iter.next();
       }
   
       /** {@inheritDoc} */
       @NotNull @Override public Iterator<DataRow> iterator() {
           return this;
       }
   
       /** {@inheritDoc} */
       @Override public void close() throws Exception {
           // No-op.
       }
   };
   ```
   
   What do you think?

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/InvokeClosure.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import org.jetbrains.annotations.Nullable;
+
+/** */
+public interface InvokeClosure {
+    /**
+     * @param row Old row or {@code null} if old row not found.

Review comment:
       ```suggestion
        * @param row old row or {@code null} if the old row has not been found.
   ```

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.ByteArray;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Storage implementation based on {@link ConcurrentHashMap}.
+ */
+public class ConcurrentHashMapStorage implements Storage {
+    /** Storage content. */
+    private final ConcurrentMap<ByteArray, byte[]> map = new ConcurrentHashMap<>();

Review comment:
       do we need a `ConcurrentMap` here? Looks like a single ReadWriteLock would be enough...

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/InvokeClosure.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import org.jetbrains.annotations.Nullable;
+
+/** */
+public interface InvokeClosure {
+    /**
+     * @param row Old row or {@code null} if old row not found.
+     */
+    void call(@Nullable DataRow row);
+
+    /**
+     * @return New row for {@link OperationType#WRITE} operation.

Review comment:
       ```suggestion
        * @return new row for the {@link OperationType#WRITE} operation
   ```

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleRemoveInvokeClosure.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.Nullable;
+
+public class SimpleRemoveInvokeClosure implements InvokeClosure {

Review comment:
       missing javadoc

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/InvokeClosure.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import org.jetbrains.annotations.Nullable;
+
+/** */
+public interface InvokeClosure {
+    /**
+     * @param row Old row or {@code null} if old row not found.
+     */
+    void call(@Nullable DataRow row);
+
+    /**
+     * @return New row for {@link OperationType#WRITE} operation.
+     */
+    DataRow newRow();
+
+    /**
+     * @return Operation type for this closure or {@code null} if it is unknown.
+     * After method {@link #call(DataRow)} has been called, operation type must
+     * be know and this method can not return {@code null}.
+     */
+    OperationType operationType();

Review comment:
       > or {@code null} if it is unknown
   
   This means that this method should be annotated with `@Nullable`

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleWriteInvokeClosure.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.Nullable;
+
+public class SimpleWriteInvokeClosure implements InvokeClosure {

Review comment:
       missing javadoc

##########
File path: modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorageTest.java
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import org.apache.ignite.internal.storage.AbstractStorageTest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+/**
+ * Storage test implementation for {@link ConcurrentHashMapStorage}.
+ */
+public class ConcurrentHashMapStorageTest extends AbstractStorageTest {
+    @BeforeEach
+    public void setUp() {
+        storage = new ConcurrentHashMapStorage();
+    }
+
+    @AfterEach
+    public void tearDown() {

Review comment:
       this method is redundant

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleDataRow.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.apache.ignite.internal.storage.DataRow;
+
+/**
+ * Basic array-based implementation of the {@link DataRow}
+ */
+public class SimpleDataRow implements DataRow {
+    /** Key array. */
+    private final byte[] key;
+
+    /** Value array. */
+    private final byte[] value;

Review comment:
       I think you should mark this field and all related stuff as `@Nullable`

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.ByteArray;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Storage implementation based on {@link ConcurrentHashMap}.
+ */
+public class ConcurrentHashMapStorage implements Storage {
+    /** Storage content. */
+    private final ConcurrentMap<ByteArray, byte[]> map = new ConcurrentHashMap<>();
+
+    /** {@inheritDoc} */
+    @Override public DataRow read(SearchRow key) throws StorageException {
+        byte[] keyBytes = key.keyBytes();
+
+        byte[] valueBytes = map.get(new ByteArray(keyBytes));
+
+        return new SimpleDataRow(keyBytes, valueBytes);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(DataRow row) throws StorageException {
+        map.put(new ByteArray(row.keyBytes()), row.valueBytes());
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void remove(SearchRow key) throws StorageException {
+        map.remove(new ByteArray(key.keyBytes()));
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+        byte[] keyBytes = key.keyBytes();
+
+        ByteArray mapKey = new ByteArray(keyBytes);
+        byte[] existingDataBytes = map.get(mapKey);
+
+        clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+
+        switch (clo.operationType()) {
+            case WRITE:
+                map.put(mapKey, clo.newRow().valueBytes());
+
+                break;
+
+            case REMOVE:
+                map.remove(mapKey);
+
+                break;
+
+            case NOOP:
+                break;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
+        Iterator<Map.Entry<ByteArray, byte[]>> iter = map.entrySet().stream()
+            .filter(entry -> filter.test(new SimpleDataRow(entry.getKey().bytes(), null)))
+            .iterator();
+
+        return new Cursor<DataRow>() {

Review comment:
       ```suggestion
           return new Cursor<>() {
   ```

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+/**
+ * Storage implementation based on a single RocksDB instance.
+ */
+public class RocksDbStorage implements Storage, AutoCloseable {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /** RocksDB comparator options. */
+    private final ComparatorOptions comparatorOptions;
+
+    /** RocksDB comparator. */
+    private final AbstractComparator comparator;
+
+    /** RockDB options. */
+    private final Options options;
+
+    /** RocksDb instance. */
+    private final RocksDB db;
+
+    /**
+     * @param dbPath Path to the folder to store data.
+     * @param comparator Keys comparator.
+     * @throws StorageException If failed to create RocksDB instance.
+     */
+    public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws StorageException {
+        try {
+            comparatorOptions = new ComparatorOptions();
+
+            this.comparator = new AbstractComparator(comparatorOptions) {
+                /** {@inheritDoc} */
+                @Override public String name() {
+                    return "comparator";
+                }
+
+                /** {@inheritDoc} */
+                @Override public int compare(ByteBuffer a, ByteBuffer b) {
+                    return comparator.compare(a, b);
+                }
+            };
+
+            options = new Options();
+
+            options.setCreateIfMissing(true);
+
+            options.setComparator(this.comparator);
+
+            this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString());
+        }
+        catch (RocksDBException e) {
+            close();
+
+            throw new StorageException("Failed to start storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public DataRow read(SearchRow key) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+
+            return new SimpleDataRow(keyBytes, db.get(keyBytes));
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to read data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(DataRow row) throws StorageException {
+        try {
+            db.put(row.keyBytes(), row.valueBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Filed to write data to the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void remove(SearchRow key) throws StorageException {
+        try {
+            db.delete(key.keyBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to remove data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+            byte[] existingDataBytes = db.get(keyBytes);
+
+            clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+
+            switch (clo.operationType()) {
+                case WRITE:
+                    db.put(keyBytes, clo.newRow().valueBytes());
+
+                    break;
+
+                case REMOVE:
+                    db.delete(keyBytes);
+
+                    break;
+
+                case NOOP:
+                    break;
+            }
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to access data in the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
+        return new ScanCursor(db.newIterator(), filter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        if (db != null)
+            db.close();

Review comment:
       Unfortunately this is not a bulletproof way of closing resources, since any of the `close` methods can thrown an exception

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.util.function.Predicate;
+import org.apache.ignite.internal.util.Cursor;
+
+/**
+ * Interface providing methods to read, remove and update keys in storage.
+ */
+public interface Storage {
+    /**
+     * Reads a DataRow for a given key.
+     *
+     * @param key Search row.
+     * @return Data row.
+     */
+    public DataRow read(SearchRow key) throws StorageException;
+
+    /**
+     * Writes DataRow to the storage.
+     *
+     * @param row Data row.
+     * @throws StorageException If failed to read data or storage is already stopped.
+     */
+    public void write(DataRow row) throws StorageException;
+
+    /**
+     * Removes DataRow associated with a given Key.

Review comment:
       ```suggestion
        * Removes a DataRow associated with a given Key.
   ```

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+/**
+ * Storage implementation based on a single RocksDB instance.
+ */
+public class RocksDbStorage implements Storage, AutoCloseable {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /** RocksDB comparator options. */
+    private final ComparatorOptions comparatorOptions;
+
+    /** RocksDB comparator. */
+    private final AbstractComparator comparator;
+
+    /** RockDB options. */
+    private final Options options;
+
+    /** RocksDb instance. */
+    private final RocksDB db;
+
+    /**
+     * @param dbPath Path to the folder to store data.
+     * @param comparator Keys comparator.
+     * @throws StorageException If failed to create RocksDB instance.
+     */
+    public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws StorageException {
+        try {
+            comparatorOptions = new ComparatorOptions();
+
+            this.comparator = new AbstractComparator(comparatorOptions) {
+                /** {@inheritDoc} */
+                @Override public String name() {
+                    return "comparator";
+                }
+
+                /** {@inheritDoc} */
+                @Override public int compare(ByteBuffer a, ByteBuffer b) {
+                    return comparator.compare(a, b);
+                }
+            };
+
+            options = new Options();
+
+            options.setCreateIfMissing(true);
+
+            options.setComparator(this.comparator);
+
+            this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString());
+        }
+        catch (RocksDBException e) {
+            close();
+
+            throw new StorageException("Failed to start storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public DataRow read(SearchRow key) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+
+            return new SimpleDataRow(keyBytes, db.get(keyBytes));
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to read data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(DataRow row) throws StorageException {
+        try {
+            db.put(row.keyBytes(), row.valueBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Filed to write data to the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void remove(SearchRow key) throws StorageException {
+        try {
+            db.delete(key.keyBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to remove data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+            byte[] existingDataBytes = db.get(keyBytes);
+
+            clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+
+            switch (clo.operationType()) {
+                case WRITE:
+                    db.put(keyBytes, clo.newRow().valueBytes());
+
+                    break;
+
+                case REMOVE:
+                    db.delete(keyBytes);
+
+                    break;
+
+                case NOOP:
+                    break;
+            }
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to access data in the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
+        return new ScanCursor(db.newIterator(), filter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        if (db != null)
+            db.close();
+
+        if (options != null)
+            options.close();
+
+        if (comparator != null)
+            comparator.close();
+
+        if (comparatorOptions != null)
+            comparatorOptions.close();
+    }
+
+    private static class ScanCursor implements Cursor<DataRow> {
+        private final RocksIterator iter;
+
+        private final Predicate<SearchRow> filter;
+
+        private ScanCursor(RocksIterator iter, Predicate<SearchRow> filter) {
+            this.iter = iter;
+            this.filter = filter;
+
+            iter.seekToFirst();
+
+            hasNext();

Review comment:
       why do you need to call this method here?

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.util.function.Predicate;
+import org.apache.ignite.internal.util.Cursor;
+
+/**
+ * Interface providing methods to read, remove and update keys in storage.
+ */
+public interface Storage {
+    /**
+     * Reads a DataRow for a given key.
+     *
+     * @param key Search row.
+     * @return Data row.
+     */
+    public DataRow read(SearchRow key) throws StorageException;
+
+    /**
+     * Writes DataRow to the storage.
+     *
+     * @param row Data row.
+     * @throws StorageException If failed to read data or storage is already stopped.
+     */
+    public void write(DataRow row) throws StorageException;
+
+    /**
+     * Removes DataRow associated with a given Key.
+     *
+     * @throws StorageException If failed to read data or storage is already stopped.
+     */
+    public void remove(SearchRow key) throws StorageException;
+
+    /**
+     * Executes an update with custom logic implemented by storage.UpdateClosure interface.
+     *
+     * @param key Search key.
+     * @param clo Invoke closure.
+     * @throws StorageException If failed to read data or storage is already stopped.
+     */
+    public void invoke(SearchRow key, InvokeClosure clo) throws StorageException;
+
+    /**
+     * Creates cursor over storage data.

Review comment:
       ```suggestion
        * Creates cursor over the storage data.
   ```

##########
File path: modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractStorageTest.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.storage.basic.SimpleReadInvokeClosure;
+import org.apache.ignite.internal.storage.basic.SimpleRemoveInvokeClosure;
+import org.apache.ignite.internal.storage.basic.SimpleWriteInvokeClosure;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static java.util.Collections.emptyList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Abstract test that covers basic scenarios of the storage API.
+ */
+public abstract class AbstractStorageTest {
+    /** Storage instance. */
+    protected Storage storage;
+
+    /**
+     * Wraps string key into a search row.
+     *
+     * @param key String key.
+     * @return Search row.
+     */
+    private SearchRow searchRow(String key) {
+        return new SimpleDataRow(
+            key.getBytes(StandardCharsets.UTF_8),
+            null
+        );
+    }
+
+    /**
+     * Wraps string key/value pair into a data row.
+     *
+     * @param key String key.
+     * @param value String value.
+     * @return Data row.
+     */
+    private DataRow dataRow(String key, String value) {
+        return new SimpleDataRow(
+            key.getBytes(StandardCharsets.UTF_8),
+            value.getBytes(StandardCharsets.UTF_8)
+        );
+    }
+
+    @Test
+    public void readWriteRemove() throws Exception {
+        SearchRow searchRow = searchRow("key");
+
+        assertNull(storage.read(searchRow).value());
+
+        DataRow dataRow = dataRow("key", "value");
+
+        storage.write(dataRow);
+
+        assertArrayEquals(dataRow.value().array(), storage.read(searchRow).value().array());
+
+        storage.remove(searchRow);
+
+        assertNull(storage.read(searchRow).value());
+    }
+
+    @Test
+    public void invoke() throws Exception {
+        SearchRow searchRow = searchRow("key");
+
+        SimpleReadInvokeClosure readClosure = new SimpleReadInvokeClosure();
+
+        storage.invoke(searchRow, readClosure);
+
+        assertNull(readClosure.row().value());
+
+        DataRow dataRow = dataRow("key", "value");
+
+        storage.invoke(searchRow, new SimpleWriteInvokeClosure(dataRow));
+
+        storage.invoke(searchRow, readClosure);
+
+        Assertions.assertArrayEquals(dataRow.value().array(), readClosure.row().value().array());

Review comment:
       ```suggestion
           assertArrayEquals(dataRow.value().array(), readClosure.row().value().array());
   ```

##########
File path: modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractStorageTest.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.storage.basic.SimpleReadInvokeClosure;
+import org.apache.ignite.internal.storage.basic.SimpleRemoveInvokeClosure;
+import org.apache.ignite.internal.storage.basic.SimpleWriteInvokeClosure;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static java.util.Collections.emptyList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Abstract test that covers basic scenarios of the storage API.
+ */
+public abstract class AbstractStorageTest {
+    /** Storage instance. */
+    protected Storage storage;
+
+    /**
+     * Wraps string key into a search row.
+     *
+     * @param key String key.
+     * @return Search row.
+     */
+    private SearchRow searchRow(String key) {
+        return new SimpleDataRow(
+            key.getBytes(StandardCharsets.UTF_8),
+            null
+        );
+    }
+
+    /**
+     * Wraps string key/value pair into a data row.
+     *
+     * @param key String key.
+     * @param value String value.
+     * @return Data row.
+     */
+    private DataRow dataRow(String key, String value) {
+        return new SimpleDataRow(
+            key.getBytes(StandardCharsets.UTF_8),
+            value.getBytes(StandardCharsets.UTF_8)
+        );
+    }
+
+    @Test
+    public void readWriteRemove() throws Exception {

Review comment:
       please add javadocs for all tests

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+/**
+ * Storage implementation based on a single RocksDB instance.
+ */
+public class RocksDbStorage implements Storage, AutoCloseable {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /** RocksDB comparator options. */
+    private final ComparatorOptions comparatorOptions;
+
+    /** RocksDB comparator. */
+    private final AbstractComparator comparator;
+
+    /** RockDB options. */
+    private final Options options;
+
+    /** RocksDb instance. */
+    private final RocksDB db;
+
+    /**
+     * @param dbPath Path to the folder to store data.
+     * @param comparator Keys comparator.
+     * @throws StorageException If failed to create RocksDB instance.
+     */
+    public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws StorageException {
+        try {
+            comparatorOptions = new ComparatorOptions();
+
+            this.comparator = new AbstractComparator(comparatorOptions) {
+                /** {@inheritDoc} */
+                @Override public String name() {
+                    return "comparator";
+                }
+
+                /** {@inheritDoc} */
+                @Override public int compare(ByteBuffer a, ByteBuffer b) {
+                    return comparator.compare(a, b);
+                }
+            };
+
+            options = new Options();
+
+            options.setCreateIfMissing(true);
+
+            options.setComparator(this.comparator);
+
+            this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString());
+        }
+        catch (RocksDBException e) {
+            close();
+
+            throw new StorageException("Failed to start storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public DataRow read(SearchRow key) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+
+            return new SimpleDataRow(keyBytes, db.get(keyBytes));
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to read data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(DataRow row) throws StorageException {
+        try {
+            db.put(row.keyBytes(), row.valueBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Filed to write data to the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void remove(SearchRow key) throws StorageException {
+        try {
+            db.delete(key.keyBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to remove data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+            byte[] existingDataBytes = db.get(keyBytes);
+
+            clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+
+            switch (clo.operationType()) {
+                case WRITE:
+                    db.put(keyBytes, clo.newRow().valueBytes());
+
+                    break;
+
+                case REMOVE:
+                    db.delete(keyBytes);
+
+                    break;
+
+                case NOOP:
+                    break;
+            }
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to access data in the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
+        return new ScanCursor(db.newIterator(), filter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        if (db != null)
+            db.close();
+
+        if (options != null)
+            options.close();
+
+        if (comparator != null)
+            comparator.close();
+
+        if (comparatorOptions != null)
+            comparatorOptions.close();
+    }
+
+    private static class ScanCursor implements Cursor<DataRow> {

Review comment:
       missing javadoc

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+/**
+ * Storage implementation based on a single RocksDB instance.
+ */
+public class RocksDbStorage implements Storage, AutoCloseable {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /** RocksDB comparator options. */
+    private final ComparatorOptions comparatorOptions;
+
+    /** RocksDB comparator. */
+    private final AbstractComparator comparator;
+
+    /** RockDB options. */
+    private final Options options;
+
+    /** RocksDb instance. */
+    private final RocksDB db;
+
+    /**
+     * @param dbPath Path to the folder to store data.
+     * @param comparator Keys comparator.
+     * @throws StorageException If failed to create RocksDB instance.
+     */
+    public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws StorageException {
+        try {
+            comparatorOptions = new ComparatorOptions();
+
+            this.comparator = new AbstractComparator(comparatorOptions) {
+                /** {@inheritDoc} */
+                @Override public String name() {
+                    return "comparator";
+                }
+
+                /** {@inheritDoc} */
+                @Override public int compare(ByteBuffer a, ByteBuffer b) {
+                    return comparator.compare(a, b);
+                }
+            };
+
+            options = new Options();
+
+            options.setCreateIfMissing(true);
+
+            options.setComparator(this.comparator);
+
+            this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString());
+        }
+        catch (RocksDBException e) {
+            close();
+
+            throw new StorageException("Failed to start storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public DataRow read(SearchRow key) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+
+            return new SimpleDataRow(keyBytes, db.get(keyBytes));
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to read data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(DataRow row) throws StorageException {
+        try {
+            db.put(row.keyBytes(), row.valueBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Filed to write data to the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void remove(SearchRow key) throws StorageException {
+        try {
+            db.delete(key.keyBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to remove data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+            byte[] existingDataBytes = db.get(keyBytes);
+
+            clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+
+            switch (clo.operationType()) {
+                case WRITE:
+                    db.put(keyBytes, clo.newRow().valueBytes());
+
+                    break;
+
+                case REMOVE:
+                    db.delete(keyBytes);
+
+                    break;
+
+                case NOOP:
+                    break;
+            }
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to access data in the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
+        return new ScanCursor(db.newIterator(), filter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        if (db != null)
+            db.close();

Review comment:
       Unfortunately this is not a bulletproof way of closing resources, since any of the `close` methods can throw an exception

##########
File path: modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractStorageTest.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.storage.basic.SimpleReadInvokeClosure;
+import org.apache.ignite.internal.storage.basic.SimpleRemoveInvokeClosure;
+import org.apache.ignite.internal.storage.basic.SimpleWriteInvokeClosure;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static java.util.Collections.emptyList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Abstract test that covers basic scenarios of the storage API.
+ */
+public abstract class AbstractStorageTest {
+    /** Storage instance. */
+    protected Storage storage;
+
+    /**
+     * Wraps string key into a search row.
+     *
+     * @param key String key.
+     * @return Search row.
+     */
+    private SearchRow searchRow(String key) {
+        return new SimpleDataRow(
+            key.getBytes(StandardCharsets.UTF_8),
+            null
+        );
+    }
+
+    /**
+     * Wraps string key/value pair into a data row.
+     *
+     * @param key String key.
+     * @param value String value.
+     * @return Data row.
+     */
+    private DataRow dataRow(String key, String value) {
+        return new SimpleDataRow(
+            key.getBytes(StandardCharsets.UTF_8),
+            value.getBytes(StandardCharsets.UTF_8)
+        );
+    }
+
+    @Test
+    public void readWriteRemove() throws Exception {
+        SearchRow searchRow = searchRow("key");
+
+        assertNull(storage.read(searchRow).value());
+
+        DataRow dataRow = dataRow("key", "value");
+
+        storage.write(dataRow);
+
+        assertArrayEquals(dataRow.value().array(), storage.read(searchRow).value().array());
+
+        storage.remove(searchRow);
+
+        assertNull(storage.read(searchRow).value());
+    }
+
+    @Test
+    public void invoke() throws Exception {
+        SearchRow searchRow = searchRow("key");
+
+        SimpleReadInvokeClosure readClosure = new SimpleReadInvokeClosure();
+
+        storage.invoke(searchRow, readClosure);
+
+        assertNull(readClosure.row().value());
+
+        DataRow dataRow = dataRow("key", "value");
+
+        storage.invoke(searchRow, new SimpleWriteInvokeClosure(dataRow));
+
+        storage.invoke(searchRow, readClosure);
+
+        Assertions.assertArrayEquals(dataRow.value().array(), readClosure.row().value().array());
+
+        storage.invoke(searchRow, new SimpleRemoveInvokeClosure());
+
+        storage.invoke(searchRow, readClosure);
+
+        assertNull(readClosure.row().value());
+    }
+
+    @Test
+    public void scanSimple() throws Exception {
+        List<DataRow> list = toList(storage.scan(key -> true));
+
+        assertEquals(emptyList(), list);
+
+        DataRow dataRow1 = dataRow("key1", "value1");
+
+        storage.write(dataRow1);
+
+        list = toList(storage.scan(key -> true));
+
+        assertThat(list, hasSize(1));
+
+        assertArrayEquals(dataRow1.value().array(), list.get(0).value().array());
+
+        DataRow dataRow2 = dataRow("key2", "value2");
+
+        storage.write(dataRow2);
+
+        list = toList(storage.scan(key -> true));
+
+        assertThat(list, hasSize(2));
+
+        // "key1" and "key2" have the same order both by hash and lexicographically.
+        assertArrayEquals(dataRow1.value().array(), list.get(0).value().array());
+        assertArrayEquals(dataRow2.value().array(), list.get(1).value().array());
+    }
+
+    @Test
+    public void scanFiltered() throws Exception {
+        DataRow dataRow1 = dataRow("key1", "value1");
+        DataRow dataRow2 = dataRow("key2", "value2");
+
+        storage.write(dataRow1);
+        storage.write(dataRow2);
+
+        List<DataRow> list = toList(storage.scan(key -> key.keyBytes()[3] == '1'));
+
+        assertThat(list, hasSize(1));
+
+        assertArrayEquals(dataRow1.value().array(), list.get(0).value().array());
+
+        list = toList(storage.scan(key -> key.keyBytes()[3] == '2'));
+
+        assertThat(list, hasSize(1));
+
+        assertArrayEquals(dataRow2.value().array(), list.get(0).value().array());
+
+        list = toList(storage.scan(key -> false));
+
+        assertTrue(list.isEmpty());
+    }
+
+    /**
+     * Converts cursor to list.
+     *
+     * @param cursor Cursor.
+     * @param <T> Type of cursor content.
+     * @return List.
+     */
+    @NotNull private <T> List<T> toList(Cursor<T> cursor) {
+        return StreamSupport.stream(cursor.spliterator(), false).collect(Collectors.toList());

Review comment:
       you actually never close the passed in cursor, so this is a resource leak




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sashapolo commented on a change in pull request #169: IGNITE-14745 Storage API for partitions.

Posted by GitBox <gi...@apache.org>.
sashapolo commented on a change in pull request #169:
URL: https://github.com/apache/ignite-3/pull/169#discussion_r656026514



##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+/**
+ * Storage implementation based on a single RocksDB instance.
+ */
+public class RocksDbStorage implements Storage, AutoCloseable {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /** RocksDB comparator options. */
+    private final ComparatorOptions comparatorOptions;
+
+    /** RocksDB comparator. */
+    private final AbstractComparator comparator;
+
+    /** RockDB options. */
+    private final Options options;
+
+    /** RocksDb instance. */
+    private final RocksDB db;
+
+    /**
+     * @param dbPath Path to the folder to store data.
+     * @param comparator Keys comparator.
+     * @throws StorageException If failed to create RocksDB instance.
+     */
+    public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws StorageException {
+        try {
+            comparatorOptions = new ComparatorOptions();
+
+            this.comparator = new AbstractComparator(comparatorOptions) {
+                /** {@inheritDoc} */
+                @Override public String name() {
+                    return "comparator";
+                }
+
+                /** {@inheritDoc} */
+                @Override public int compare(ByteBuffer a, ByteBuffer b) {
+                    return comparator.compare(a, b);
+                }
+            };
+
+            options = new Options();
+
+            options.setCreateIfMissing(true);
+
+            options.setComparator(this.comparator);
+
+            this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString());
+        }
+        catch (RocksDBException e) {
+            close();
+
+            throw new StorageException("Failed to start storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public DataRow read(SearchRow key) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+
+            return new SimpleDataRow(keyBytes, db.get(keyBytes));
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to read data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(DataRow row) throws StorageException {
+        try {
+            db.put(row.keyBytes(), row.valueBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Filed to write data to the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void remove(SearchRow key) throws StorageException {
+        try {
+            db.delete(key.keyBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to remove data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+            byte[] existingDataBytes = db.get(keyBytes);
+
+            clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+
+            switch (clo.operationType()) {
+                case WRITE:
+                    db.put(keyBytes, clo.newRow().valueBytes());
+
+                    break;
+
+                case REMOVE:
+                    db.delete(keyBytes);
+
+                    break;
+
+                case NOOP:
+                    break;
+            }
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to access data in the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
+        return new ScanCursor(db.newIterator(), filter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        if (db != null)
+            db.close();

Review comment:
       AFAIK, it is not documented anywhere that they **never** throw exceptions, therefore you should always assume that they do




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sashapolo commented on a change in pull request #169: IGNITE-14745 Storage API for partitions.

Posted by GitBox <gi...@apache.org>.
sashapolo commented on a change in pull request #169:
URL: https://github.com/apache/ignite-3/pull/169#discussion_r656023951



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
##########
@@ -335,4 +339,46 @@ public static ClassLoader igniteClassLoader() {
 
         return cls;
     }
+
+    /**
+     * Deletes file or directory with all sub-directories and files.
+     *
+     * @param path File or directory to delete.
+     * @return {@code true} if and only if the file or directory is successfully deleted,
+     *      {@code false} otherwise
+     */
+    public static boolean delete(Path path) {
+        if (Files.isDirectory(path)) {
+            try {
+                try (DirectoryStream<Path> stream = Files.newDirectoryStream(path)) {
+                    for (Path innerPath : stream) {
+                        boolean res = delete(innerPath);
+
+                        if (!res)
+                            return false;
+                    }
+                }
+            } catch (IOException e) {
+                return false;
+            }
+        }
+
+        if (path.toFile().getName().endsWith("jar")) {
+            try {
+                // Why do we do this?

Review comment:
       maybe we don't need this code then?)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] ibessonov commented on a change in pull request #169: IGNITE-14745 Storage API for partitions.

Posted by GitBox <gi...@apache.org>.
ibessonov commented on a change in pull request #169:
URL: https://github.com/apache/ignite-3/pull/169#discussion_r656122062



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
##########
@@ -335,4 +339,46 @@ public static ClassLoader igniteClassLoader() {
 
         return cls;
     }
+
+    /**
+     * Deletes file or directory with all sub-directories and files.
+     *
+     * @param path File or directory to delete.
+     * @return {@code true} if and only if the file or directory is successfully deleted,
+     *      {@code false} otherwise
+     */
+    public static boolean delete(Path path) {
+        if (Files.isDirectory(path)) {
+            try {
+                try (DirectoryStream<Path> stream = Files.newDirectoryStream(path)) {
+                    for (Path innerPath : stream) {
+                        boolean res = delete(innerPath);
+
+                        if (!res)
+                            return false;
+                    }
+                }
+            } catch (IOException e) {
+                return false;
+            }
+        }
+
+        if (path.toFile().getName().endsWith("jar")) {
+            try {
+                // Why do we do this?

Review comment:
       I replaced it with your implementation




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] ibessonov commented on a change in pull request #169: IGNITE-14745 Storage API for partitions.

Posted by GitBox <gi...@apache.org>.
ibessonov commented on a change in pull request #169:
URL: https://github.com/apache/ignite-3/pull/169#discussion_r656290411



##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+/**
+ * Storage implementation based on a single RocksDB instance.
+ */
+public class RocksDbStorage implements Storage, AutoCloseable {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /** RocksDB comparator options. */
+    private final ComparatorOptions comparatorOptions;
+
+    /** RocksDB comparator. */
+    private final AbstractComparator comparator;
+
+    /** RockDB options. */
+    private final Options options;
+
+    /** RocksDb instance. */
+    private final RocksDB db;
+
+    /**
+     * @param dbPath Path to the folder to store data.
+     * @param comparator Keys comparator.
+     * @throws StorageException If failed to create RocksDB instance.
+     */
+    public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws StorageException {
+        try {
+            comparatorOptions = new ComparatorOptions();
+
+            this.comparator = new AbstractComparator(comparatorOptions) {
+                /** {@inheritDoc} */
+                @Override public String name() {
+                    return "comparator";
+                }
+
+                /** {@inheritDoc} */
+                @Override public int compare(ByteBuffer a, ByteBuffer b) {
+                    return comparator.compare(a, b);
+                }
+            };
+
+            options = new Options();
+
+            options.setCreateIfMissing(true);
+
+            options.setComparator(this.comparator);
+
+            this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString());
+        }
+        catch (RocksDBException e) {
+            close();
+
+            throw new StorageException("Failed to start storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public DataRow read(SearchRow key) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+
+            return new SimpleDataRow(keyBytes, db.get(keyBytes));
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to read data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(DataRow row) throws StorageException {
+        try {
+            db.put(row.keyBytes(), row.valueBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Filed to write data to the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void remove(SearchRow key) throws StorageException {
+        try {
+            db.delete(key.keyBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to remove data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+            byte[] existingDataBytes = db.get(keyBytes);
+
+            clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+
+            switch (clo.operationType()) {
+                case WRITE:
+                    db.put(keyBytes, clo.newRow().valueBytes());
+
+                    break;
+
+                case REMOVE:
+                    db.delete(keyBytes);
+
+                    break;
+
+                case NOOP:
+                    break;
+            }
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to access data in the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
+        return new ScanCursor(db.newIterator(), filter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        if (db != null)
+            db.close();

Review comment:
       I must agree even if they in fact never throw anything, code will be fixed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sashapolo commented on a change in pull request #169: IGNITE-14745 Storage API for partitions.

Posted by GitBox <gi...@apache.org>.
sashapolo commented on a change in pull request #169:
URL: https://github.com/apache/ignite-3/pull/169#discussion_r654352768



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
##########
@@ -335,4 +339,46 @@ public static ClassLoader igniteClassLoader() {
 
         return cls;
     }
+
+    /**
+     * Deletes file or directory with all sub-directories and files.
+     *
+     * @param path File or directory to delete.
+     * @return {@code true} if and only if the file or directory is successfully deleted,
+     *      {@code false} otherwise
+     */
+    public static boolean delete(Path path) {
+        if (Files.isDirectory(path)) {

Review comment:
       Can we use the following approach instead?
   ```
   try {
       Files.walkFileTree(path, new SimpleFileVisitor<>() {
           @Override
           public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
               Files.delete(dir);
               return FileVisitResult.CONTINUE;
           }
   
           @Override
           public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
               Files.delete(file);
               return FileVisitResult.CONTINUE;
           }
       });
   
       return true;
   }
   catch (IOException e) {
       return false;
   }
   ```

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
##########
@@ -335,4 +339,46 @@ public static ClassLoader igniteClassLoader() {
 
         return cls;
     }
+
+    /**
+     * Deletes file or directory with all sub-directories and files.
+     *
+     * @param path File or directory to delete.
+     * @return {@code true} if and only if the file or directory is successfully deleted,
+     *      {@code false} otherwise
+     */
+    public static boolean delete(Path path) {
+        if (Files.isDirectory(path)) {
+            try {
+                try (DirectoryStream<Path> stream = Files.newDirectoryStream(path)) {
+                    for (Path innerPath : stream) {
+                        boolean res = delete(innerPath);
+
+                        if (!res)
+                            return false;
+                    }
+                }
+            } catch (IOException e) {
+                return false;
+            }
+        }
+
+        if (path.toFile().getName().endsWith("jar")) {
+            try {
+                // Why do we do this?

Review comment:
       Indeed, why?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
##########
@@ -335,4 +339,46 @@ public static ClassLoader igniteClassLoader() {
 
         return cls;
     }
+
+    /**
+     * Deletes file or directory with all sub-directories and files.

Review comment:
       ```suggestion
        * Deletes a file or a directory with all sub-directories and files.
   ```

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleDataRow.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.apache.ignite.internal.storage.DataRow;
+
+/**
+ * Basic array-based implementation of the {@link DataRow}

Review comment:
       ```suggestion
    * Basic array-based implementation of the {@link DataRow}.
   ```

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.util.function.Predicate;
+import org.apache.ignite.internal.util.Cursor;
+
+/**
+ * Interface providing methods to read, remove and update keys in storage.
+ */
+public interface Storage {
+    /**
+     * Reads a DataRow for a given key.
+     *
+     * @param key Search row.
+     * @return Data row.
+     */
+    public DataRow read(SearchRow key) throws StorageException;
+
+    /**
+     * Writes DataRow to the storage.
+     *
+     * @param row Data row.

Review comment:
       I don't know if Ignite has particular rules, but Oracle Javadoc style declares that such descriptions should be written as `@param row data row` (regular letter, no period)

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.util.function.Predicate;
+import org.apache.ignite.internal.util.Cursor;
+
+/**
+ * Interface providing methods to read, remove and update keys in storage.
+ */
+public interface Storage {
+    /**
+     * Reads a DataRow for a given key.
+     *
+     * @param key Search row.
+     * @return Data row.
+     */
+    public DataRow read(SearchRow key) throws StorageException;
+
+    /**
+     * Writes DataRow to the storage.

Review comment:
       ```suggestion
        * Writes a DataRow to the storage.
   ```

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+/**
+ * Storage implementation based on a single RocksDB instance.
+ */
+public class RocksDbStorage implements Storage, AutoCloseable {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /** RocksDB comparator options. */
+    private final ComparatorOptions comparatorOptions;
+
+    /** RocksDB comparator. */
+    private final AbstractComparator comparator;
+
+    /** RockDB options. */
+    private final Options options;
+
+    /** RocksDb instance. */
+    private final RocksDB db;
+
+    /**
+     * @param dbPath Path to the folder to store data.
+     * @param comparator Keys comparator.
+     * @throws StorageException If failed to create RocksDB instance.
+     */
+    public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws StorageException {
+        try {
+            comparatorOptions = new ComparatorOptions();
+
+            this.comparator = new AbstractComparator(comparatorOptions) {
+                /** {@inheritDoc} */
+                @Override public String name() {
+                    return "comparator";
+                }
+
+                /** {@inheritDoc} */
+                @Override public int compare(ByteBuffer a, ByteBuffer b) {
+                    return comparator.compare(a, b);
+                }
+            };
+
+            options = new Options();
+
+            options.setCreateIfMissing(true);
+
+            options.setComparator(this.comparator);
+
+            this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString());
+        }
+        catch (RocksDBException e) {
+            close();
+
+            throw new StorageException("Failed to start storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public DataRow read(SearchRow key) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+
+            return new SimpleDataRow(keyBytes, db.get(keyBytes));
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to read data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(DataRow row) throws StorageException {
+        try {
+            db.put(row.keyBytes(), row.valueBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Filed to write data to the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void remove(SearchRow key) throws StorageException {
+        try {
+            db.delete(key.keyBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to remove data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+            byte[] existingDataBytes = db.get(keyBytes);
+
+            clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+
+            switch (clo.operationType()) {
+                case WRITE:
+                    db.put(keyBytes, clo.newRow().valueBytes());
+
+                    break;
+
+                case REMOVE:
+                    db.delete(keyBytes);
+
+                    break;
+
+                case NOOP:
+                    break;
+            }
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to access data in the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
+        return new ScanCursor(db.newIterator(), filter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        if (db != null)
+            db.close();
+
+        if (options != null)
+            options.close();
+
+        if (comparator != null)
+            comparator.close();
+
+        if (comparatorOptions != null)
+            comparatorOptions.close();
+    }
+
+    private static class ScanCursor implements Cursor<DataRow> {
+        private final RocksIterator iter;
+
+        private final Predicate<SearchRow> filter;
+
+        private ScanCursor(RocksIterator iter, Predicate<SearchRow> filter) {
+            this.iter = iter;
+            this.filter = filter;
+
+            iter.seekToFirst();
+
+            hasNext();
+        }
+
+        /** {@inheritDoc} */
+        @NotNull @Override public Iterator<DataRow> iterator() {
+            return this;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasNext() {
+            while (iter.isValid() && !filter.test(new SimpleDataRow(iter.key(), null)))

Review comment:
       you should also call the `status` method in case `isValid` returns `false`, see https://github.com/facebook/rocksdb/wiki/Iterator#error-handling

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/DataRow.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Interface that represents data row from the storage - a key-value pair. Can be used as a {@link SearchRow}.

Review comment:
       ```suggestion
    * Interface that represents a data row from the storage - a key-value pair. Can be used as a {@link SearchRow}.
   ```

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Interface to be used as a key representation to search data in storage.
+ */
+public interface SearchRow {
+    /**
+     * @return Hash of the key.
+     */
+    int hash();

Review comment:
       What is this method for?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
##########
@@ -335,4 +339,46 @@ public static ClassLoader igniteClassLoader() {
 
         return cls;
     }
+
+    /**
+     * Deletes file or directory with all sub-directories and files.
+     *
+     * @param path File or directory to delete.

Review comment:
       ```suggestion
        * @param path file or directory to delete
   ```

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+/**
+ * Exception thrown by storage.
+ */
+public class StorageException extends Exception {

Review comment:
       Why does it have to be a checked exception?

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/OperationType.java
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+/** Operation types for {@link InvokeClosure}. */
+public enum OperationType {
+    /** Noop, signifies read operation. */
+    NOOP,

Review comment:
       why not `READ`?

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.ByteArray;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Storage implementation based on {@link ConcurrentHashMap}.
+ */
+public class ConcurrentHashMapStorage implements Storage {
+    /** Storage content. */
+    private final ConcurrentMap<ByteArray, byte[]> map = new ConcurrentHashMap<>();
+
+    /** {@inheritDoc} */
+    @Override public DataRow read(SearchRow key) throws StorageException {
+        byte[] keyBytes = key.keyBytes();
+
+        byte[] valueBytes = map.get(new ByteArray(keyBytes));
+
+        return new SimpleDataRow(keyBytes, valueBytes);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(DataRow row) throws StorageException {
+        map.put(new ByteArray(row.keyBytes()), row.valueBytes());
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void remove(SearchRow key) throws StorageException {
+        map.remove(new ByteArray(key.keyBytes()));
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+        byte[] keyBytes = key.keyBytes();
+
+        ByteArray mapKey = new ByteArray(keyBytes);
+        byte[] existingDataBytes = map.get(mapKey);
+
+        clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+
+        switch (clo.operationType()) {
+            case WRITE:
+                map.put(mapKey, clo.newRow().valueBytes());
+
+                break;
+
+            case REMOVE:
+                map.remove(mapKey);
+
+                break;
+
+            case NOOP:
+                break;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
+        Iterator<Map.Entry<ByteArray, byte[]>> iter = map.entrySet().stream()

Review comment:
       Looks like this code can be simplified a little bit:
   ```
   Iterator<SimpleDataRow> iter = map.entrySet().stream()
       .map(e -> new SimpleDataRow(e.getKey().bytes(), e.getValue()))
       .filter(filter)
       .iterator();
   
   return new Cursor<>() {
       /** {@inheritDoc} */
       @Override public boolean hasNext() {
           return iter.hasNext();
       }
   
       /** {@inheritDoc} */
       @Override public DataRow next() {
           return iter.next();
       }
   
       /** {@inheritDoc} */
       @NotNull @Override public Iterator<DataRow> iterator() {
           return this;
       }
   
       /** {@inheritDoc} */
       @Override public void close() throws Exception {
           // No-op.
       }
   };
   ```
   
   What do you think?

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/InvokeClosure.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import org.jetbrains.annotations.Nullable;
+
+/** */
+public interface InvokeClosure {
+    /**
+     * @param row Old row or {@code null} if old row not found.

Review comment:
       ```suggestion
        * @param row old row or {@code null} if the old row has not been found.
   ```

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.ByteArray;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Storage implementation based on {@link ConcurrentHashMap}.
+ */
+public class ConcurrentHashMapStorage implements Storage {
+    /** Storage content. */
+    private final ConcurrentMap<ByteArray, byte[]> map = new ConcurrentHashMap<>();

Review comment:
       do we need a `ConcurrentMap` here? Looks like a single ReadWriteLock would be enough...

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/InvokeClosure.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import org.jetbrains.annotations.Nullable;
+
+/** */
+public interface InvokeClosure {
+    /**
+     * @param row Old row or {@code null} if old row not found.
+     */
+    void call(@Nullable DataRow row);
+
+    /**
+     * @return New row for {@link OperationType#WRITE} operation.

Review comment:
       ```suggestion
        * @return new row for the {@link OperationType#WRITE} operation
   ```

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleRemoveInvokeClosure.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.Nullable;
+
+public class SimpleRemoveInvokeClosure implements InvokeClosure {

Review comment:
       missing javadoc

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/InvokeClosure.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import org.jetbrains.annotations.Nullable;
+
+/** */
+public interface InvokeClosure {
+    /**
+     * @param row Old row or {@code null} if old row not found.
+     */
+    void call(@Nullable DataRow row);
+
+    /**
+     * @return New row for {@link OperationType#WRITE} operation.
+     */
+    DataRow newRow();
+
+    /**
+     * @return Operation type for this closure or {@code null} if it is unknown.
+     * After method {@link #call(DataRow)} has been called, operation type must
+     * be know and this method can not return {@code null}.
+     */
+    OperationType operationType();

Review comment:
       > or {@code null} if it is unknown
   
   This means that this method should be annotated with `@Nullable`

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleWriteInvokeClosure.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.Nullable;
+
+public class SimpleWriteInvokeClosure implements InvokeClosure {

Review comment:
       missing javadoc

##########
File path: modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorageTest.java
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import org.apache.ignite.internal.storage.AbstractStorageTest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+/**
+ * Storage test implementation for {@link ConcurrentHashMapStorage}.
+ */
+public class ConcurrentHashMapStorageTest extends AbstractStorageTest {
+    @BeforeEach
+    public void setUp() {
+        storage = new ConcurrentHashMapStorage();
+    }
+
+    @AfterEach
+    public void tearDown() {

Review comment:
       this method is redundant

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleDataRow.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.apache.ignite.internal.storage.DataRow;
+
+/**
+ * Basic array-based implementation of the {@link DataRow}
+ */
+public class SimpleDataRow implements DataRow {
+    /** Key array. */
+    private final byte[] key;
+
+    /** Value array. */
+    private final byte[] value;

Review comment:
       I think you should mark this field and all related stuff as `@Nullable`

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.ByteArray;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Storage implementation based on {@link ConcurrentHashMap}.
+ */
+public class ConcurrentHashMapStorage implements Storage {
+    /** Storage content. */
+    private final ConcurrentMap<ByteArray, byte[]> map = new ConcurrentHashMap<>();
+
+    /** {@inheritDoc} */
+    @Override public DataRow read(SearchRow key) throws StorageException {
+        byte[] keyBytes = key.keyBytes();
+
+        byte[] valueBytes = map.get(new ByteArray(keyBytes));
+
+        return new SimpleDataRow(keyBytes, valueBytes);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(DataRow row) throws StorageException {
+        map.put(new ByteArray(row.keyBytes()), row.valueBytes());
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void remove(SearchRow key) throws StorageException {
+        map.remove(new ByteArray(key.keyBytes()));
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+        byte[] keyBytes = key.keyBytes();
+
+        ByteArray mapKey = new ByteArray(keyBytes);
+        byte[] existingDataBytes = map.get(mapKey);
+
+        clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+
+        switch (clo.operationType()) {
+            case WRITE:
+                map.put(mapKey, clo.newRow().valueBytes());
+
+                break;
+
+            case REMOVE:
+                map.remove(mapKey);
+
+                break;
+
+            case NOOP:
+                break;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
+        Iterator<Map.Entry<ByteArray, byte[]>> iter = map.entrySet().stream()
+            .filter(entry -> filter.test(new SimpleDataRow(entry.getKey().bytes(), null)))
+            .iterator();
+
+        return new Cursor<DataRow>() {

Review comment:
       ```suggestion
           return new Cursor<>() {
   ```

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+/**
+ * Storage implementation based on a single RocksDB instance.
+ */
+public class RocksDbStorage implements Storage, AutoCloseable {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /** RocksDB comparator options. */
+    private final ComparatorOptions comparatorOptions;
+
+    /** RocksDB comparator. */
+    private final AbstractComparator comparator;
+
+    /** RockDB options. */
+    private final Options options;
+
+    /** RocksDb instance. */
+    private final RocksDB db;
+
+    /**
+     * @param dbPath Path to the folder to store data.
+     * @param comparator Keys comparator.
+     * @throws StorageException If failed to create RocksDB instance.
+     */
+    public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws StorageException {
+        try {
+            comparatorOptions = new ComparatorOptions();
+
+            this.comparator = new AbstractComparator(comparatorOptions) {
+                /** {@inheritDoc} */
+                @Override public String name() {
+                    return "comparator";
+                }
+
+                /** {@inheritDoc} */
+                @Override public int compare(ByteBuffer a, ByteBuffer b) {
+                    return comparator.compare(a, b);
+                }
+            };
+
+            options = new Options();
+
+            options.setCreateIfMissing(true);
+
+            options.setComparator(this.comparator);
+
+            this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString());
+        }
+        catch (RocksDBException e) {
+            close();
+
+            throw new StorageException("Failed to start storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public DataRow read(SearchRow key) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+
+            return new SimpleDataRow(keyBytes, db.get(keyBytes));
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to read data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(DataRow row) throws StorageException {
+        try {
+            db.put(row.keyBytes(), row.valueBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Filed to write data to the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void remove(SearchRow key) throws StorageException {
+        try {
+            db.delete(key.keyBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to remove data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+            byte[] existingDataBytes = db.get(keyBytes);
+
+            clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+
+            switch (clo.operationType()) {
+                case WRITE:
+                    db.put(keyBytes, clo.newRow().valueBytes());
+
+                    break;
+
+                case REMOVE:
+                    db.delete(keyBytes);
+
+                    break;
+
+                case NOOP:
+                    break;
+            }
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to access data in the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
+        return new ScanCursor(db.newIterator(), filter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        if (db != null)
+            db.close();

Review comment:
       Unfortunately this is not a bulletproof way of closing resources, since any of the `close` methods can thrown an exception

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.util.function.Predicate;
+import org.apache.ignite.internal.util.Cursor;
+
+/**
+ * Interface providing methods to read, remove and update keys in storage.
+ */
+public interface Storage {
+    /**
+     * Reads a DataRow for a given key.
+     *
+     * @param key Search row.
+     * @return Data row.
+     */
+    public DataRow read(SearchRow key) throws StorageException;
+
+    /**
+     * Writes DataRow to the storage.
+     *
+     * @param row Data row.
+     * @throws StorageException If failed to read data or storage is already stopped.
+     */
+    public void write(DataRow row) throws StorageException;
+
+    /**
+     * Removes DataRow associated with a given Key.

Review comment:
       ```suggestion
        * Removes a DataRow associated with a given Key.
   ```

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+/**
+ * Storage implementation based on a single RocksDB instance.
+ */
+public class RocksDbStorage implements Storage, AutoCloseable {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /** RocksDB comparator options. */
+    private final ComparatorOptions comparatorOptions;
+
+    /** RocksDB comparator. */
+    private final AbstractComparator comparator;
+
+    /** RockDB options. */
+    private final Options options;
+
+    /** RocksDb instance. */
+    private final RocksDB db;
+
+    /**
+     * @param dbPath Path to the folder to store data.
+     * @param comparator Keys comparator.
+     * @throws StorageException If failed to create RocksDB instance.
+     */
+    public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws StorageException {
+        try {
+            comparatorOptions = new ComparatorOptions();
+
+            this.comparator = new AbstractComparator(comparatorOptions) {
+                /** {@inheritDoc} */
+                @Override public String name() {
+                    return "comparator";
+                }
+
+                /** {@inheritDoc} */
+                @Override public int compare(ByteBuffer a, ByteBuffer b) {
+                    return comparator.compare(a, b);
+                }
+            };
+
+            options = new Options();
+
+            options.setCreateIfMissing(true);
+
+            options.setComparator(this.comparator);
+
+            this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString());
+        }
+        catch (RocksDBException e) {
+            close();
+
+            throw new StorageException("Failed to start storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public DataRow read(SearchRow key) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+
+            return new SimpleDataRow(keyBytes, db.get(keyBytes));
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to read data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(DataRow row) throws StorageException {
+        try {
+            db.put(row.keyBytes(), row.valueBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Filed to write data to the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void remove(SearchRow key) throws StorageException {
+        try {
+            db.delete(key.keyBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to remove data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+            byte[] existingDataBytes = db.get(keyBytes);
+
+            clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+
+            switch (clo.operationType()) {
+                case WRITE:
+                    db.put(keyBytes, clo.newRow().valueBytes());
+
+                    break;
+
+                case REMOVE:
+                    db.delete(keyBytes);
+
+                    break;
+
+                case NOOP:
+                    break;
+            }
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to access data in the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
+        return new ScanCursor(db.newIterator(), filter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        if (db != null)
+            db.close();
+
+        if (options != null)
+            options.close();
+
+        if (comparator != null)
+            comparator.close();
+
+        if (comparatorOptions != null)
+            comparatorOptions.close();
+    }
+
+    private static class ScanCursor implements Cursor<DataRow> {
+        private final RocksIterator iter;
+
+        private final Predicate<SearchRow> filter;
+
+        private ScanCursor(RocksIterator iter, Predicate<SearchRow> filter) {
+            this.iter = iter;
+            this.filter = filter;
+
+            iter.seekToFirst();
+
+            hasNext();

Review comment:
       why do you need to call this method here?

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.util.function.Predicate;
+import org.apache.ignite.internal.util.Cursor;
+
+/**
+ * Interface providing methods to read, remove and update keys in storage.
+ */
+public interface Storage {
+    /**
+     * Reads a DataRow for a given key.
+     *
+     * @param key Search row.
+     * @return Data row.
+     */
+    public DataRow read(SearchRow key) throws StorageException;
+
+    /**
+     * Writes DataRow to the storage.
+     *
+     * @param row Data row.
+     * @throws StorageException If failed to read data or storage is already stopped.
+     */
+    public void write(DataRow row) throws StorageException;
+
+    /**
+     * Removes DataRow associated with a given Key.
+     *
+     * @throws StorageException If failed to read data or storage is already stopped.
+     */
+    public void remove(SearchRow key) throws StorageException;
+
+    /**
+     * Executes an update with custom logic implemented by storage.UpdateClosure interface.
+     *
+     * @param key Search key.
+     * @param clo Invoke closure.
+     * @throws StorageException If failed to read data or storage is already stopped.
+     */
+    public void invoke(SearchRow key, InvokeClosure clo) throws StorageException;
+
+    /**
+     * Creates cursor over storage data.

Review comment:
       ```suggestion
        * Creates cursor over the storage data.
   ```

##########
File path: modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractStorageTest.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.storage.basic.SimpleReadInvokeClosure;
+import org.apache.ignite.internal.storage.basic.SimpleRemoveInvokeClosure;
+import org.apache.ignite.internal.storage.basic.SimpleWriteInvokeClosure;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static java.util.Collections.emptyList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Abstract test that covers basic scenarios of the storage API.
+ */
+public abstract class AbstractStorageTest {
+    /** Storage instance. */
+    protected Storage storage;
+
+    /**
+     * Wraps string key into a search row.
+     *
+     * @param key String key.
+     * @return Search row.
+     */
+    private SearchRow searchRow(String key) {
+        return new SimpleDataRow(
+            key.getBytes(StandardCharsets.UTF_8),
+            null
+        );
+    }
+
+    /**
+     * Wraps string key/value pair into a data row.
+     *
+     * @param key String key.
+     * @param value String value.
+     * @return Data row.
+     */
+    private DataRow dataRow(String key, String value) {
+        return new SimpleDataRow(
+            key.getBytes(StandardCharsets.UTF_8),
+            value.getBytes(StandardCharsets.UTF_8)
+        );
+    }
+
+    @Test
+    public void readWriteRemove() throws Exception {
+        SearchRow searchRow = searchRow("key");
+
+        assertNull(storage.read(searchRow).value());
+
+        DataRow dataRow = dataRow("key", "value");
+
+        storage.write(dataRow);
+
+        assertArrayEquals(dataRow.value().array(), storage.read(searchRow).value().array());
+
+        storage.remove(searchRow);
+
+        assertNull(storage.read(searchRow).value());
+    }
+
+    @Test
+    public void invoke() throws Exception {
+        SearchRow searchRow = searchRow("key");
+
+        SimpleReadInvokeClosure readClosure = new SimpleReadInvokeClosure();
+
+        storage.invoke(searchRow, readClosure);
+
+        assertNull(readClosure.row().value());
+
+        DataRow dataRow = dataRow("key", "value");
+
+        storage.invoke(searchRow, new SimpleWriteInvokeClosure(dataRow));
+
+        storage.invoke(searchRow, readClosure);
+
+        Assertions.assertArrayEquals(dataRow.value().array(), readClosure.row().value().array());

Review comment:
       ```suggestion
           assertArrayEquals(dataRow.value().array(), readClosure.row().value().array());
   ```

##########
File path: modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractStorageTest.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.storage.basic.SimpleReadInvokeClosure;
+import org.apache.ignite.internal.storage.basic.SimpleRemoveInvokeClosure;
+import org.apache.ignite.internal.storage.basic.SimpleWriteInvokeClosure;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static java.util.Collections.emptyList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Abstract test that covers basic scenarios of the storage API.
+ */
+public abstract class AbstractStorageTest {
+    /** Storage instance. */
+    protected Storage storage;
+
+    /**
+     * Wraps string key into a search row.
+     *
+     * @param key String key.
+     * @return Search row.
+     */
+    private SearchRow searchRow(String key) {
+        return new SimpleDataRow(
+            key.getBytes(StandardCharsets.UTF_8),
+            null
+        );
+    }
+
+    /**
+     * Wraps string key/value pair into a data row.
+     *
+     * @param key String key.
+     * @param value String value.
+     * @return Data row.
+     */
+    private DataRow dataRow(String key, String value) {
+        return new SimpleDataRow(
+            key.getBytes(StandardCharsets.UTF_8),
+            value.getBytes(StandardCharsets.UTF_8)
+        );
+    }
+
+    @Test
+    public void readWriteRemove() throws Exception {

Review comment:
       please add javadocs for all tests

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+/**
+ * Storage implementation based on a single RocksDB instance.
+ */
+public class RocksDbStorage implements Storage, AutoCloseable {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /** RocksDB comparator options. */
+    private final ComparatorOptions comparatorOptions;
+
+    /** RocksDB comparator. */
+    private final AbstractComparator comparator;
+
+    /** RockDB options. */
+    private final Options options;
+
+    /** RocksDb instance. */
+    private final RocksDB db;
+
+    /**
+     * @param dbPath Path to the folder to store data.
+     * @param comparator Keys comparator.
+     * @throws StorageException If failed to create RocksDB instance.
+     */
+    public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws StorageException {
+        try {
+            comparatorOptions = new ComparatorOptions();
+
+            this.comparator = new AbstractComparator(comparatorOptions) {
+                /** {@inheritDoc} */
+                @Override public String name() {
+                    return "comparator";
+                }
+
+                /** {@inheritDoc} */
+                @Override public int compare(ByteBuffer a, ByteBuffer b) {
+                    return comparator.compare(a, b);
+                }
+            };
+
+            options = new Options();
+
+            options.setCreateIfMissing(true);
+
+            options.setComparator(this.comparator);
+
+            this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString());
+        }
+        catch (RocksDBException e) {
+            close();
+
+            throw new StorageException("Failed to start storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public DataRow read(SearchRow key) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+
+            return new SimpleDataRow(keyBytes, db.get(keyBytes));
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to read data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(DataRow row) throws StorageException {
+        try {
+            db.put(row.keyBytes(), row.valueBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Filed to write data to the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void remove(SearchRow key) throws StorageException {
+        try {
+            db.delete(key.keyBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to remove data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+            byte[] existingDataBytes = db.get(keyBytes);
+
+            clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+
+            switch (clo.operationType()) {
+                case WRITE:
+                    db.put(keyBytes, clo.newRow().valueBytes());
+
+                    break;
+
+                case REMOVE:
+                    db.delete(keyBytes);
+
+                    break;
+
+                case NOOP:
+                    break;
+            }
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to access data in the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
+        return new ScanCursor(db.newIterator(), filter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        if (db != null)
+            db.close();
+
+        if (options != null)
+            options.close();
+
+        if (comparator != null)
+            comparator.close();
+
+        if (comparatorOptions != null)
+            comparatorOptions.close();
+    }
+
+    private static class ScanCursor implements Cursor<DataRow> {

Review comment:
       missing javadoc




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] ibessonov commented on a change in pull request #169: IGNITE-14745 Storage API for partitions.

Posted by GitBox <gi...@apache.org>.
ibessonov commented on a change in pull request #169:
URL: https://github.com/apache/ignite-3/pull/169#discussion_r655946098



##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Interface to be used as a key representation to search data in storage.
+ */
+public interface SearchRow {
+    /**
+     * @return Hash of the key.
+     */
+    int hash();

Review comment:
       For the future, we don't need it now. I think I'll remove it




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] ibessonov commented on a change in pull request #169: IGNITE-14745 Storage API for partitions.

Posted by GitBox <gi...@apache.org>.
ibessonov commented on a change in pull request #169:
URL: https://github.com/apache/ignite-3/pull/169#discussion_r655333991



##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.ByteArray;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Storage implementation based on {@link ConcurrentHashMap}.
+ */
+public class ConcurrentHashMapStorage implements Storage {
+    /** Storage content. */
+    private final ConcurrentMap<ByteArray, byte[]> map = new ConcurrentHashMap<>();
+
+    /** {@inheritDoc} */
+    @Override public DataRow read(SearchRow key) throws StorageException {
+        byte[] keyBytes = key.keyBytes();
+
+        byte[] valueBytes = map.get(new ByteArray(keyBytes));
+
+        return new SimpleDataRow(keyBytes, valueBytes);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(DataRow row) throws StorageException {
+        map.put(new ByteArray(row.keyBytes()), row.valueBytes());
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void remove(SearchRow key) throws StorageException {
+        map.remove(new ByteArray(key.keyBytes()));
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+        byte[] keyBytes = key.keyBytes();
+
+        ByteArray mapKey = new ByteArray(keyBytes);
+        byte[] existingDataBytes = map.get(mapKey);
+
+        clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+
+        switch (clo.operationType()) {
+            case WRITE:
+                map.put(mapKey, clo.newRow().valueBytes());
+
+                break;
+
+            case REMOVE:
+                map.remove(mapKey);
+
+                break;
+
+            case NOOP:
+                break;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
+        Iterator<Map.Entry<ByteArray, byte[]>> iter = map.entrySet().stream()

Review comment:
       Now I wonder why I didn't write it like this in the first place.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
##########
@@ -335,4 +339,46 @@ public static ClassLoader igniteClassLoader() {
 
         return cls;
     }
+
+    /**
+     * Deletes file or directory with all sub-directories and files.
+     *
+     * @param path File or directory to delete.
+     * @return {@code true} if and only if the file or directory is successfully deleted,
+     *      {@code false} otherwise
+     */
+    public static boolean delete(Path path) {
+        if (Files.isDirectory(path)) {
+            try {
+                try (DirectoryStream<Path> stream = Files.newDirectoryStream(path)) {
+                    for (Path innerPath : stream) {
+                        boolean res = delete(innerPath);
+
+                        if (!res)
+                            return false;
+                    }
+                }
+            } catch (IOException e) {
+                return false;
+            }
+        }
+
+        if (path.toFile().getName().endsWith("jar")) {
+            try {
+                // Why do we do this?

Review comment:
       Have no clue

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
##########
@@ -335,4 +339,46 @@ public static ClassLoader igniteClassLoader() {
 
         return cls;
     }
+
+    /**
+     * Deletes file or directory with all sub-directories and files.
+     *
+     * @param path File or directory to delete.

Review comment:
       I don't get it, in every other place we have this exact notation

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/OperationType.java
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+/** Operation types for {@link InvokeClosure}. */
+public enum OperationType {
+    /** Noop, signifies read operation. */
+    NOOP,

Review comment:
       It's more general. We're not sure that implementations will actually want to "read" values, they can just validate it and decide to do nothing.

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+/**
+ * Exception thrown by storage.
+ */
+public class StorageException extends Exception {

Review comment:
       Why not?

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Interface to be used as a key representation to search data in storage.
+ */
+public interface SearchRow {
+    /**
+     * @return Hash of the key.
+     */
+    int hash();

Review comment:
       For the future, we don't need it now. I think I'll remove it

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.ByteArray;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Storage implementation based on {@link ConcurrentHashMap}.
+ */
+public class ConcurrentHashMapStorage implements Storage {
+    /** Storage content. */
+    private final ConcurrentMap<ByteArray, byte[]> map = new ConcurrentHashMap<>();

Review comment:
       I have even more "extreme" idea - use write lock in invoke, read lock in "write"/"remove" and no lock in "read", still having concurrent map. It this fine? "synchronized" methods are not great even as a temporary solution




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] ibessonov commented on a change in pull request #169: IGNITE-14745 Storage API for partitions.

Posted by GitBox <gi...@apache.org>.
ibessonov commented on a change in pull request #169:
URL: https://github.com/apache/ignite-3/pull/169#discussion_r655943532



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
##########
@@ -335,4 +339,46 @@ public static ClassLoader igniteClassLoader() {
 
         return cls;
     }
+
+    /**
+     * Deletes file or directory with all sub-directories and files.
+     *
+     * @param path File or directory to delete.

Review comment:
       I don't get it, in every other place we have this exact notation




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sashapolo commented on a change in pull request #169: IGNITE-14745 Storage API for partitions.

Posted by GitBox <gi...@apache.org>.
sashapolo commented on a change in pull request #169:
URL: https://github.com/apache/ignite-3/pull/169#discussion_r656025409



##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.ByteArray;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Storage implementation based on {@link ConcurrentHashMap}.
+ */
+public class ConcurrentHashMapStorage implements Storage {
+    /** Storage content. */
+    private final ConcurrentMap<ByteArray, byte[]> map = new ConcurrentHashMap<>();

Review comment:
       as you wish, though I think it's a bit over-complicated for a test-only implementation




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sashapolo commented on a change in pull request #169: IGNITE-14745 Storage API for partitions.

Posted by GitBox <gi...@apache.org>.
sashapolo commented on a change in pull request #169:
URL: https://github.com/apache/ignite-3/pull/169#discussion_r655546583



##########
File path: modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractStorageTest.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.storage.basic.SimpleReadInvokeClosure;
+import org.apache.ignite.internal.storage.basic.SimpleRemoveInvokeClosure;
+import org.apache.ignite.internal.storage.basic.SimpleWriteInvokeClosure;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static java.util.Collections.emptyList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Abstract test that covers basic scenarios of the storage API.
+ */
+public abstract class AbstractStorageTest {
+    /** Storage instance. */
+    protected Storage storage;
+
+    /**
+     * Wraps string key into a search row.
+     *
+     * @param key String key.
+     * @return Search row.
+     */
+    private SearchRow searchRow(String key) {
+        return new SimpleDataRow(
+            key.getBytes(StandardCharsets.UTF_8),
+            null
+        );
+    }
+
+    /**
+     * Wraps string key/value pair into a data row.
+     *
+     * @param key String key.
+     * @param value String value.
+     * @return Data row.
+     */
+    private DataRow dataRow(String key, String value) {
+        return new SimpleDataRow(
+            key.getBytes(StandardCharsets.UTF_8),
+            value.getBytes(StandardCharsets.UTF_8)
+        );
+    }
+
+    @Test
+    public void readWriteRemove() throws Exception {
+        SearchRow searchRow = searchRow("key");
+
+        assertNull(storage.read(searchRow).value());
+
+        DataRow dataRow = dataRow("key", "value");
+
+        storage.write(dataRow);
+
+        assertArrayEquals(dataRow.value().array(), storage.read(searchRow).value().array());
+
+        storage.remove(searchRow);
+
+        assertNull(storage.read(searchRow).value());
+    }
+
+    @Test
+    public void invoke() throws Exception {
+        SearchRow searchRow = searchRow("key");
+
+        SimpleReadInvokeClosure readClosure = new SimpleReadInvokeClosure();
+
+        storage.invoke(searchRow, readClosure);
+
+        assertNull(readClosure.row().value());
+
+        DataRow dataRow = dataRow("key", "value");
+
+        storage.invoke(searchRow, new SimpleWriteInvokeClosure(dataRow));
+
+        storage.invoke(searchRow, readClosure);
+
+        Assertions.assertArrayEquals(dataRow.value().array(), readClosure.row().value().array());
+
+        storage.invoke(searchRow, new SimpleRemoveInvokeClosure());
+
+        storage.invoke(searchRow, readClosure);
+
+        assertNull(readClosure.row().value());
+    }
+
+    @Test
+    public void scanSimple() throws Exception {
+        List<DataRow> list = toList(storage.scan(key -> true));
+
+        assertEquals(emptyList(), list);
+
+        DataRow dataRow1 = dataRow("key1", "value1");
+
+        storage.write(dataRow1);
+
+        list = toList(storage.scan(key -> true));
+
+        assertThat(list, hasSize(1));
+
+        assertArrayEquals(dataRow1.value().array(), list.get(0).value().array());
+
+        DataRow dataRow2 = dataRow("key2", "value2");
+
+        storage.write(dataRow2);
+
+        list = toList(storage.scan(key -> true));
+
+        assertThat(list, hasSize(2));
+
+        // "key1" and "key2" have the same order both by hash and lexicographically.
+        assertArrayEquals(dataRow1.value().array(), list.get(0).value().array());
+        assertArrayEquals(dataRow2.value().array(), list.get(1).value().array());
+    }
+
+    @Test
+    public void scanFiltered() throws Exception {
+        DataRow dataRow1 = dataRow("key1", "value1");
+        DataRow dataRow2 = dataRow("key2", "value2");
+
+        storage.write(dataRow1);
+        storage.write(dataRow2);
+
+        List<DataRow> list = toList(storage.scan(key -> key.keyBytes()[3] == '1'));
+
+        assertThat(list, hasSize(1));
+
+        assertArrayEquals(dataRow1.value().array(), list.get(0).value().array());
+
+        list = toList(storage.scan(key -> key.keyBytes()[3] == '2'));
+
+        assertThat(list, hasSize(1));
+
+        assertArrayEquals(dataRow2.value().array(), list.get(0).value().array());
+
+        list = toList(storage.scan(key -> false));
+
+        assertTrue(list.isEmpty());
+    }
+
+    /**
+     * Converts cursor to list.
+     *
+     * @param cursor Cursor.
+     * @param <T> Type of cursor content.
+     * @return List.
+     */
+    @NotNull private <T> List<T> toList(Cursor<T> cursor) {
+        return StreamSupport.stream(cursor.spliterator(), false).collect(Collectors.toList());

Review comment:
       you actually never close the passed in cursor, so this is a resource leak




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] ibessonov commented on a change in pull request #169: IGNITE-14745 Storage API for partitions.

Posted by GitBox <gi...@apache.org>.
ibessonov commented on a change in pull request #169:
URL: https://github.com/apache/ignite-3/pull/169#discussion_r659659375



##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.nio.ByteBuffer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Interface to be used as a key representation to search data in storage.
+ */
+public interface SearchRow {
+    /**
+     * @return Key bytes.
+     */
+    @Nullable byte[] keyBytes();

Review comment:
       I hope you're kidding me...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sashapolo commented on a change in pull request #169: IGNITE-14745 Storage API for partitions.

Posted by GitBox <gi...@apache.org>.
sashapolo commented on a change in pull request #169:
URL: https://github.com/apache/ignite-3/pull/169#discussion_r656023339



##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+/**
+ * Exception thrown by storage.
+ */
+public class StorageException extends Exception {

Review comment:
       because checked exceptions are very inconvenient and usually end up wrapped into unchecked exceptions




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] ibessonov commented on a change in pull request #169: IGNITE-14745 Storage API for partitions.

Posted by GitBox <gi...@apache.org>.
ibessonov commented on a change in pull request #169:
URL: https://github.com/apache/ignite-3/pull/169#discussion_r655942149



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
##########
@@ -335,4 +339,46 @@ public static ClassLoader igniteClassLoader() {
 
         return cls;
     }
+
+    /**
+     * Deletes file or directory with all sub-directories and files.
+     *
+     * @param path File or directory to delete.
+     * @return {@code true} if and only if the file or directory is successfully deleted,
+     *      {@code false} otherwise
+     */
+    public static boolean delete(Path path) {
+        if (Files.isDirectory(path)) {
+            try {
+                try (DirectoryStream<Path> stream = Files.newDirectoryStream(path)) {
+                    for (Path innerPath : stream) {
+                        boolean res = delete(innerPath);
+
+                        if (!res)
+                            return false;
+                    }
+                }
+            } catch (IOException e) {
+                return false;
+            }
+        }
+
+        if (path.toFile().getName().endsWith("jar")) {
+            try {
+                // Why do we do this?

Review comment:
       Have no clue




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] ibessonov commented on a change in pull request #169: IGNITE-14745 Storage API for partitions.

Posted by GitBox <gi...@apache.org>.
ibessonov commented on a change in pull request #169:
URL: https://github.com/apache/ignite-3/pull/169#discussion_r658785372



##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+/**
+ * Exception thrown by storage.
+ */
+public class StorageException extends Exception {

Review comment:
       Ok




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sashapolo commented on a change in pull request #169: IGNITE-14745 Storage API for partitions.

Posted by GitBox <gi...@apache.org>.
sashapolo commented on a change in pull request #169:
URL: https://github.com/apache/ignite-3/pull/169#discussion_r656026514



##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+/**
+ * Storage implementation based on a single RocksDB instance.
+ */
+public class RocksDbStorage implements Storage, AutoCloseable {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /** RocksDB comparator options. */
+    private final ComparatorOptions comparatorOptions;
+
+    /** RocksDB comparator. */
+    private final AbstractComparator comparator;
+
+    /** RockDB options. */
+    private final Options options;
+
+    /** RocksDb instance. */
+    private final RocksDB db;
+
+    /**
+     * @param dbPath Path to the folder to store data.
+     * @param comparator Keys comparator.
+     * @throws StorageException If failed to create RocksDB instance.
+     */
+    public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws StorageException {
+        try {
+            comparatorOptions = new ComparatorOptions();
+
+            this.comparator = new AbstractComparator(comparatorOptions) {
+                /** {@inheritDoc} */
+                @Override public String name() {
+                    return "comparator";
+                }
+
+                /** {@inheritDoc} */
+                @Override public int compare(ByteBuffer a, ByteBuffer b) {
+                    return comparator.compare(a, b);
+                }
+            };
+
+            options = new Options();
+
+            options.setCreateIfMissing(true);
+
+            options.setComparator(this.comparator);
+
+            this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString());
+        }
+        catch (RocksDBException e) {
+            close();
+
+            throw new StorageException("Failed to start storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public DataRow read(SearchRow key) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+
+            return new SimpleDataRow(keyBytes, db.get(keyBytes));
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to read data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(DataRow row) throws StorageException {
+        try {
+            db.put(row.keyBytes(), row.valueBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Filed to write data to the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void remove(SearchRow key) throws StorageException {
+        try {
+            db.delete(key.keyBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to remove data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+            byte[] existingDataBytes = db.get(keyBytes);
+
+            clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+
+            switch (clo.operationType()) {
+                case WRITE:
+                    db.put(keyBytes, clo.newRow().valueBytes());
+
+                    break;
+
+                case REMOVE:
+                    db.delete(keyBytes);
+
+                    break;
+
+                case NOOP:
+                    break;
+            }
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to access data in the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
+        return new ScanCursor(db.newIterator(), filter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        if (db != null)
+            db.close();

Review comment:
       AFAIK, it is not documented anywhere that they *never* throw exceptions, therefore you should always assume that they do




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sashapolo commented on a change in pull request #169: IGNITE-14745 Storage API for partitions.

Posted by GitBox <gi...@apache.org>.
sashapolo commented on a change in pull request #169:
URL: https://github.com/apache/ignite-3/pull/169#discussion_r655305443



##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+/**
+ * Storage implementation based on a single RocksDB instance.
+ */
+public class RocksDbStorage implements Storage, AutoCloseable {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /** RocksDB comparator options. */
+    private final ComparatorOptions comparatorOptions;
+
+    /** RocksDB comparator. */
+    private final AbstractComparator comparator;
+
+    /** RockDB options. */
+    private final Options options;
+
+    /** RocksDb instance. */
+    private final RocksDB db;
+
+    /**
+     * @param dbPath Path to the folder to store data.
+     * @param comparator Keys comparator.
+     * @throws StorageException If failed to create RocksDB instance.
+     */
+    public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws StorageException {
+        try {
+            comparatorOptions = new ComparatorOptions();
+
+            this.comparator = new AbstractComparator(comparatorOptions) {
+                /** {@inheritDoc} */
+                @Override public String name() {
+                    return "comparator";
+                }
+
+                /** {@inheritDoc} */
+                @Override public int compare(ByteBuffer a, ByteBuffer b) {
+                    return comparator.compare(a, b);
+                }
+            };
+
+            options = new Options();
+
+            options.setCreateIfMissing(true);
+
+            options.setComparator(this.comparator);
+
+            this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString());
+        }
+        catch (RocksDBException e) {
+            close();
+
+            throw new StorageException("Failed to start storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public DataRow read(SearchRow key) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+
+            return new SimpleDataRow(keyBytes, db.get(keyBytes));
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to read data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(DataRow row) throws StorageException {
+        try {
+            db.put(row.keyBytes(), row.valueBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Filed to write data to the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void remove(SearchRow key) throws StorageException {
+        try {
+            db.delete(key.keyBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to remove data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+            byte[] existingDataBytes = db.get(keyBytes);
+
+            clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+
+            switch (clo.operationType()) {
+                case WRITE:
+                    db.put(keyBytes, clo.newRow().valueBytes());
+
+                    break;
+
+                case REMOVE:
+                    db.delete(keyBytes);
+
+                    break;
+
+                case NOOP:
+                    break;
+            }
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to access data in the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
+        return new ScanCursor(db.newIterator(), filter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        if (db != null)
+            db.close();

Review comment:
       Unfortunately this is not a bulletproof way of closing resources, since any of the `close` methods can throw an exception




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] ibessonov commented on a change in pull request #169: IGNITE-14745 Storage API for partitions.

Posted by GitBox <gi...@apache.org>.
ibessonov commented on a change in pull request #169:
URL: https://github.com/apache/ignite-3/pull/169#discussion_r655962656



##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.util.function.Predicate;
+import org.apache.ignite.internal.util.Cursor;
+
+/**
+ * Interface providing methods to read, remove and update keys in storage.
+ */
+public interface Storage {
+    /**
+     * Reads a DataRow for a given key.
+     *
+     * @param key Search row.
+     * @return Data row.
+     */
+    public DataRow read(SearchRow key) throws StorageException;
+
+    /**
+     * Writes DataRow to the storage.
+     *
+     * @param row Data row.

Review comment:
       That's right, but I vividly remember that Ignite uses custom notation with capital letter and a period

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+/**
+ * Storage implementation based on a single RocksDB instance.
+ */
+public class RocksDbStorage implements Storage, AutoCloseable {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /** RocksDB comparator options. */
+    private final ComparatorOptions comparatorOptions;
+
+    /** RocksDB comparator. */
+    private final AbstractComparator comparator;
+
+    /** RockDB options. */
+    private final Options options;
+
+    /** RocksDb instance. */
+    private final RocksDB db;
+
+    /**
+     * @param dbPath Path to the folder to store data.
+     * @param comparator Keys comparator.
+     * @throws StorageException If failed to create RocksDB instance.
+     */
+    public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws StorageException {
+        try {
+            comparatorOptions = new ComparatorOptions();
+
+            this.comparator = new AbstractComparator(comparatorOptions) {
+                /** {@inheritDoc} */
+                @Override public String name() {
+                    return "comparator";
+                }
+
+                /** {@inheritDoc} */
+                @Override public int compare(ByteBuffer a, ByteBuffer b) {
+                    return comparator.compare(a, b);
+                }
+            };
+
+            options = new Options();
+
+            options.setCreateIfMissing(true);
+
+            options.setComparator(this.comparator);
+
+            this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString());
+        }
+        catch (RocksDBException e) {
+            close();
+
+            throw new StorageException("Failed to start storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public DataRow read(SearchRow key) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+
+            return new SimpleDataRow(keyBytes, db.get(keyBytes));
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to read data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(DataRow row) throws StorageException {
+        try {
+            db.put(row.keyBytes(), row.valueBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Filed to write data to the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void remove(SearchRow key) throws StorageException {
+        try {
+            db.delete(key.keyBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to remove data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+            byte[] existingDataBytes = db.get(keyBytes);
+
+            clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+
+            switch (clo.operationType()) {
+                case WRITE:
+                    db.put(keyBytes, clo.newRow().valueBytes());
+
+                    break;
+
+                case REMOVE:
+                    db.delete(keyBytes);
+
+                    break;
+
+                case NOOP:
+                    break;
+            }
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to access data in the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
+        return new ScanCursor(db.newIterator(), filter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        if (db != null)
+            db.close();

Review comment:
       Can they? Is this documented somewhere?

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+/**
+ * Storage implementation based on a single RocksDB instance.
+ */
+public class RocksDbStorage implements Storage, AutoCloseable {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /** RocksDB comparator options. */
+    private final ComparatorOptions comparatorOptions;
+
+    /** RocksDB comparator. */
+    private final AbstractComparator comparator;
+
+    /** RockDB options. */
+    private final Options options;
+
+    /** RocksDb instance. */
+    private final RocksDB db;
+
+    /**
+     * @param dbPath Path to the folder to store data.
+     * @param comparator Keys comparator.
+     * @throws StorageException If failed to create RocksDB instance.
+     */
+    public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws StorageException {
+        try {
+            comparatorOptions = new ComparatorOptions();
+
+            this.comparator = new AbstractComparator(comparatorOptions) {
+                /** {@inheritDoc} */
+                @Override public String name() {
+                    return "comparator";
+                }
+
+                /** {@inheritDoc} */
+                @Override public int compare(ByteBuffer a, ByteBuffer b) {
+                    return comparator.compare(a, b);
+                }
+            };
+
+            options = new Options();
+
+            options.setCreateIfMissing(true);
+
+            options.setComparator(this.comparator);
+
+            this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString());
+        }
+        catch (RocksDBException e) {
+            close();
+
+            throw new StorageException("Failed to start storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public DataRow read(SearchRow key) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+
+            return new SimpleDataRow(keyBytes, db.get(keyBytes));
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to read data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(DataRow row) throws StorageException {
+        try {
+            db.put(row.keyBytes(), row.valueBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Filed to write data to the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void remove(SearchRow key) throws StorageException {
+        try {
+            db.delete(key.keyBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to remove data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+            byte[] existingDataBytes = db.get(keyBytes);
+
+            clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+
+            switch (clo.operationType()) {
+                case WRITE:
+                    db.put(keyBytes, clo.newRow().valueBytes());
+
+                    break;
+
+                case REMOVE:
+                    db.delete(keyBytes);
+
+                    break;
+
+                case NOOP:
+                    break;
+            }
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to access data in the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
+        return new ScanCursor(db.newIterator(), filter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        if (db != null)
+            db.close();
+
+        if (options != null)
+            options.close();
+
+        if (comparator != null)
+            comparator.close();
+
+        if (comparatorOptions != null)
+            comparatorOptions.close();
+    }
+
+    private static class ScanCursor implements Cursor<DataRow> {
+        private final RocksIterator iter;
+
+        private final Predicate<SearchRow> filter;
+
+        private ScanCursor(RocksIterator iter, Predicate<SearchRow> filter) {
+            this.iter = iter;
+            this.filter = filter;
+
+            iter.seekToFirst();
+
+            hasNext();

Review comment:
       Artifact from the past :) We don't need it

##########
File path: modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractStorageTest.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.storage.basic.SimpleReadInvokeClosure;
+import org.apache.ignite.internal.storage.basic.SimpleRemoveInvokeClosure;
+import org.apache.ignite.internal.storage.basic.SimpleWriteInvokeClosure;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static java.util.Collections.emptyList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Abstract test that covers basic scenarios of the storage API.
+ */
+public abstract class AbstractStorageTest {
+    /** Storage instance. */
+    protected Storage storage;
+
+    /**
+     * Wraps string key into a search row.
+     *
+     * @param key String key.
+     * @return Search row.
+     */
+    private SearchRow searchRow(String key) {
+        return new SimpleDataRow(
+            key.getBytes(StandardCharsets.UTF_8),
+            null
+        );
+    }
+
+    /**
+     * Wraps string key/value pair into a data row.
+     *
+     * @param key String key.
+     * @param value String value.
+     * @return Data row.
+     */
+    private DataRow dataRow(String key, String value) {
+        return new SimpleDataRow(
+            key.getBytes(StandardCharsets.UTF_8),
+            value.getBytes(StandardCharsets.UTF_8)
+        );
+    }
+
+    @Test
+    public void readWriteRemove() throws Exception {
+        SearchRow searchRow = searchRow("key");
+
+        assertNull(storage.read(searchRow).value());
+
+        DataRow dataRow = dataRow("key", "value");
+
+        storage.write(dataRow);
+
+        assertArrayEquals(dataRow.value().array(), storage.read(searchRow).value().array());
+
+        storage.remove(searchRow);
+
+        assertNull(storage.read(searchRow).value());
+    }
+
+    @Test
+    public void invoke() throws Exception {
+        SearchRow searchRow = searchRow("key");
+
+        SimpleReadInvokeClosure readClosure = new SimpleReadInvokeClosure();
+
+        storage.invoke(searchRow, readClosure);
+
+        assertNull(readClosure.row().value());
+
+        DataRow dataRow = dataRow("key", "value");
+
+        storage.invoke(searchRow, new SimpleWriteInvokeClosure(dataRow));
+
+        storage.invoke(searchRow, readClosure);
+
+        Assertions.assertArrayEquals(dataRow.value().array(), readClosure.row().value().array());
+
+        storage.invoke(searchRow, new SimpleRemoveInvokeClosure());
+
+        storage.invoke(searchRow, readClosure);
+
+        assertNull(readClosure.row().value());
+    }
+
+    @Test
+    public void scanSimple() throws Exception {
+        List<DataRow> list = toList(storage.scan(key -> true));
+
+        assertEquals(emptyList(), list);
+
+        DataRow dataRow1 = dataRow("key1", "value1");
+
+        storage.write(dataRow1);
+
+        list = toList(storage.scan(key -> true));
+
+        assertThat(list, hasSize(1));
+
+        assertArrayEquals(dataRow1.value().array(), list.get(0).value().array());
+
+        DataRow dataRow2 = dataRow("key2", "value2");
+
+        storage.write(dataRow2);
+
+        list = toList(storage.scan(key -> true));
+
+        assertThat(list, hasSize(2));
+
+        // "key1" and "key2" have the same order both by hash and lexicographically.
+        assertArrayEquals(dataRow1.value().array(), list.get(0).value().array());
+        assertArrayEquals(dataRow2.value().array(), list.get(1).value().array());
+    }
+
+    @Test
+    public void scanFiltered() throws Exception {
+        DataRow dataRow1 = dataRow("key1", "value1");
+        DataRow dataRow2 = dataRow("key2", "value2");
+
+        storage.write(dataRow1);
+        storage.write(dataRow2);
+
+        List<DataRow> list = toList(storage.scan(key -> key.keyBytes()[3] == '1'));
+
+        assertThat(list, hasSize(1));
+
+        assertArrayEquals(dataRow1.value().array(), list.get(0).value().array());
+
+        list = toList(storage.scan(key -> key.keyBytes()[3] == '2'));
+
+        assertThat(list, hasSize(1));
+
+        assertArrayEquals(dataRow2.value().array(), list.get(0).value().array());
+
+        list = toList(storage.scan(key -> false));
+
+        assertTrue(list.isEmpty());
+    }
+
+    /**
+     * Converts cursor to list.
+     *
+     * @param cursor Cursor.
+     * @param <T> Type of cursor content.
+     * @return List.
+     */
+    @NotNull private <T> List<T> toList(Cursor<T> cursor) {
+        return StreamSupport.stream(cursor.spliterator(), false).collect(Collectors.toList());

Review comment:
       My bad




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] ibessonov commented on a change in pull request #169: IGNITE-14745 Storage API for partitions.

Posted by GitBox <gi...@apache.org>.
ibessonov commented on a change in pull request #169:
URL: https://github.com/apache/ignite-3/pull/169#discussion_r655945184



##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+/**
+ * Exception thrown by storage.
+ */
+public class StorageException extends Exception {

Review comment:
       Why not?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] ibessonov merged pull request #169: IGNITE-14745 Storage API for partitions.

Posted by GitBox <gi...@apache.org>.
ibessonov merged pull request #169:
URL: https://github.com/apache/ignite-3/pull/169


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] ibessonov commented on a change in pull request #169: IGNITE-14745 Storage API for partitions.

Posted by GitBox <gi...@apache.org>.
ibessonov commented on a change in pull request #169:
URL: https://github.com/apache/ignite-3/pull/169#discussion_r658785668



##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.ByteArray;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Storage implementation based on {@link ConcurrentHashMap}.
+ */
+public class ConcurrentHashMapStorage implements Storage {
+    /** Storage content. */
+    private final ConcurrentMap<ByteArray, byte[]> map = new ConcurrentHashMap<>();

Review comment:
       It's, like, 10 more lines of code, not a big deal




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org