You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/08/05 12:24:56 UTC

[GitHub] [ignite-3] sashapolo commented on a change in pull request #254: IGNITE-14790 Persistent partition storage

sashapolo commented on a change in pull request #254:
URL: https://github.com/apache/ignite-3/pull/254#discussion_r682440463



##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/InvokeClosure.java
##########
@@ -17,14 +17,15 @@
 
 package org.apache.ignite.internal.storage;
 
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /** */
 public interface InvokeClosure {
     /**
      * @param row Old row or {@code null} if the old row has not been found.
      */
-    void call(@Nullable DataRow row);
+    void call(@NotNull DataRow row);

Review comment:
       this annotation now contradicts the javadoc

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
##########
@@ -75,6 +118,40 @@
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public Collection<DataRow> removeAll(Collection<? extends SearchRow> keys) {
+        rwLock.writeLock().lock();
+
+        try {
+            return keys.stream()
+                .map(key -> new SimpleDataRow(key.keyBytes(), map.remove(new ByteArray(key.keyBytes()))))

Review comment:
       1. You don't check that the map contains the row being removed, so it breaks the contract.
   2. Can be improved a little bit with `.map(SearchRow::keyBytes)`.
   3. Tests are not good.

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
##########
@@ -112,6 +126,31 @@ public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws Sto
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public Collection<DataRow> readAll(Collection<? extends SearchRow> keys) throws StorageException {
+        List<DataRow> res = new ArrayList<>();
+
+        Snapshot snapshot = db.getSnapshot();
+
+        try (ReadOptions opts = new ReadOptions().setSnapshot(snapshot)) {
+            for (SearchRow key : keys) {
+                byte[] keyBytes = key.keyBytes();
+
+                res.add(new SimpleDataRow(keyBytes, db.get(opts, keyBytes)));

Review comment:
       Have you considered using `multiGetAsList`?

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -167,11 +179,27 @@ private void createTableLocally(
 
         HashMap<Integer, RaftGroupService> partitionMap = new HashMap<>(partitions);
 
+        Path storageDir = partitionsStoreDir.resolve(name);
+
+        if (Files.notExists(storageDir)) {

Review comment:
       I think it would be useful to extract this code into `IgniteUtils#createDirectoriesIfNotExists`, because this is used in several places

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
##########
@@ -53,33 +61,43 @@
  * Partition command handler.
  */
 public class PartitionListener implements RaftGroupListener {
+    /** Partition storage. */
+    private final Storage storage;
+
     /**
-     * Storage.
-     * This is a temporary solution, it will apply until persistence layer would not be implemented.
-     * TODO: IGNITE-14790.
+     * Constructor.
+     *
+     * @param storage Storage.
      */
-    private ConcurrentHashMap<KeyWrapper, BinaryRow> storage = new ConcurrentHashMap<>();
+    public PartitionListener(Storage storage) {
+        this.storage = storage;
+    }
 
     /** {@inheritDoc} */
     @Override public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
         while (iterator.hasNext()) {
             CommandClosure<ReadCommand> clo = iterator.next();
 
             if (clo.command() instanceof GetCommand) {
-                clo.result(new SingleRowResponse(storage.get(
-                    extractAndWrapKey(((GetCommand)clo.command()).getKeyRow())
-                )));
+                DataRow readValue = storage.read(extractAndWrapKey(((GetCommand) clo.command()).getKeyRow()));
+
+                ByteBufferRow responseRow = null;
+
+                if (readValue.hasValueBytes())
+                    responseRow = new ByteBufferRow(readValue.valueBytes());
+
+                clo.result(new SingleRowResponse(responseRow));
             }
             else if (clo.command() instanceof GetAllCommand) {
                 Set<BinaryRow> keyRows = ((GetAllCommand)clo.command()).getKeyRows();
 
                 assert keyRows != null && !keyRows.isEmpty();
 
-                final Set<BinaryRow> res = keyRows.stream()
-                    .map(this::extractAndWrapKey)
-                    .map(storage::get)
-                    .filter(Objects::nonNull)
-                    .filter(BinaryRow::hasValue)
+                Set<BinaryRow> res = storage

Review comment:
       why `Set`?

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
##########
@@ -281,79 +305,40 @@ else if (clo.command() instanceof GetAndUpsertCommand) {
 
     /** {@inheritDoc} */
     @Override public void onShutdown() {
-        // No-op.
-    }
-
-    /**
-     * Wrapper provides correct byte[] comparison.
-     */
-    private static class KeyWrapper {
-        /** Data. */
-        private final byte[] data;
-
-        /** Hash. */
-        private final int hash;
-
-        /**
-         * Constructor.
-         *
-         * @param data Wrapped data.
-         */
-        KeyWrapper(byte[] data, int hash) {
-            assert data != null;
-
-            this.data = data;
-            this.hash = hash;
+        try {
+            storage.close();
         }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            KeyWrapper wrapper = (KeyWrapper)o;
-            return Arrays.equals(data, wrapper.data);
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            return hash;
+        catch (Exception e) {
+            throw new IgniteInternalException("Failed to close storage: " + e.getMessage(), e);
         }
     }
 
     /**
-     * Compares two rows.
+     * Extracts a key and a value from the {@link BinaryRow} and wraps it in a {@link DataRow}.
      *
-     * @param row Row to compare.
-     * @param row2 Row to compare.
-     * @return True if these rows is equivalent, false otherwise.
+     * @param row Binary row.
+     * @return Data row.
      */
-    private boolean equalValues(BinaryRow row, BinaryRow row2) {
-        if (row == row2)
-            return true;
-
-        if (row == null || row2 == null)
-            return false;
+    @NotNull private DataRow extractAndWrapKeyValue(@NotNull BinaryRow row) {

Review comment:
       can be `static`

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
##########
@@ -51,6 +54,20 @@
         return new SimpleDataRow(keyBytes, valueBytes);
     }
 
+    /** {@inheritDoc} */
+    @Override public Collection<DataRow> readAll(Collection<? extends SearchRow> keys) {
+        rwLock.writeLock().lock();
+
+        try {
+            return keys.stream()
+                .map(key -> new SimpleDataRow(key.keyBytes(), map.get(new ByteArray(key.keyBytes()))))

Review comment:
       Also, this can be written a little more cleanly if you add `.map(SearchRow::keyBytes)`

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
##########
@@ -63,6 +80,32 @@
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public void writeAll(Collection<? extends DataRow> rows) throws StorageException {
+        rwLock.writeLock().lock();
+
+        try {
+            rows.forEach(row -> map.put(new ByteArray(row.keyBytes()), row.valueBytes()));
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<DataRow> insertAll(Collection<? extends DataRow> rows) throws StorageException {
+        rwLock.writeLock().lock();
+
+        try {
+            return rows.stream()
+                .map(row -> map.putIfAbsent(new ByteArray(row.keyBytes()), row.valueBytes()) == null ? null : row)

Review comment:
       I guess you should also filter out `null`s after this, because it breaks the contract described in the javadoc: `@return Collection of rows that could not be inserted`. Maybe using a `for` loop will be more straightforward...

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/DeleteExactInvokeClosure.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import java.util.Arrays;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Closure that deletes a specific data row with a given key and a given value.
+ */
+public class DeleteExactInvokeClosure implements InvokeClosure {
+    /** Row to delete. */
+    @NotNull
+    private final DataRow row;
+
+    /** {@code true} if can delete, {@code false} otherwise. */
+    private boolean deletes = false;
+
+    /**
+     * Constructor.
+     *
+     * @param row Row to delete.
+     */
+    public DeleteExactInvokeClosure(@NotNull DataRow row) {
+        this.row = row;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void call(@NotNull DataRow row) {
+        deletes = row.hasValueBytes() && Arrays.equals(this.row.valueBytes(), row.valueBytes());
+    }
+
+    /** {@inheritDoc} */
+    @Override public @Nullable DataRow newRow() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public @Nullable OperationType operationType() {
+        return deletes ? OperationType.REMOVE : OperationType.NOOP;
+    }
+
+    /**
+     * @return {@code true} if this closure deletes specific row, {@code false} otherwise.
+     */
+    public boolean deletes() {

Review comment:
       Can we make `InvokeClosure#call` return a generic type and get rid of this and similar methods?

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
##########
@@ -51,6 +54,20 @@
         return new SimpleDataRow(keyBytes, valueBytes);
     }
 
+    /** {@inheritDoc} */
+    @Override public Collection<DataRow> readAll(Collection<? extends SearchRow> keys) {
+        rwLock.writeLock().lock();
+
+        try {
+            return keys.stream()
+                .map(key -> new SimpleDataRow(key.keyBytes(), map.get(new ByteArray(key.keyBytes()))))

Review comment:
       Not related to this PR, but do you know why `SearchRow#keyBytes` is `@Nullable`? If that's intentional, then you can't simply pass it into the `SimpleDataRow`.

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
##########
@@ -127,6 +166,60 @@ public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws Sto
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public void writeAll(Collection<? extends DataRow> rows) throws StorageException {
+        rwLock.readLock().lock();
+
+        try (WriteBatch batch = new WriteBatch();
+             WriteOptions opts = new WriteOptions()) {
+            for (DataRow row : rows)
+                batch.put(row.keyBytes(), row.valueBytes());
+
+            db.write(opts, batch);
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Filed to write data to the storage", e);
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<DataRow> insertAll(Collection<? extends DataRow> rows) throws StorageException {
+        rwLock.readLock().lock();
+
+        List<DataRow> cantInsert = new ArrayList<>();
+
+        Snapshot snapshot = db.getSnapshot();

Review comment:
       maybe it would make sense to introduce a method like `withSnapshot(Consumer)`

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
##########
@@ -142,6 +235,84 @@ public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws Sto
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public Collection<DataRow> removeAll(Collection<? extends SearchRow> keys) {
+        rwLock.readLock().lock();
+
+        List<DataRow> res = new ArrayList<>();
+
+        Snapshot snapshot = db.getSnapshot();
+
+        try (WriteBatch batch = new WriteBatch();
+             ReadOptions readOpts = new ReadOptions().setSnapshot(snapshot);
+             WriteOptions opts = new WriteOptions()) {
+
+            for (SearchRow key : keys) {
+                byte[] keyBytes = key.keyBytes();
+
+                byte[] value = db.get(readOpts, keyBytes);

Review comment:
       you need to check this value for `null` before adding it to the result list

##########
File path: modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractStorageTest.java
##########
@@ -34,55 +44,34 @@
 import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Abstract test that covers basic scenarios of the storage API.
  */
 public abstract class AbstractStorageTest {
-    /** Storage instance. */
-    protected Storage storage;
+    /** Test key. */
+    private static final String KEY = "key";

Review comment:
       I'm not sure that these values should be extracted into constants...

##########
File path: modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractStorageTest.java
##########
@@ -188,6 +175,490 @@ public void scanFiltered() throws Exception {
         assertTrue(list.isEmpty());
     }
 
+    /**
+     * Tests that {@link InsertInvokeClosure} inserts a data row if there is no existing data row with the same key.
+     */
+    @Test
+    public void testInsertClosure() {
+        DataRow dataRow = dataRow(KEY, VALUE);
+
+        var closure = new InsertInvokeClosure(dataRow);
+
+        storage.invoke(dataRow, closure);
+
+        assertTrue(closure.inserts());
+
+        checkHasSameEntry(dataRow);
+    }
+
+    /**
+     * Tests that {@link InsertInvokeClosure} doesn't insert a data row if there is an existing data row with the same key.
+     */
+    @Test
+    public void testInsertClosure_failureBranch() {
+        DataRow dataRow = dataRow(KEY, VALUE);
+
+        var closure = new InsertInvokeClosure(dataRow);
+
+        storage.invoke(dataRow, closure);
+
+        assertTrue(closure.inserts());
+
+        DataRow sameKeyRow = dataRow(KEY, "test");
+
+        var sameClosure = new InsertInvokeClosure(sameKeyRow);
+
+        storage.invoke(sameKeyRow, sameClosure);
+
+        assertFalse(sameClosure.inserts());
+
+        checkHasSameEntry(dataRow);
+    }
+
+    /**
+     * Tests that {@link DeleteExactInvokeClosure} deletes a data row if a key and a value matches the ones passed
+     * in the closure.
+     */
+    @Test
+    public void testDeleteExactClosure() {
+        DataRow dataRow = dataRow(KEY, VALUE);
+
+        storage.write(dataRow);
+
+        checkHasSameEntry(dataRow);
+
+        var closure = new DeleteExactInvokeClosure(dataRow);
+
+        storage.invoke(dataRow, closure);
+
+        assertTrue(closure.deletes());
+
+        checkHasNoEntry(dataRow);
+    }
+
+    /**
+     * Tests that {@link DeleteExactInvokeClosure} doesn't delete a data row if a key and a value don't match
+     * the ones passed in the closure.
+     */
+    @Test
+    public void testDeleteExactClosure_failureBranch() {
+        DataRow dataRow = dataRow(KEY, VALUE);
+
+        storage.write(dataRow);
+
+        checkHasSameEntry(dataRow);
+
+        var closure = new DeleteExactInvokeClosure(dataRow(KEY, "test"));
+
+        storage.invoke(dataRow, closure);
+
+        assertFalse(closure.deletes());
+
+        checkHasSameEntry(dataRow);
+    }
+
+    /**
+     * Tests that {@link GetAndRemoveInvokeClosure} successfully retrieves and removes a data row.
+     */
+    @Test
+    public void testGetAndRemoveClosure() {
+        DataRow dataRow = dataRow(KEY, VALUE);
+
+        storage.write(dataRow);
+
+        checkHasSameEntry(dataRow);
+
+        var closure = new GetAndRemoveInvokeClosure();
+
+        storage.invoke(dataRow, closure);
+
+        assertTrue(closure.hasData());
+
+        checkRowsEqual(dataRow, closure.oldRow());
+
+        checkHasNoEntry(dataRow);
+    }
+
+    /**
+     * Tests that {@link GetAndRemoveInvokeClosure} doesn't retrieve and remove a data row if it doesn't exist.
+     */
+    @Test
+    public void testGetAndRemoveClosure_failureBranch() {
+        DataRow dataRow = dataRow(KEY, VALUE);
+
+        storage.write(dataRow);
+
+        checkHasSameEntry(dataRow);
+
+        var closure = new GetAndRemoveInvokeClosure();
+
+        storage.invoke(searchRow("test"), closure);
+
+        assertFalse(closure.hasData());
+
+        assertNull(closure.oldRow().valueBytes());
+
+        checkHasSameEntry(dataRow);
+    }
+
+    /**
+     * Tests that {@link GetAndReplaceInvokeClosure} with the {@link GetAndReplaceInvokeClosure#onlyIfExists} set to
+     * {@code false} retrieves and replaces the existing entry in the storage.
+     */
+    @Test
+    public void testGetAndReplaceClosureIfExistsFalse_entryExists() {
+        String newValue = "newValue";
+
+        DataRow dataRow = dataRow(KEY, VALUE);
+
+        storage.write(dataRow);
+
+        checkHasSameEntry(dataRow);
+
+        DataRow newRow = dataRow(KEY, newValue);
+
+        var closure = new GetAndReplaceInvokeClosure(newRow, false);
+
+        storage.invoke(dataRow, closure);
+
+        DataRow replaced = closure.oldRow();
+
+        assertNotNull(replaced);
+
+        assertTrue(closure.replaces());
+
+        checkRowsEqual(dataRow, replaced);
+
+        checkHasDifferentEntry(dataRow);
+        checkHasSameEntry(newRow);
+    }
+
+    /**
+     * Tests that {@link GetAndReplaceInvokeClosure} with the {@link GetAndReplaceInvokeClosure#onlyIfExists} set to
+     * {@code false} successfully inserts a new data row and returns an empty row if a previous row with the same key
+     * doesn't exist .
+     */
+    @Test
+    public void testGetAndReplaceClosureIfExistsFalse_entryNotExists() {
+        DataRow dataRow = dataRow(KEY, VALUE);
+
+        var closure = new GetAndReplaceInvokeClosure(dataRow, false);
+
+        storage.invoke(dataRow, closure);
+
+        DataRow replaced = closure.oldRow();
+
+        assertNotNull(replaced);
+
+        assertTrue(closure.replaces());
+
+        assertFalse(replaced.hasValueBytes());
+
+        checkHasSameEntry(dataRow);
+    }
+
+    /**
+     * Tests that {@link GetAndReplaceInvokeClosure} with the {@link GetAndReplaceInvokeClosure#onlyIfExists} set to
+     * {@code true} retrieves and replaces the existing entry in the storage.
+     */
+    @Test
+    public void testGetAndReplaceClosureIfExistsTrue_entryExists() {
+        DataRow dataRow = dataRow(KEY, VALUE);
+
+        storage.write(dataRow);
+
+        checkHasSameEntry(dataRow);
+
+        DataRow newRow = dataRow(KEY, "test");
+
+        var closure = new GetAndReplaceInvokeClosure(newRow, true);
+
+        storage.invoke(dataRow, closure);
+
+        DataRow replaced = closure.oldRow();
+
+        assertNotNull(replaced);
+
+        assertTrue(closure.replaces());
+
+        assertTrue(replaced.hasValueBytes());
+
+        checkHasDifferentEntry(dataRow);
+        checkHasSameEntry(newRow);
+    }
+
+    /**
+     * Tests that {@link GetAndReplaceInvokeClosure} with the {@link GetAndReplaceInvokeClosure#onlyIfExists} set to
+     * {@code true} doesn't insert a new entry if a previous one doesn't exist.
+     */
+    @Test
+    public void testGetAndReplaceClosureIfExistsTrue_entryNotExists() {
+        DataRow dataRow = dataRow(KEY, VALUE);
+
+        var closure = new GetAndReplaceInvokeClosure(dataRow, true);
+
+        storage.invoke(dataRow, closure);
+
+        DataRow replaced = closure.oldRow();
+
+        assertNotNull(replaced);
+
+        assertFalse(closure.replaces());
+
+        assertFalse(replaced.hasValueBytes());
+
+        checkHasNoEntry(dataRow);
+    }
+
+    /**
+     * Tests that {@link ReplaceExactInvokeClosure} replaces the data row with a given key and a given value
+     * in the storage.
+     */
+    @Test
+    public void testReplaceExactClosure() {
+        String newValue = "newValue";
+
+        DataRow dataRow = dataRow(KEY, VALUE);
+
+        storage.write(dataRow);
+
+        checkHasSameEntry(dataRow);
+
+        DataRow newRow = dataRow(KEY, newValue);
+
+        var closure = new ReplaceExactInvokeClosure(dataRow, newRow);
+
+        storage.invoke(dataRow, closure);
+
+        assertTrue(closure.replaces());
+
+        checkHasDifferentEntry(dataRow);
+        checkHasSameEntry(newRow);
+    }
+
+    /**
+     * Tests that {@link ReplaceExactInvokeClosure} doesn't replace the data row with a given key and a given value
+     * if the value in the storage doesn't match the value passed via the closure.
+     */
+    @Test
+    public void testReplaceExactClosure_failureBranch() {
+        String newValue = "newValue";
+
+        DataRow dataRow = dataRow(KEY, VALUE);
+
+        storage.write(dataRow);
+
+        checkHasSameEntry(dataRow);
+
+        DataRow newRow = dataRow(KEY, newValue);
+
+        var closure = new ReplaceExactInvokeClosure(newRow, dataRow);
+
+        storage.invoke(dataRow, closure);
+
+        assertFalse(closure.replaces());
+
+        checkHasSameEntry(dataRow);
+    }
+
+    /**
+     * Tests the {@link Storage#readAll(Collection)} operation successfully reads data rows from the storage.
+     */
+    @Test
+    public void testReadAll() {
+        List<DataRow> rows = insertBulk(100);
+
+        List<DataRow> rowsFromStorage = new ArrayList<>(storage.readAll(rows));
+
+        Comparator<DataRow> comparator = (o1, o2) -> {

Review comment:
       can be replaced with 
   ```
   Comparator<DataRow> comparator = Comparator
       .comparing(DataRow::keyBytes, Arrays::compare)
       .thenComparing(DataRow::valueBytes, Arrays::compare);
   ```    

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
##########
@@ -127,6 +166,60 @@ public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws Sto
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public void writeAll(Collection<? extends DataRow> rows) throws StorageException {
+        rwLock.readLock().lock();
+
+        try (WriteBatch batch = new WriteBatch();
+             WriteOptions opts = new WriteOptions()) {
+            for (DataRow row : rows)
+                batch.put(row.keyBytes(), row.valueBytes());
+
+            db.write(opts, batch);
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Filed to write data to the storage", e);
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<DataRow> insertAll(Collection<? extends DataRow> rows) throws StorageException {
+        rwLock.readLock().lock();

Review comment:
       Are you sure `readLock` is enough for this method? 

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
##########
@@ -75,6 +118,40 @@
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public Collection<DataRow> removeAll(Collection<? extends SearchRow> keys) {
+        rwLock.writeLock().lock();
+
+        try {
+            return keys.stream()
+                .map(key -> new SimpleDataRow(key.keyBytes(), map.remove(new ByteArray(key.keyBytes()))))
+                .collect(Collectors.toList());
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<DataRow> removeAllExact(Collection<? extends DataRow> keyValues) {
+        rwLock.writeLock().lock();
+
+        try {
+            return keyValues.stream()
+                .map(kv -> {
+                    if (map.remove(new ByteArray(kv.keyBytes()), kv.valueBytes()))

Review comment:
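       I really doubt that this method works correctly when values are arrays: `remove(key, value)` compares the stored value with `equals`, and for a `byte[]` that is reference equality, not content equality.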

##########
File path: modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
##########
@@ -315,6 +320,20 @@ else if (cfgContent != null)
                 )
             );
 
+            Path partitionsStore = workDir.resolve(PARTITIONS_STORE_PATH);

Review comment:
       let's extract this code into a method

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
##########
@@ -142,9 +160,11 @@ else if (clo.command() instanceof InsertAllCommand) {
 
                 assert rows != null && !rows.isEmpty();
 
-                final Set<BinaryRow> res = rows.stream()
-                    .map(k -> storage.putIfAbsent(extractAndWrapKey(k), k) == null ? null : k)
-                    .filter(Objects::nonNull)
+                final Set<BinaryRow> res = storage.insertAll(
+                    rows.stream().map(this::extractAndWrapKeyValue).collect(Collectors.toList())

Review comment:
       same as above

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
##########
@@ -142,6 +235,84 @@ public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws Sto
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public Collection<DataRow> removeAll(Collection<? extends SearchRow> keys) {
+        rwLock.readLock().lock();

Review comment:
       same stuff about the `readLock`

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
##########
@@ -164,16 +184,13 @@ else if (clo.command() instanceof DeleteAllCommand) {
 
                 assert rows != null && !rows.isEmpty();
 
-                final Set<BinaryRow> res = rows.stream()
-                    .map(k -> {
-                        if (k.hasValue())
-                            return null;
-                        else
-                            return storage.remove(extractAndWrapKey(k));
-                    })
-                    .filter(Objects::nonNull)
-                    .filter(BinaryRow::hasValue)
-                    .collect(Collectors.toSet());
+                final Set<BinaryRow> res =

Review comment:
       same as above

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
##########
@@ -53,33 +61,43 @@
  * Partition command handler.
  */
 public class PartitionListener implements RaftGroupListener {
+    /** Partition storage. */
+    private final Storage storage;
+
     /**
-     * Storage.
-     * This is a temporary solution, it will apply until persistence layer would not be implemented.
-     * TODO: IGNITE-14790.
+     * Constructor.
+     *
+     * @param storage Storage.
      */
-    private ConcurrentHashMap<KeyWrapper, BinaryRow> storage = new ConcurrentHashMap<>();
+    public PartitionListener(Storage storage) {
+        this.storage = storage;
+    }
 
     /** {@inheritDoc} */
     @Override public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
         while (iterator.hasNext()) {
             CommandClosure<ReadCommand> clo = iterator.next();
 
             if (clo.command() instanceof GetCommand) {
-                clo.result(new SingleRowResponse(storage.get(
-                    extractAndWrapKey(((GetCommand)clo.command()).getKeyRow())
-                )));
+                DataRow readValue = storage.read(extractAndWrapKey(((GetCommand) clo.command()).getKeyRow()));
+
+                ByteBufferRow responseRow = null;
+
+                if (readValue.hasValueBytes())
+                    responseRow = new ByteBufferRow(readValue.valueBytes());
+
+                clo.result(new SingleRowResponse(responseRow));
             }
             else if (clo.command() instanceof GetAllCommand) {
                 Set<BinaryRow> keyRows = ((GetAllCommand)clo.command()).getKeyRows();
 
                 assert keyRows != null && !keyRows.isEmpty();
 
-                final Set<BinaryRow> res = keyRows.stream()
-                    .map(this::extractAndWrapKey)
-                    .map(storage::get)
-                    .filter(Objects::nonNull)
-                    .filter(BinaryRow::hasValue)
+                Set<BinaryRow> res = storage
+                    .readAll(keyRows.stream().map(this::extractAndWrapKey).collect(Collectors.toList()))

Review comment:
       please extract the inner stream into a variable

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
##########
@@ -142,9 +160,11 @@ else if (clo.command() instanceof InsertAllCommand) {
 
                 assert rows != null && !rows.isEmpty();
 
-                final Set<BinaryRow> res = rows.stream()
-                    .map(k -> storage.putIfAbsent(extractAndWrapKey(k), k) == null ? null : k)
-                    .filter(Objects::nonNull)
+                final Set<BinaryRow> res = storage.insertAll(

Review comment:
       same as above

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
##########
@@ -142,9 +160,11 @@ else if (clo.command() instanceof InsertAllCommand) {
 
                 assert rows != null && !rows.isEmpty();
 
-                final Set<BinaryRow> res = rows.stream()
-                    .map(k -> storage.putIfAbsent(extractAndWrapKey(k), k) == null ? null : k)
-                    .filter(Objects::nonNull)
+                final Set<BinaryRow> res = storage.insertAll(

Review comment:
       ```suggestion
                   Set<BinaryRow> res = storage.insertAll(
   ```

##########
File path: modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
##########
@@ -142,6 +235,84 @@ public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws Sto
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public Collection<DataRow> removeAll(Collection<? extends SearchRow> keys) {
+        rwLock.readLock().lock();
+
+        List<DataRow> res = new ArrayList<>();
+
+        Snapshot snapshot = db.getSnapshot();
+
+        try (WriteBatch batch = new WriteBatch();
+             ReadOptions readOpts = new ReadOptions().setSnapshot(snapshot);
+             WriteOptions opts = new WriteOptions()) {
+
+            for (SearchRow key : keys) {
+                byte[] keyBytes = key.keyBytes();
+
+                byte[] value = db.get(readOpts, keyBytes);
+
+                res.add(new SimpleDataRow(keyBytes, value));
+
+                batch.delete(keyBytes);
+            }
+
+            db.write(opts, batch);
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to remove data from the storage", e);
+        }
+        finally {
+            db.releaseSnapshot(snapshot);
+
+            snapshot.close();
+
+            rwLock.readLock().unlock();
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<DataRow> removeAllExact(Collection<? extends DataRow> keyValues) {
+        rwLock.readLock().lock();

Review comment:
       Same concern as above regarding the use of `readLock` here.

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
##########
@@ -281,79 +305,40 @@ else if (clo.command() instanceof GetAndUpsertCommand) {
 
     /** {@inheritDoc} */
     @Override public void onShutdown() {
-        // No-op.
-    }
-
-    /**
-     * Wrapper provides correct byte[] comparison.
-     */
-    private static class KeyWrapper {
-        /** Data. */
-        private final byte[] data;
-
-        /** Hash. */
-        private final int hash;
-
-        /**
-         * Constructor.
-         *
-         * @param data Wrapped data.
-         */
-        KeyWrapper(byte[] data, int hash) {
-            assert data != null;
-
-            this.data = data;
-            this.hash = hash;
+        try {
+            storage.close();
         }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            KeyWrapper wrapper = (KeyWrapper)o;
-            return Arrays.equals(data, wrapper.data);
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            return hash;
+        catch (Exception e) {
+            throw new IgniteInternalException("Failed to close storage: " + e.getMessage(), e);
         }
     }
 
     /**
-     * Compares two rows.
+     * Extracts a key and a value from the {@link BinaryRow} and wraps it in a {@link DataRow}.
      *
-     * @param row Row to compare.
-     * @param row2 Row to compare.
-     * @return True if these rows is equivalent, false otherwise.
+     * @param row Binary row.
+     * @return Data row.
      */
-    private boolean equalValues(BinaryRow row, BinaryRow row2) {
-        if (row == row2)
-            return true;
-
-        if (row == null || row2 == null)
-            return false;
+    @NotNull private DataRow extractAndWrapKeyValue(@NotNull BinaryRow row) {
+        byte[] key = new byte[row.keySlice().capacity()];
+        row.keySlice().get(key);
 
-        if (row.hasValue() ^ row2.hasValue())
-            return false;
+        byte[] value = new byte[row.keySlice().capacity()];
+        row.keySlice().get(value);
 
-        return row.valueSlice().compareTo(row2.valueSlice()) == 0;
+        return new SimpleDataRow(key, row.bytes());
     }
 
     /**
-     * Makes a wrapped key from a table row.
+     * Extracts a key from the {@link BinaryRow} and wraps it in a {@link SearchRow}.
      *
-     * @param row Row.
-     * @return Extracted key.
+     * @param row Binary row.
+     * @return Search row.
      */
-    @NotNull private KeyWrapper extractAndWrapKey(@NotNull BinaryRow row) {
-        final byte[] bytes = new byte[row.keySlice().capacity()];
-        row.keySlice().get(bytes);
+    @NotNull private SearchRow extractAndWrapKey(@NotNull BinaryRow row) {

Review comment:
       can be `static`

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
##########
@@ -281,79 +305,40 @@ else if (clo.command() instanceof GetAndUpsertCommand) {
 
     /** {@inheritDoc} */
     @Override public void onShutdown() {
-        // No-op.
-    }
-
-    /**
-     * Wrapper provides correct byte[] comparison.
-     */
-    private static class KeyWrapper {
-        /** Data. */
-        private final byte[] data;
-
-        /** Hash. */
-        private final int hash;
-
-        /**
-         * Constructor.
-         *
-         * @param data Wrapped data.
-         */
-        KeyWrapper(byte[] data, int hash) {
-            assert data != null;
-
-            this.data = data;
-            this.hash = hash;
+        try {
+            storage.close();
         }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            KeyWrapper wrapper = (KeyWrapper)o;
-            return Arrays.equals(data, wrapper.data);
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            return hash;
+        catch (Exception e) {
+            throw new IgniteInternalException("Failed to close storage: " + e.getMessage(), e);
         }
     }
 
     /**
-     * Compares two rows.
+     * Extracts a key and a value from the {@link BinaryRow} and wraps it in a {@link DataRow}.
      *
-     * @param row Row to compare.
-     * @param row2 Row to compare.
-     * @return True if these rows is equivalent, false otherwise.
+     * @param row Binary row.
+     * @return Data row.
      */
-    private boolean equalValues(BinaryRow row, BinaryRow row2) {
-        if (row == row2)
-            return true;
-
-        if (row == null || row2 == null)
-            return false;
+    @NotNull private DataRow extractAndWrapKeyValue(@NotNull BinaryRow row) {
+        byte[] key = new byte[row.keySlice().capacity()];
+        row.keySlice().get(key);
 
-        if (row.hasValue() ^ row2.hasValue())
-            return false;
+        byte[] value = new byte[row.keySlice().capacity()];

Review comment:
       shouldn't this be `valueSlice`?

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
##########
@@ -183,32 +200,26 @@ else if (clo.command() instanceof DeleteExactCommand) {
                 assert row != null;
                 assert row.hasValue();
 
-                final KeyWrapper key = extractAndWrapKey(row);
-                final BinaryRow old = storage.get(key);
+                DataRow keyValue = extractAndWrapKeyValue(row);
 
-                if (old == null || !old.hasValue())
-                    clo.result(false);
-                else
-                    clo.result(equalValues(row, old) && storage.remove(key) != null);
+                var deleteExact = new DeleteExactInvokeClosure(keyValue);
+
+                storage.invoke(keyValue, deleteExact);
+
+                clo.result(deleteExact.deletes());
             }
             else if (clo.command() instanceof DeleteExactAllCommand) {
                 Set<BinaryRow> rows = ((DeleteExactAllCommand)clo.command()).getRows();
 
                 assert rows != null && !rows.isEmpty();
 
-                final Set<BinaryRow> res = rows.stream()
-                    .map(k -> {
-                        final KeyWrapper key = extractAndWrapKey(k);
-                        final BinaryRow old = storage.get(key);
-
-                        if (old == null || !old.hasValue() || !equalValues(k, old))
-                            return null;
-
-                        return storage.remove(key);
-                    })
-                    .filter(Objects::nonNull)
-                    .filter(BinaryRow::hasValue)
-                    .collect(Collectors.toSet());
+                final Set<BinaryRow> res =

Review comment:
       same as above

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
##########
@@ -155,7 +175,7 @@ else if (clo.command() instanceof UpsertAllCommand) {
 
                 assert rows != null && !rows.isEmpty();
 
-                rows.forEach(k -> storage.put(extractAndWrapKey(k), k));
+                storage.writeAll(rows.stream().map(this::extractAndWrapKeyValue).collect(Collectors.toList()));

Review comment:
       Not related to this PR, but there's a typo: `UpsertAllCommand`

##########
File path: modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
##########
@@ -63,6 +80,32 @@
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public void writeAll(Collection<? extends DataRow> rows) throws StorageException {
+        rwLock.writeLock().lock();
+
+        try {
+            rows.forEach(row -> map.put(new ByteArray(row.keyBytes()), row.valueBytes()));
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<DataRow> insertAll(Collection<? extends DataRow> rows) throws StorageException {
+        rwLock.writeLock().lock();
+
+        try {
+            return rows.stream()
+                .map(row -> map.putIfAbsent(new ByteArray(row.keyBytes()), row.valueBytes()) == null ? null : row)

Review comment:
       which also means that your tests for this feature are not exhaustive =)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org