Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/06/21 12:15:51 UTC

[GitHub] [ignite-3] sashapolo commented on a change in pull request #169: IGNITE-14745 Storage API for partitions.

sashapolo commented on a change in pull request #169:
URL: https://github.com/apache/ignite-3/pull/169#discussion_r654352768



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
##########
@@ -335,4 +339,46 @@ public static ClassLoader igniteClassLoader() {
 
         return cls;
     }
+
+    /**
+     * Deletes file or directory with all sub-directories and files.
+     *
+     * @param path File or directory to delete.
+     * @return {@code true} if and only if the file or directory is successfully deleted,
+     *      {@code false} otherwise
+     */
+    public static boolean delete(Path path) {
+        if (Files.isDirectory(path)) {

Review comment:
       Can we use the following approach instead?
   ```
   try {
       Files.walkFileTree(path, new SimpleFileVisitor<>() {
           @Override
           public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
               Files.delete(dir);
               return FileVisitResult.CONTINUE;
           }
   
           @Override
           public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
               Files.delete(file);
               return FileVisitResult.CONTINUE;
           }
       });
   
       return true;
   }
   catch (IOException e) {
       return false;
   }
   ```
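
For reference, the suggestion above can be written out as a self-contained sketch. The holder class name `DeleteSketch` is hypothetical (in the PR the method lives in `IgniteUtils`), and rethrowing a non-null `exc` in `postVisitDirectory` is an addition that mirrors what `SimpleFileVisitor` does by default; it is not part of the original comment.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

/** Hypothetical holder class; the PR places this method in IgniteUtils. */
final class DeleteSketch {
    /**
     * Deletes a file or a directory with all sub-directories and files.
     *
     * @return true if everything was deleted, false if any I/O error occurred
     */
    static boolean delete(Path path) {
        try {
            Files.walkFileTree(path, new SimpleFileVisitor<Path>() {
                @Override
                public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
                    // Assumption: propagate errors raised while listing the directory.
                    if (exc != null)
                        throw exc;

                    // All children have been visited (and deleted) by now.
                    Files.delete(dir);

                    return FileVisitResult.CONTINUE;
                }

                @Override
                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                    Files.delete(file);

                    return FileVisitResult.CONTINUE;
                }
            });

            return true;
        }
        catch (IOException e) {
            return false;
        }
    }
}
```

Because `walkFileTree` also calls `visitFile` when the starting path is a regular file, the same method covers both the file and the directory case, and a missing path yields `false`.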

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
##########
@@ -335,4 +339,46 @@ public static ClassLoader igniteClassLoader() {
 
         return cls;
     }
+
+    /**
+     * Deletes file or directory with all sub-directories and files.
+     *
+     * @param path File or directory to delete.
+     * @return {@code true} if and only if the file or directory is successfully deleted,
+     *      {@code false} otherwise
+     */
+    public static boolean delete(Path path) {
+        if (Files.isDirectory(path)) {
+            try {
+                try (DirectoryStream<Path> stream = Files.newDirectoryStream(path)) {
+                    for (Path innerPath : stream) {
+                        boolean res = delete(innerPath);
+
+                        if (!res)
+                            return false;
+                    }
+                }
+            } catch (IOException e) {
+                return false;
+            }
+        }
+
+        if (path.toFile().getName().endsWith("jar")) {
+            try {
+                // Why do we do this?

Review comment:
       Indeed, why?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
##########
@@ -335,4 +339,46 @@ public static ClassLoader igniteClassLoader() {
 
         return cls;
     }
+
+    /**
+     * Deletes file or directory with all sub-directories and files.

Review comment:
       ```suggestion
        * Deletes a file or a directory with all sub-directories and files.
   ```

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleDataRow.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.apache.ignite.internal.storage.DataRow;
+
+/**
+ * Basic array-based implementation of the {@link DataRow}

Review comment:
       ```suggestion
    * Basic array-based implementation of the {@link DataRow}.
   ```

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.util.function.Predicate;
+import org.apache.ignite.internal.util.Cursor;
+
+/**
+ * Interface providing methods to read, remove and update keys in storage.
+ */
+public interface Storage {
+    /**
+     * Reads a DataRow for a given key.
+     *
+     * @param key Search row.
+     * @return Data row.
+     */
+    public DataRow read(SearchRow key) throws StorageException;
+
+    /**
+     * Writes DataRow to the storage.
+     *
+     * @param row Data row.

Review comment:
       I don't know whether Ignite has its own rules, but the Oracle Javadoc style guide says that such descriptions should be written as `@param row data row` (lowercase first letter, no trailing period).

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.util.function.Predicate;
+import org.apache.ignite.internal.util.Cursor;
+
+/**
+ * Interface providing methods to read, remove and update keys in storage.
+ */
+public interface Storage {
+    /**
+     * Reads a DataRow for a given key.
+     *
+     * @param key Search row.
+     * @return Data row.
+     */
+    public DataRow read(SearchRow key) throws StorageException;
+
+    /**
+     * Writes DataRow to the storage.

Review comment:
       ```suggestion
        * Writes a DataRow to the storage.
   ```

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+/**
+ * Storage implementation based on a single RocksDB instance.
+ */
+public class RocksDbStorage implements Storage, AutoCloseable {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /** RocksDB comparator options. */
+    private final ComparatorOptions comparatorOptions;
+
+    /** RocksDB comparator. */
+    private final AbstractComparator comparator;
+
+    /** RocksDB options. */
+    private final Options options;
+
+    /** RocksDb instance. */
+    private final RocksDB db;
+
+    /**
+     * @param dbPath Path to the folder to store data.
+     * @param comparator Keys comparator.
+     * @throws StorageException If failed to create RocksDB instance.
+     */
+    public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws StorageException {
+        try {
+            comparatorOptions = new ComparatorOptions();
+
+            this.comparator = new AbstractComparator(comparatorOptions) {
+                /** {@inheritDoc} */
+                @Override public String name() {
+                    return "comparator";
+                }
+
+                /** {@inheritDoc} */
+                @Override public int compare(ByteBuffer a, ByteBuffer b) {
+                    return comparator.compare(a, b);
+                }
+            };
+
+            options = new Options();
+
+            options.setCreateIfMissing(true);
+
+            options.setComparator(this.comparator);
+
+            this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString());
+        }
+        catch (RocksDBException e) {
+            close();
+
+            throw new StorageException("Failed to start storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public DataRow read(SearchRow key) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+
+            return new SimpleDataRow(keyBytes, db.get(keyBytes));
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to read data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(DataRow row) throws StorageException {
+        try {
+            db.put(row.keyBytes(), row.valueBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to write data to the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void remove(SearchRow key) throws StorageException {
+        try {
+            db.delete(key.keyBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to remove data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+            byte[] existingDataBytes = db.get(keyBytes);
+
+            clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+
+            switch (clo.operationType()) {
+                case WRITE:
+                    db.put(keyBytes, clo.newRow().valueBytes());
+
+                    break;
+
+                case REMOVE:
+                    db.delete(keyBytes);
+
+                    break;
+
+                case NOOP:
+                    break;
+            }
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to access data in the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
+        return new ScanCursor(db.newIterator(), filter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        if (db != null)
+            db.close();
+
+        if (options != null)
+            options.close();
+
+        if (comparator != null)
+            comparator.close();
+
+        if (comparatorOptions != null)
+            comparatorOptions.close();
+    }
+
+    private static class ScanCursor implements Cursor<DataRow> {
+        private final RocksIterator iter;
+
+        private final Predicate<SearchRow> filter;
+
+        private ScanCursor(RocksIterator iter, Predicate<SearchRow> filter) {
+            this.iter = iter;
+            this.filter = filter;
+
+            iter.seekToFirst();
+
+            hasNext();
+        }
+
+        /** {@inheritDoc} */
+        @NotNull @Override public Iterator<DataRow> iterator() {
+            return this;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasNext() {
+            while (iter.isValid() && !filter.test(new SimpleDataRow(iter.key(), null)))

Review comment:
       You should also call the `status` method when `isValid` returns `false`; see https://github.com/facebook/rocksdb/wiki/Iterator#error-handling
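
A rough sketch of that pattern, kept free of the RocksDB dependency so it can run on its own. `KeyIterator` is a hypothetical stand-in for the few `RocksIterator` methods used in `hasNext` (the assumption being that `status()` throws when iteration stopped because of an error), and `ScanException` is a hypothetical placeholder for the PR's `StorageException`.

```java
import java.util.function.Predicate;

final class IteratorStatusSketch {
    /** Hypothetical stand-in for the subset of RocksIterator used by the cursor. */
    interface KeyIterator {
        boolean isValid();

        void next();

        byte[] key();

        /** Throws if the iterator became invalid because of an error, not end of data. */
        void status() throws Exception;
    }

    /** Hypothetical unchecked wrapper; the PR would use its own StorageException. */
    static final class ScanException extends RuntimeException {
        ScanException(String msg, Throwable cause) {
            super(msg, cause);
        }
    }

    /** Skips to the next key accepted by the filter; returns false at end of data. */
    static boolean advanceToMatch(KeyIterator iter, Predicate<byte[]> filter) {
        while (iter.isValid() && !filter.test(iter.key()))
            iter.next();

        if (iter.isValid())
            return true;

        // An invalid iterator means either "no more data" or "an error occurred".
        // Only status() can tell the two apart.
        try {
            iter.status();
        }
        catch (Exception e) {
            throw new ScanException("Failed to iterate over the storage", e);
        }

        return false;
    }

    /** In-memory iterator for illustration; reports the given failure (if any) from status(). */
    static KeyIterator over(byte[][] keys, Exception failure) {
        return new KeyIterator() {
            private int pos;

            @Override public boolean isValid() {
                return pos < keys.length;
            }

            @Override public void next() {
                pos++;
            }

            @Override public byte[] key() {
                return keys[pos];
            }

            @Override public void status() throws Exception {
                if (failure != null)
                    throw failure;
            }
        };
    }
}
```

Without the `status` check, a scan that was cut short by an I/O error would look the same to the caller as a scan that simply reached the end of the data.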

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/DataRow.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Interface that represents data row from the storage - a key-value pair. Can be used as a {@link SearchRow}.

Review comment:
       ```suggestion
    * Interface that represents a data row from the storage - a key-value pair. Can be used as a {@link SearchRow}.
   ```

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Interface to be used as a key representation to search data in storage.
+ */
+public interface SearchRow {
+    /**
+     * @return Hash of the key.
+     */
+    int hash();

Review comment:
       What is this method for?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
##########
@@ -335,4 +339,46 @@ public static ClassLoader igniteClassLoader() {
 
         return cls;
     }
+
+    /**
+     * Deletes file or directory with all sub-directories and files.
+     *
+     * @param path File or directory to delete.

Review comment:
       ```suggestion
        * @param path file or directory to delete
   ```

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+/**
+ * Exception thrown by storage.
+ */
+public class StorageException extends Exception {

Review comment:
       Why does it have to be a checked exception?

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/OperationType.java
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+/** Operation types for {@link InvokeClosure}. */
+public enum OperationType {
+    /** Noop, signifies read operation. */
+    NOOP,

Review comment:
       Why not `READ`?

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.ByteArray;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Storage implementation based on {@link ConcurrentHashMap}.
+ */
+public class ConcurrentHashMapStorage implements Storage {
+    /** Storage content. */
+    private final ConcurrentMap<ByteArray, byte[]> map = new ConcurrentHashMap<>();
+
+    /** {@inheritDoc} */
+    @Override public DataRow read(SearchRow key) throws StorageException {
+        byte[] keyBytes = key.keyBytes();
+
+        byte[] valueBytes = map.get(new ByteArray(keyBytes));
+
+        return new SimpleDataRow(keyBytes, valueBytes);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(DataRow row) throws StorageException {
+        map.put(new ByteArray(row.keyBytes()), row.valueBytes());
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void remove(SearchRow key) throws StorageException {
+        map.remove(new ByteArray(key.keyBytes()));
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+        byte[] keyBytes = key.keyBytes();
+
+        ByteArray mapKey = new ByteArray(keyBytes);
+        byte[] existingDataBytes = map.get(mapKey);
+
+        clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+
+        switch (clo.operationType()) {
+            case WRITE:
+                map.put(mapKey, clo.newRow().valueBytes());
+
+                break;
+
+            case REMOVE:
+                map.remove(mapKey);
+
+                break;
+
+            case NOOP:
+                break;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
+        Iterator<Map.Entry<ByteArray, byte[]>> iter = map.entrySet().stream()

Review comment:
       Looks like this code can be simplified a little bit:
   ```
   Iterator<SimpleDataRow> iter = map.entrySet().stream()
       .map(e -> new SimpleDataRow(e.getKey().bytes(), e.getValue()))
       .filter(filter)
       .iterator();
   
   return new Cursor<>() {
       /** {@inheritDoc} */
       @Override public boolean hasNext() {
           return iter.hasNext();
       }
   
       /** {@inheritDoc} */
       @Override public DataRow next() {
           return iter.next();
       }
   
       /** {@inheritDoc} */
       @NotNull @Override public Iterator<DataRow> iterator() {
           return this;
       }
   
       /** {@inheritDoc} */
       @Override public void close() throws Exception {
           // No-op.
       }
   };
   ```
   
   What do you think?
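
To check that the shape above compiles on its own, here is a minimal standalone sketch. All nested types (`SearchRow`, `DataRow`, `Cursor`, `SimpleDataRow`) are simplified, hypothetical stand-ins for the PR's interfaces, `ByteBuffer` replaces Ignite's `ByteArray` as the map key, and `close()` is narrowed to throw nothing for brevity.

```java
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

final class ScanSketch {
    /** Simplified stand-ins for the PR's storage interfaces (assumptions). */
    interface SearchRow {
        byte[] keyBytes();
    }

    interface DataRow extends SearchRow {
        byte[] valueBytes();
    }

    interface Cursor<T> extends Iterator<T>, Iterable<T>, AutoCloseable {
        /** Narrowed so that callers need not handle a checked exception. */
        @Override void close();
    }

    static final class SimpleDataRow implements DataRow {
        private final byte[] key;

        private final byte[] value;

        SimpleDataRow(byte[] key, byte[] value) {
            this.key = key;
            this.value = value;
        }

        @Override public byte[] keyBytes() {
            return key;
        }

        @Override public byte[] valueBytes() {
            return value;
        }
    }

    /** ByteBuffer gives content-based equals/hashCode for byte[] keys. */
    private final Map<ByteBuffer, byte[]> map = new ConcurrentHashMap<>();

    void write(byte[] key, byte[] value) {
        map.put(ByteBuffer.wrap(key.clone()), value);
    }

    Cursor<DataRow> scan(Predicate<SearchRow> filter) {
        // Map entries to rows first, so the filter can be applied directly.
        Iterator<SimpleDataRow> iter = map.entrySet().stream()
            .map(e -> new SimpleDataRow(e.getKey().array(), e.getValue()))
            .filter(filter)
            .iterator();

        return new Cursor<DataRow>() {
            @Override public boolean hasNext() {
                return iter.hasNext();
            }

            @Override public DataRow next() {
                return iter.next();
            }

            @Override public Iterator<DataRow> iterator() {
                return this;
            }

            @Override public void close() {
                // No-op.
            }
        };
    }
}
```

Passing a `Predicate<SearchRow>` to a stream of `SimpleDataRow` works because `Stream.filter` accepts a `Predicate<? super T>`.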

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/InvokeClosure.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import org.jetbrains.annotations.Nullable;
+
+/** */
+public interface InvokeClosure {
+    /**
+     * @param row Old row or {@code null} if old row not found.

Review comment:
       ```suggestion
        * @param row old row or {@code null} if the old row has not been found.
   ```

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.ByteArray;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Storage implementation based on {@link ConcurrentHashMap}.
+ */
+public class ConcurrentHashMapStorage implements Storage {
+    /** Storage content. */
+    private final ConcurrentMap<ByteArray, byte[]> map = new ConcurrentHashMap<>();

Review comment:
       Do we need a `ConcurrentMap` here? It looks like a single `ReadWriteLock` would be enough.
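
A minimal sketch of what that could look like, assuming a plain `HashMap` guarded by a `ReentrantReadWriteLock`. The class name, the `byte[]`-based method signatures, and the `UnaryOperator` used in place of `InvokeClosure` are all hypothetical simplifications, not the PR's API.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.UnaryOperator;

final class LockedMapStorageSketch {
    /** Plain map; all access goes through the lock below. */
    private final Map<ByteBuffer, byte[]> map = new HashMap<>();

    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    /** Readers share the read lock, so concurrent reads do not block each other. */
    byte[] read(byte[] key) {
        lock.readLock().lock();

        try {
            return map.get(ByteBuffer.wrap(key));
        }
        finally {
            lock.readLock().unlock();
        }
    }

    void write(byte[] key, byte[] value) {
        lock.writeLock().lock();

        try {
            // Clone the key so later changes to the caller's array cannot corrupt the map.
            map.put(ByteBuffer.wrap(key.clone()), value);
        }
        finally {
            lock.writeLock().unlock();
        }
    }

    void remove(byte[] key) {
        lock.writeLock().lock();

        try {
            map.remove(ByteBuffer.wrap(key));
        }
        finally {
            lock.writeLock().unlock();
        }
    }

    /** Atomic read-modify-write; a null result from the updater removes the key. */
    void invoke(byte[] key, UnaryOperator<byte[]> updater) {
        lock.writeLock().lock();

        try {
            ByteBuffer mapKey = ByteBuffer.wrap(key.clone());

            byte[] updated = updater.apply(map.get(mapKey));

            if (updated == null)
                map.remove(mapKey);
            else
                map.put(mapKey, updated);
        }
        finally {
            lock.writeLock().unlock();
        }
    }
}
```

With this layout the write lock alone makes `invoke` atomic with respect to reads, so neither `synchronized` methods nor a concurrent map are needed.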

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/InvokeClosure.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import org.jetbrains.annotations.Nullable;
+
+/** */
+public interface InvokeClosure {
+    /**
+     * @param row Old row or {@code null} if old row not found.
+     */
+    void call(@Nullable DataRow row);
+
+    /**
+     * @return New row for {@link OperationType#WRITE} operation.

Review comment:
       ```suggestion
        * @return new row for the {@link OperationType#WRITE} operation
   ```

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleRemoveInvokeClosure.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.Nullable;
+
+public class SimpleRemoveInvokeClosure implements InvokeClosure {

Review comment:
       Missing Javadoc.

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/InvokeClosure.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import org.jetbrains.annotations.Nullable;
+
+/** */
+public interface InvokeClosure {
+    /**
+     * @param row Old row or {@code null} if old row not found.
+     */
+    void call(@Nullable DataRow row);
+
+    /**
+     * @return New row for {@link OperationType#WRITE} operation.
+     */
+    DataRow newRow();
+
+    /**
+     * @return Operation type for this closure or {@code null} if it is unknown.
+     * After method {@link #call(DataRow)} has been called, operation type must
+     * be known and this method cannot return {@code null}.
+     */
+    OperationType operationType();

Review comment:
       > or {@code null} if it is unknown
   
   This means that this method should be annotated with `@Nullable`

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleWriteInvokeClosure.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.Nullable;
+
+public class SimpleWriteInvokeClosure implements InvokeClosure {

Review comment:
       missing javadoc

##########
File path: modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorageTest.java
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import org.apache.ignite.internal.storage.AbstractStorageTest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+/**
+ * Storage test implementation for {@link ConcurrentHashMapStorage}.
+ */
+public class ConcurrentHashMapStorageTest extends AbstractStorageTest {
+    @BeforeEach
+    public void setUp() {
+        storage = new ConcurrentHashMapStorage();
+    }
+
+    @AfterEach
+    public void tearDown() {

Review comment:
       this method is redundant

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleDataRow.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.apache.ignite.internal.storage.DataRow;
+
+/**
+ * Basic array-based implementation of the {@link DataRow}
+ */
+public class SimpleDataRow implements DataRow {
+    /** Key array. */
+    private final byte[] key;
+
+    /** Value array. */
+    private final byte[] value;

Review comment:
       I think you should mark this field and all related stuff as `@Nullable`

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.ByteArray;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Storage implementation based on {@link ConcurrentHashMap}.
+ */
+public class ConcurrentHashMapStorage implements Storage {
+    /** Storage content. */
+    private final ConcurrentMap<ByteArray, byte[]> map = new ConcurrentHashMap<>();
+
+    /** {@inheritDoc} */
+    @Override public DataRow read(SearchRow key) throws StorageException {
+        byte[] keyBytes = key.keyBytes();
+
+        byte[] valueBytes = map.get(new ByteArray(keyBytes));
+
+        return new SimpleDataRow(keyBytes, valueBytes);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(DataRow row) throws StorageException {
+        map.put(new ByteArray(row.keyBytes()), row.valueBytes());
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void remove(SearchRow key) throws StorageException {
+        map.remove(new ByteArray(key.keyBytes()));
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+        byte[] keyBytes = key.keyBytes();
+
+        ByteArray mapKey = new ByteArray(keyBytes);
+        byte[] existingDataBytes = map.get(mapKey);
+
+        clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+
+        switch (clo.operationType()) {
+            case WRITE:
+                map.put(mapKey, clo.newRow().valueBytes());
+
+                break;
+
+            case REMOVE:
+                map.remove(mapKey);
+
+                break;
+
+            case NOOP:
+                break;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
+        Iterator<Map.Entry<ByteArray, byte[]>> iter = map.entrySet().stream()
+            .filter(entry -> filter.test(new SimpleDataRow(entry.getKey().bytes(), null)))
+            .iterator();
+
+        return new Cursor<DataRow>() {

Review comment:
       ```suggestion
           return new Cursor<>() {
   ```
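
   For context, the diamond operator can be combined with an anonymous class starting with Java 9, provided the inferred type is denotable. Below is a minimal, self-contained sketch; `DiamondSketch` and `upperCased` are hypothetical names used only for illustration and are not part of this PR.

```java
import java.util.Iterator;
import java.util.List;

/** Hypothetical illustration only: an anonymous class created with the diamond operator. */
final class DiamondSketch {
    /** Returns an iterator that upper-cases every element of the source list. */
    static Iterator<String> upperCased(List<String> src) {
        Iterator<String> it = src.iterator();

        // The type argument is inferred from the return type, so "<String>" is not repeated here.
        return new Iterator<>() {
            @Override public boolean hasNext() {
                return it.hasNext();
            }

            @Override public String next() {
                return it.next().toUpperCase();
            }
        };
    }
}
```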

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+/**
+ * Storage implementation based on a single RocksDB instance.
+ */
+public class RocksDbStorage implements Storage, AutoCloseable {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /** RocksDB comparator options. */
+    private final ComparatorOptions comparatorOptions;
+
+    /** RocksDB comparator. */
+    private final AbstractComparator comparator;
+
+    /** RocksDB options. */
+    private final Options options;
+
+    /** RocksDb instance. */
+    private final RocksDB db;
+
+    /**
+     * @param dbPath Path to the folder to store data.
+     * @param comparator Keys comparator.
+     * @throws StorageException If failed to create RocksDB instance.
+     */
+    public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws StorageException {
+        try {
+            comparatorOptions = new ComparatorOptions();
+
+            this.comparator = new AbstractComparator(comparatorOptions) {
+                /** {@inheritDoc} */
+                @Override public String name() {
+                    return "comparator";
+                }
+
+                /** {@inheritDoc} */
+                @Override public int compare(ByteBuffer a, ByteBuffer b) {
+                    return comparator.compare(a, b);
+                }
+            };
+
+            options = new Options();
+
+            options.setCreateIfMissing(true);
+
+            options.setComparator(this.comparator);
+
+            this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString());
+        }
+        catch (RocksDBException e) {
+            close();
+
+            throw new StorageException("Failed to start storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public DataRow read(SearchRow key) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+
+            return new SimpleDataRow(keyBytes, db.get(keyBytes));
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to read data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(DataRow row) throws StorageException {
+        try {
+            db.put(row.keyBytes(), row.valueBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to write data to the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void remove(SearchRow key) throws StorageException {
+        try {
+            db.delete(key.keyBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to remove data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+            byte[] existingDataBytes = db.get(keyBytes);
+
+            clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+
+            switch (clo.operationType()) {
+                case WRITE:
+                    db.put(keyBytes, clo.newRow().valueBytes());
+
+                    break;
+
+                case REMOVE:
+                    db.delete(keyBytes);
+
+                    break;
+
+                case NOOP:
+                    break;
+            }
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to access data in the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
+        return new ScanCursor(db.newIterator(), filter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        if (db != null)
+            db.close();

Review comment:
       Unfortunately this is not a bulletproof way of closing resources, since any of the `close` methods can throw an exception
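
   One possible way to address this is to attempt every `close` call and only report failures afterwards. The sketch below is an assumption about how that could look, not code from this PR: `CloseAllSketch` and `closeAll` are hypothetical names, and the helper works on plain `AutoCloseable` instances rather than on the RocksDB classes.

```java
/** Hypothetical helper, not part of the PR: closes every resource even if some of them fail. */
final class CloseAllSketch {
    /**
     * Closes the given resources in order, skipping {@code null} entries.
     * The first failure is rethrown once all resources have been attempted;
     * any later failures are attached to it as suppressed exceptions.
     */
    static void closeAll(AutoCloseable... resources) throws Exception {
        Exception failure = null;

        for (AutoCloseable resource : resources) {
            if (resource == null)
                continue;

            try {
                resource.close();
            }
            catch (Exception e) {
                if (failure == null)
                    failure = e;
                else
                    failure.addSuppressed(e);
            }
        }

        if (failure != null)
            throw failure;
    }
}
```

   With such a helper, `close()` could pass `db`, `options`, `comparator` and `comparatorOptions` in a single call, so that a failure in one of them does not leak the others.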

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.util.function.Predicate;
+import org.apache.ignite.internal.util.Cursor;
+
+/**
+ * Interface providing methods to read, remove and update keys in storage.
+ */
+public interface Storage {
+    /**
+     * Reads a DataRow for a given key.
+     *
+     * @param key Search row.
+     * @return Data row.
+     */
+    public DataRow read(SearchRow key) throws StorageException;
+
+    /**
+     * Writes DataRow to the storage.
+     *
+     * @param row Data row.
+     * @throws StorageException If failed to read data or storage is already stopped.
+     */
+    public void write(DataRow row) throws StorageException;
+
+    /**
+     * Removes DataRow associated with a given Key.

Review comment:
       ```suggestion
        * Removes a DataRow associated with a given Key.
   ```

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+/**
+ * Storage implementation based on a single RocksDB instance.
+ */
+public class RocksDbStorage implements Storage, AutoCloseable {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /** RocksDB comparator options. */
+    private final ComparatorOptions comparatorOptions;
+
+    /** RocksDB comparator. */
+    private final AbstractComparator comparator;
+
+    /** RocksDB options. */
+    private final Options options;
+
+    /** RocksDb instance. */
+    private final RocksDB db;
+
+    /**
+     * @param dbPath Path to the folder to store data.
+     * @param comparator Keys comparator.
+     * @throws StorageException If failed to create RocksDB instance.
+     */
+    public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws StorageException {
+        try {
+            comparatorOptions = new ComparatorOptions();
+
+            this.comparator = new AbstractComparator(comparatorOptions) {
+                /** {@inheritDoc} */
+                @Override public String name() {
+                    return "comparator";
+                }
+
+                /** {@inheritDoc} */
+                @Override public int compare(ByteBuffer a, ByteBuffer b) {
+                    return comparator.compare(a, b);
+                }
+            };
+
+            options = new Options();
+
+            options.setCreateIfMissing(true);
+
+            options.setComparator(this.comparator);
+
+            this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString());
+        }
+        catch (RocksDBException e) {
+            close();
+
+            throw new StorageException("Failed to start storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public DataRow read(SearchRow key) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+
+            return new SimpleDataRow(keyBytes, db.get(keyBytes));
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to read data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(DataRow row) throws StorageException {
+        try {
+            db.put(row.keyBytes(), row.valueBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to write data to the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void remove(SearchRow key) throws StorageException {
+        try {
+            db.delete(key.keyBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to remove data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+            byte[] existingDataBytes = db.get(keyBytes);
+
+            clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+
+            switch (clo.operationType()) {
+                case WRITE:
+                    db.put(keyBytes, clo.newRow().valueBytes());
+
+                    break;
+
+                case REMOVE:
+                    db.delete(keyBytes);
+
+                    break;
+
+                case NOOP:
+                    break;
+            }
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to access data in the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
+        return new ScanCursor(db.newIterator(), filter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        if (db != null)
+            db.close();
+
+        if (options != null)
+            options.close();
+
+        if (comparator != null)
+            comparator.close();
+
+        if (comparatorOptions != null)
+            comparatorOptions.close();
+    }
+
+    private static class ScanCursor implements Cursor<DataRow> {
+        private final RocksIterator iter;
+
+        private final Predicate<SearchRow> filter;
+
+        private ScanCursor(RocksIterator iter, Predicate<SearchRow> filter) {
+            this.iter = iter;
+            this.filter = filter;
+
+            iter.seekToFirst();
+
+            hasNext();

Review comment:
       why do you need to call this method here?
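
   For comparison, a filtering iterator can be written so that it advances lazily inside `hasNext()`, in which case the constructor does not need to pre-position anything. The sketch below is a generic illustration built on `java.util.Iterator`; `FilteringIterator` is a hypothetical class, and it is only an assumption that the same structure would fit `ScanCursor` and `RocksIterator`.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

/** Hypothetical illustration: an iterator that lazily skips elements rejected by a filter. */
final class FilteringIterator<T> implements Iterator<T> {
    private final Iterator<T> delegate;

    private final Predicate<T> filter;

    /** Next matching element, valid only while {@code ready} is {@code true}. */
    private T next;

    /** Whether {@code next} currently holds an element that has not been returned yet. */
    private boolean ready;

    FilteringIterator(Iterator<T> delegate, Predicate<T> filter) {
        // No eager advance here: the first hasNext() call does the work.
        this.delegate = delegate;
        this.filter = filter;
    }

    @Override public boolean hasNext() {
        while (!ready && delegate.hasNext()) {
            T candidate = delegate.next();

            if (filter.test(candidate)) {
                next = candidate;
                ready = true;
            }
        }

        return ready;
    }

    @Override public T next() {
        if (!hasNext())
            throw new NoSuchElementException();

        ready = false;

        T res = next;
        next = null;

        return res;
    }
}
```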

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.util.function.Predicate;
+import org.apache.ignite.internal.util.Cursor;
+
+/**
+ * Interface providing methods to read, remove and update keys in storage.
+ */
+public interface Storage {
+    /**
+     * Reads a DataRow for a given key.
+     *
+     * @param key Search row.
+     * @return Data row.
+     */
+    public DataRow read(SearchRow key) throws StorageException;
+
+    /**
+     * Writes DataRow to the storage.
+     *
+     * @param row Data row.
+     * @throws StorageException If failed to read data or storage is already stopped.
+     */
+    public void write(DataRow row) throws StorageException;
+
+    /**
+     * Removes DataRow associated with a given Key.
+     *
+     * @throws StorageException If failed to read data or storage is already stopped.
+     */
+    public void remove(SearchRow key) throws StorageException;
+
+    /**
+     * Executes an update with custom logic implemented by storage.UpdateClosure interface.
+     *
+     * @param key Search key.
+     * @param clo Invoke closure.
+     * @throws StorageException If failed to read data or storage is already stopped.
+     */
+    public void invoke(SearchRow key, InvokeClosure clo) throws StorageException;
+
+    /**
+     * Creates cursor over storage data.

Review comment:
       ```suggestion
        * Creates cursor over the storage data.
   ```

##########
File path: modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractStorageTest.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.storage.basic.SimpleReadInvokeClosure;
+import org.apache.ignite.internal.storage.basic.SimpleRemoveInvokeClosure;
+import org.apache.ignite.internal.storage.basic.SimpleWriteInvokeClosure;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static java.util.Collections.emptyList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Abstract test that covers basic scenarios of the storage API.
+ */
+public abstract class AbstractStorageTest {
+    /** Storage instance. */
+    protected Storage storage;
+
+    /**
+     * Wraps string key into a search row.
+     *
+     * @param key String key.
+     * @return Search row.
+     */
+    private SearchRow searchRow(String key) {
+        return new SimpleDataRow(
+            key.getBytes(StandardCharsets.UTF_8),
+            null
+        );
+    }
+
+    /**
+     * Wraps string key/value pair into a data row.
+     *
+     * @param key String key.
+     * @param value String value.
+     * @return Data row.
+     */
+    private DataRow dataRow(String key, String value) {
+        return new SimpleDataRow(
+            key.getBytes(StandardCharsets.UTF_8),
+            value.getBytes(StandardCharsets.UTF_8)
+        );
+    }
+
+    @Test
+    public void readWriteRemove() throws Exception {
+        SearchRow searchRow = searchRow("key");
+
+        assertNull(storage.read(searchRow).value());
+
+        DataRow dataRow = dataRow("key", "value");
+
+        storage.write(dataRow);
+
+        assertArrayEquals(dataRow.value().array(), storage.read(searchRow).value().array());
+
+        storage.remove(searchRow);
+
+        assertNull(storage.read(searchRow).value());
+    }
+
+    @Test
+    public void invoke() throws Exception {
+        SearchRow searchRow = searchRow("key");
+
+        SimpleReadInvokeClosure readClosure = new SimpleReadInvokeClosure();
+
+        storage.invoke(searchRow, readClosure);
+
+        assertNull(readClosure.row().value());
+
+        DataRow dataRow = dataRow("key", "value");
+
+        storage.invoke(searchRow, new SimpleWriteInvokeClosure(dataRow));
+
+        storage.invoke(searchRow, readClosure);
+
+        Assertions.assertArrayEquals(dataRow.value().array(), readClosure.row().value().array());

Review comment:
       ```suggestion
           assertArrayEquals(dataRow.value().array(), readClosure.row().value().array());
   ```

##########
File path: modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractStorageTest.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.storage.basic.SimpleReadInvokeClosure;
+import org.apache.ignite.internal.storage.basic.SimpleRemoveInvokeClosure;
+import org.apache.ignite.internal.storage.basic.SimpleWriteInvokeClosure;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static java.util.Collections.emptyList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Abstract test that covers basic scenarios of the storage API.
+ */
+public abstract class AbstractStorageTest {
+    /** Storage instance. */
+    protected Storage storage;
+
+    /**
+     * Wraps string key into a search row.
+     *
+     * @param key String key.
+     * @return Search row.
+     */
+    private SearchRow searchRow(String key) {
+        return new SimpleDataRow(
+            key.getBytes(StandardCharsets.UTF_8),
+            null
+        );
+    }
+
+    /**
+     * Wraps string key/value pair into a data row.
+     *
+     * @param key String key.
+     * @param value String value.
+     * @return Data row.
+     */
+    private DataRow dataRow(String key, String value) {
+        return new SimpleDataRow(
+            key.getBytes(StandardCharsets.UTF_8),
+            value.getBytes(StandardCharsets.UTF_8)
+        );
+    }
+
+    @Test
+    public void readWriteRemove() throws Exception {

Review comment:
       please add javadocs for all tests

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.ComparatorOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+/**
+ * Storage implementation based on a single RocksDB instance.
+ */
+public class RocksDbStorage implements Storage, AutoCloseable {
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /** RocksDB comparator options. */
+    private final ComparatorOptions comparatorOptions;
+
+    /** RocksDB comparator. */
+    private final AbstractComparator comparator;
+
+    /** RocksDB options. */
+    private final Options options;
+
+    /** RocksDB instance. */
+    private final RocksDB db;
+
+    /**
+     * @param dbPath Path to the folder to store data.
+     * @param comparator Keys comparator.
+     * @throws StorageException If failed to create RocksDB instance.
+     */
+    public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws StorageException {
+        try {
+            comparatorOptions = new ComparatorOptions();
+
+            this.comparator = new AbstractComparator(comparatorOptions) {
+                /** {@inheritDoc} */
+                @Override public String name() {
+                    return "comparator";
+                }
+
+                /** {@inheritDoc} */
+                @Override public int compare(ByteBuffer a, ByteBuffer b) {
+                    return comparator.compare(a, b);
+                }
+            };
+
+            options = new Options();
+
+            options.setCreateIfMissing(true);
+
+            options.setComparator(this.comparator);
+
+            this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString());
+        }
+        catch (RocksDBException e) {
+            close();
+
+            throw new StorageException("Failed to start storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public DataRow read(SearchRow key) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+
+            return new SimpleDataRow(keyBytes, db.get(keyBytes));
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to read data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(DataRow row) throws StorageException {
+        try {
+            db.put(row.keyBytes(), row.valueBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to write data to the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void remove(SearchRow key) throws StorageException {
+        try {
+            db.delete(key.keyBytes());
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to remove data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+        try {
+            byte[] keyBytes = key.keyBytes();
+            byte[] existingDataBytes = db.get(keyBytes);
+
+            clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+
+            switch (clo.operationType()) {
+                case WRITE:
+                    db.put(keyBytes, clo.newRow().valueBytes());
+
+                    break;
+
+                case REMOVE:
+                    db.delete(keyBytes);
+
+                    break;
+
+                case NOOP:
+                    break;
+            }
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to access data in the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
+        return new ScanCursor(db.newIterator(), filter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        if (db != null)
+            db.close();
+
+        if (options != null)
+            options.close();
+
+        if (comparator != null)
+            comparator.close();
+
+        if (comparatorOptions != null)
+            comparatorOptions.close();
+    }
+
+    private static class ScanCursor implements Cursor<DataRow> {

Review comment:
       Missing Javadoc on this class.
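
As a rough sketch of what is meant, a private nested cursor class with Javadoc on the class, its fields, and its constructor could look like this. All names here (`DocumentedCursorExample`, `FilteringCursor`, `filtered`) are hypothetical, and the cursor wraps a plain `java.util.Iterator` instead of a `RocksIterator` so that the example is self-contained.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

/**
 * Hypothetical example; none of these names come from the PR.
 */
public class DocumentedCursorExample {
    /**
     * Cursor that lazily iterates over the elements of an underlying iterator
     * that are accepted by a filter.
     *
     * @param <T> Element type.
     */
    private static class FilteringCursor<T> implements Iterator<T>, AutoCloseable {
        /** Underlying iterator. */
        private final Iterator<T> delegate;

        /** Filter that decides which elements are returned. */
        private final Predicate<T> filter;

        /** Next matching element, or {@code null} if it has not been looked up yet. */
        private T next;

        /**
         * @param delegate Underlying iterator; must not produce {@code null} elements.
         * @param filter Filter for the elements.
         */
        FilteringCursor(Iterator<T> delegate, Predicate<T> filter) {
            this.delegate = delegate;
            this.filter = filter;
        }

        /** {@inheritDoc} */
        @Override public boolean hasNext() {
            // Advance until a matching element is found or the source is exhausted.
            while (next == null && delegate.hasNext()) {
                T candidate = delegate.next();

                if (filter.test(candidate))
                    next = candidate;
            }

            return next != null;
        }

        /** {@inheritDoc} */
        @Override public T next() {
            if (!hasNext())
                throw new NoSuchElementException();

            T res = next;

            next = null;

            return res;
        }

        /** {@inheritDoc} */
        @Override public void close() {
            // Nothing to release in this sketch; a cursor over a native iterator would close it here.
        }
    }

    /**
     * Collects all elements of the source list that are accepted by the filter.
     *
     * @param src Source list.
     * @param filter Filter for the elements.
     * @param <T> Element type.
     * @return Matching elements in source order.
     */
    public static <T> List<T> filtered(List<T> src, Predicate<T> filter) {
        List<T> res = new ArrayList<>();

        try (FilteringCursor<T> cursor = new FilteringCursor<>(src.iterator(), filter)) {
            while (cursor.hasNext())
                res.add(cursor.next());
        }

        return res;
    }
}
```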
