You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/08/10 10:07:10 UTC
[ignite-3] branch main updated: IGNITE-14790 Persistent partition storage based on RocksDB. (#254)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 68d80ef IGNITE-14790 Persistent partition storage based on RocksDB. (#254)
68d80ef is described below
commit 68d80efcd133d7ab3da71a586977304956df0856
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Tue Aug 10 13:07:03 2021 +0300
IGNITE-14790 Persistent partition storage based on RocksDB. (#254)
---
.../apache/ignite/internal/app/IgnitionImpl.java | 28 +-
.../apache/ignite/internal/schema/BinaryRow.java | 5 +
.../ignite/internal/schema/ByteBufferRow.java | 5 +
.../org/apache/ignite/internal/schema/row/Row.java | 4 +
.../apache/ignite/internal/storage/DataRow.java | 5 +
.../ignite/internal/storage/InvokeClosure.java | 25 +-
.../apache/ignite/internal/storage/SearchRow.java | 6 +-
.../apache/ignite/internal/storage/Storage.java | 63 ++-
.../storage/basic/ConcurrentHashMapStorage.java | 112 ++--
...eClosure.java => DeleteExactInvokeClosure.java} | 50 +-
...Closure.java => GetAndRemoveInvokeClosure.java} | 46 +-
.../storage/basic/GetAndReplaceInvokeClosure.java | 86 ++++
...InvokeClosure.java => InsertInvokeClosure.java} | 36 +-
...Closure.java => ReplaceExactInvokeClosure.java} | 44 +-
.../internal/storage/basic/SimpleDataRow.java | 25 +-
.../storage/basic/SimpleReadInvokeClosure.java | 7 +-
.../storage/basic/SimpleRemoveInvokeClosure.java | 5 +-
.../storage/basic/SimpleWriteInvokeClosure.java | 5 +-
.../internal/storage/AbstractStorageTest.java | 567 +++++++++++++++++++--
.../internal/storage/rocksdb/RocksDbStorage.java | 175 ++++++-
modules/table/pom.xml | 10 +
.../ignite/distributed/ITDistributedTableTest.java | 14 +-
.../internal/table/distributed/TableManager.java | 32 +-
.../table/distributed/command/ReplaceCommand.java | 2 +
.../table/distributed/raft/PartitionListener.java | 279 +++++-----
.../ignite/internal/table/TableManagerTest.java | 12 +-
.../raft/PartitionCommandListenerTest.java | 31 +-
27 files changed, 1333 insertions(+), 346 deletions(-)
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
index bb633e0..fc559c8 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
@@ -81,6 +81,11 @@ public class IgnitionImpl implements Ignition {
*/
private static final Path VAULT_DB_PATH = Paths.get("vault");
+ /**
+ * Path for the partitions persistent storage.
+ */
+ private static final Path PARTITIONS_STORE_PATH = Paths.get("db");
+
/** */
private static final String[] BANNER = {
"",
@@ -324,7 +329,8 @@ public class IgnitionImpl implements Ignition {
metaStorageMgr,
schemaMgr,
affinityMgr,
- raftMgr
+ raftMgr,
+ getPartitionsStorePath(workDir)
)
);
@@ -376,6 +382,26 @@ public class IgnitionImpl implements Ignition {
}
/**
+ * Returns a path to the partitions store directory.
+ * Creates a directory if it doesn't exist.
+ *
+ * @param workDir Ignite work directory.
+ * @return Partitions store path.
+ */
+ @NotNull
+ private static Path getPartitionsStorePath(Path workDir) {
+ Path partitionsStore = workDir.resolve(PARTITIONS_STORE_PATH);
+
+ try {
+ Files.createDirectories(partitionsStore);
+ } catch (IOException e) {
+ throw new IgniteInternalException("Failed to create directory for partitions storage: " + e.getMessage(), e);
+ }
+
+ return partitionsStore;
+ }
+
+ /**
* Starts the Vault component.
*/
private static VaultManager createVault(Path workDir) {
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
index ba20a16..df32145 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
@@ -133,6 +133,11 @@ public interface BinaryRow {
byte[] readBytes(int off, int len);
/**
+ * @return Byte array of the row.
+ */
+ byte[] bytes();
+
+ /**
* Row flags.
*/
final class RowFlags {
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
index 6041aa3..a2edc42 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
@@ -154,4 +154,9 @@ public class ByteBufferRow implements BinaryRow {
buf.limit(buf.capacity());
}
}
+
+ /** {@inheritDoc} */
+ @Override public byte[] bytes() {
+ return buf.array();
+ }
}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
index ea802e5..5aa4e33 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
@@ -665,4 +665,8 @@ public class Row implements BinaryRow {
@Override public byte[] readBytes(int off, int len) {
return row.readBytes(off, len);
}
+
+ @Override public byte[] bytes() {
+ return row.bytes();
+ }
}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/DataRow.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/DataRow.java
index 4373f98..34b1618 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/DataRow.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/DataRow.java
@@ -33,4 +33,9 @@ public interface DataRow extends SearchRow {
* @return Value object as a byte buffer. Allows more effective memory management in certain cases.
*/
ByteBuffer value();
+
+ /**
+ * @return {@code true} if this row has a value.
+ */
+ boolean hasValueBytes();
}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/InvokeClosure.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/InvokeClosure.java
index 7126069..67dd309 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/InvokeClosure.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/InvokeClosure.java
@@ -17,14 +17,31 @@
package org.apache.ignite.internal.storage;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-/** */
-public interface InvokeClosure {
+/**
+ * Closure that performs an operation on the storage.
+ *
+ * @param <T> Type of the invocation's result.
+ */
+public interface InvokeClosure<T> {
+ /**
+ * In this method closure decides what type of operation should be performed on the storage, based on the
+ * current data in the storage passed as an argument.
+ * The result of the operation can be obtained via the {@link #result()} method.
+ *
+ * @param row Old row, never {@code null}; if the key is not present in the storage, the row has no value bytes.
+ */
+ void call(@NotNull DataRow row);
+
/**
- * @param row Old row or {@code null} if the old row has not been found.
+ * @return Result of the invocation. Can be {@code null}.
*/
- void call(@Nullable DataRow row);
+ @Nullable
+ default T result() {
+ return null;
+ }
/**
* @return New row for the {@link OperationType#WRITE} operation.
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java
index 74db8d9..ce3ceff 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java
@@ -18,7 +18,7 @@
package org.apache.ignite.internal.storage;
import java.nio.ByteBuffer;
-import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.NotNull;
/**
* Interface to be used as a key representation to search data in storage.
@@ -27,10 +27,10 @@ public interface SearchRow {
/**
* @return Key bytes.
*/
- byte @Nullable [] keyBytes();
+ byte @NotNull [] keyBytes();
/**
* @return Key object as a byte buffer. Allows more effective memory management in certain cases.
*/
- @Nullable ByteBuffer key();
+ @NotNull ByteBuffer key();
}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
index 9be5a27..0778e74 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
@@ -17,46 +17,97 @@
package org.apache.ignite.internal.storage;
+import java.util.Collection;
import java.util.function.Predicate;
import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
/**
* Interface providing methods to read, remove and update keys in storage.
+ * Any locking is unnecessary as this storage is used within RAFT groups where all write operations are
+ * serialized.
*/
-public interface Storage {
+public interface Storage extends AutoCloseable {
/**
* Reads a DataRow for a given key.
*
* @param key Search row.
* @return Data row.
- * @throws StorageException If failed to read data or storage is already stopped.
+ * @throws StorageException If failed to read the data or the storage is already stopped.
*/
public DataRow read(SearchRow key) throws StorageException;
/**
- * Writes a DataRow to the storage.
+ * Reads {@link DataRow}s for a given collection of keys.
+ *
+ * @param keys Search rows.
+ * @return Data rows.
+ * @throws StorageException If failed to read the data or the storage is already stopped.
+ */
+ public Collection<DataRow> readAll(Collection<? extends SearchRow> keys);
+
+ /**
+ * Writes a DataRow into the storage.
*
* @param row Data row.
- * @throws StorageException If failed to read data or storage is already stopped.
+ * @throws StorageException If failed to write the data or the storage is already stopped.
*/
public void write(DataRow row) throws StorageException;
/**
+ * Writes a collection of {@link DataRow}s into the storage.
+ *
+ * @param rows Data rows.
+ * @throws StorageException If failed to write the data or the storage is already stopped.
+ */
+ public void writeAll(Collection<? extends DataRow> rows) throws StorageException;
+
+ /**
+ * Inserts a collection of {@link DataRow}s into the storage and returns a collection of rows that
+ * can't be inserted due to their keys being already present in the storage.
+ *
+ * @param rows Data rows.
+ * @return Collection of rows that could not be inserted.
+ * @throws StorageException If failed to write the data or the storage is already stopped.
+ */
+ public Collection<DataRow> insertAll(Collection<? extends DataRow> rows) throws StorageException;
+
+ /**
* Removes a DataRow associated with a given Key.
*
* @param key Search row.
- * @throws StorageException If failed to read data or storage is already stopped.
+ * @throws StorageException If failed to remove the data or the storage is already stopped.
*/
public void remove(SearchRow key) throws StorageException;
/**
+ * Removes {@link DataRow}s mapped by given keys.
+ *
+ * @param keys Search rows.
+ * @return Collection of removed data rows.
+ * @throws StorageException If failed to remove the data or the storage is already stopped.
+ */
+ public Collection<DataRow> removeAll(Collection<? extends SearchRow> keys);
+
+ /**
+ * Removes {@link DataRow}s mapped by given keys and containing given values.
+ *
+ * @param keyValues Data rows.
+ * @return Collection of removed data rows.
+ * @throws StorageException If failed to remove the data or the storage is already stopped.
+ */
+ public Collection<DataRow> removeAllExact(Collection<? extends DataRow> keyValues);
+
+ /**
* Executes an update with custom logic implemented by storage.UpdateClosure interface.
*
* @param key Search key.
* @param clo Invoke closure.
+ * @param <T> Closure invocation's result type.
* @throws StorageException If failed to read data or storage is already stopped.
*/
- public void invoke(SearchRow key, InvokeClosure clo) throws StorageException;
+ @Nullable
+ public <T> T invoke(SearchRow key, InvokeClosure<T> clo) throws StorageException;
/**
* Creates cursor over the storage data.
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
index e462653..9b8d820 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
@@ -17,12 +17,14 @@
package org.apache.ignite.internal.storage.basic;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Iterator;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
import org.apache.ignite.internal.storage.DataRow;
import org.apache.ignite.internal.storage.InvokeClosure;
import org.apache.ignite.internal.storage.SearchRow;
@@ -31,6 +33,7 @@ import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.ByteArray;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
* Storage implementation based on {@link ConcurrentHashMap}.
@@ -39,9 +42,6 @@ public class ConcurrentHashMapStorage implements Storage {
/** Storage content. */
private final ConcurrentMap<ByteArray, byte[]> map = new ConcurrentHashMap<>();
- /** RW lock. */
- private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
-
/** {@inheritDoc} */
@Override public DataRow read(SearchRow key) throws StorageException {
byte[] keyBytes = key.keyBytes();
@@ -52,60 +52,91 @@ public class ConcurrentHashMapStorage implements Storage {
}
/** {@inheritDoc} */
+ @Override public Collection<DataRow> readAll(Collection<? extends SearchRow> keys) {
+ return keys.stream()
+ .map(SearchRow::keyBytes)
+ .map(key -> new SimpleDataRow(key, map.get(new ByteArray(key))))
+ .collect(Collectors.toList());
+ }
+
+ /** {@inheritDoc} */
@Override public void write(DataRow row) throws StorageException {
- rwLock.readLock().lock();
+ map.put(new ByteArray(row.keyBytes()), row.valueBytes());
+ }
- try {
- map.put(new ByteArray(row.keyBytes()), row.valueBytes());
- }
- finally {
- rwLock.readLock().unlock();
- }
+ /** {@inheritDoc} */
+ @Override public void writeAll(Collection<? extends DataRow> rows) throws StorageException {
+ rows.forEach(row -> map.put(new ByteArray(row.keyBytes()), row.valueBytes()));
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<DataRow> insertAll(Collection<? extends DataRow> rows) throws StorageException {
+ return rows.stream()
+ .map(row -> map.putIfAbsent(new ByteArray(row.keyBytes()), row.valueBytes()) == null ? null : row)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
}
/** {@inheritDoc} */
@Override public void remove(SearchRow key) throws StorageException {
- rwLock.readLock().lock();
+ map.remove(new ByteArray(key.keyBytes()));
+ }
- try {
- map.remove(new ByteArray(key.keyBytes()));
- }
- finally {
- rwLock.readLock().unlock();
- }
+ /** {@inheritDoc} */
+ @Override public Collection<DataRow> removeAll(Collection<? extends SearchRow> keys) {
+ return keys.stream()
+ .map(SearchRow::keyBytes)
+ .map(key -> new SimpleDataRow(key, map.remove(new ByteArray(key))))
+ .filter(SimpleDataRow::hasValueBytes)
+ .collect(Collectors.toList());
}
/** {@inheritDoc} */
- @Override public void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+ @Override public Collection<DataRow> removeAllExact(Collection<? extends DataRow> keyValues) {
+ return keyValues.stream()
+ .filter(kv -> {
+ ByteArray key = new ByteArray(kv.keyBytes());
+
+ byte[] currentValue = map.get(key);
+
+ if (Arrays.equals(currentValue, kv.valueBytes())) {
+ map.remove(key);
+
+ return true;
+ }
+
+ return false;
+ })
+ .collect(Collectors.toList());
+ }
+
+ /** {@inheritDoc} */
+ @Nullable
+ @Override public <T> T invoke(SearchRow key, InvokeClosure<T> clo) throws StorageException {
byte[] keyBytes = key.keyBytes();
ByteArray mapKey = new ByteArray(keyBytes);
- rwLock.writeLock().lock();
+ byte[] existingDataBytes = map.get(mapKey);
- try {
- byte[] existingDataBytes = map.get(mapKey);
+ clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
- clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+ switch (clo.operationType()) {
+ case WRITE:
+ map.put(mapKey, clo.newRow().valueBytes());
- switch (clo.operationType()) {
- case WRITE:
- map.put(mapKey, clo.newRow().valueBytes());
+ break;
- break;
+ case REMOVE:
+ map.remove(mapKey);
- case REMOVE:
- map.remove(mapKey);
+ break;
- break;
-
- case NOOP:
- break;
- }
- }
- finally {
- rwLock.writeLock().unlock();
+ case NOOP:
+ break;
}
+
+ return clo.result();
}
/** {@inheritDoc} */
@@ -137,4 +168,9 @@ public class ConcurrentHashMapStorage implements Storage {
}
};
}
+
+ /** {@inheritDoc} */
+ @Override public void close() throws Exception {
+ // No-op.
+ }
}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleReadInvokeClosure.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/DeleteExactInvokeClosure.java
similarity index 55%
copy from modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleReadInvokeClosure.java
copy to modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/DeleteExactInvokeClosure.java
index 71f3b79..97e73d4 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleReadInvokeClosure.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/DeleteExactInvokeClosure.java
@@ -17,41 +17,51 @@
package org.apache.ignite.internal.storage.basic;
+import java.util.Arrays;
import org.apache.ignite.internal.storage.DataRow;
import org.apache.ignite.internal.storage.InvokeClosure;
import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-/** Invoke closure implementation for read operation. */
-public class SimpleReadInvokeClosure implements InvokeClosure {
- /** Copy of the row that was passed to {@link #call(DataRow)} method. */
- @Nullable
- private DataRow row;
+/**
+ * Closure that deletes a specific data row with a given key and a given value.
+ */
+public class DeleteExactInvokeClosure implements InvokeClosure<Boolean> {
+ /** Row to delete. */
+ @NotNull
+ private final DataRow row;
+
+ /** {@code true} if the stored row matches and will be deleted, {@code false} otherwise. */
+ private boolean deletes = false;
+
+ /**
+ * Constructor.
+ *
+ * @param row Row to delete.
+ */
+ public DeleteExactInvokeClosure(@NotNull DataRow row) {
+ this.row = row;
+ }
/** {@inheritDoc} */
- @Override public void call(@Nullable DataRow row) {
- this.row = row == null ? null : new SimpleDataRow(row.keyBytes(), row.valueBytes());
+ @Override public void call(@NotNull DataRow row) {
+ deletes = row.hasValueBytes() && Arrays.equals(this.row.valueBytes(), row.valueBytes());
}
/** {@inheritDoc} */
- @Nullable
- @Override public DataRow newRow() {
+ @Override public @Nullable DataRow newRow() {
return null;
}
/** {@inheritDoc} */
- @Nullable
- @Override public OperationType operationType() {
- return OperationType.NOOP;
+ @Override public @Nullable OperationType operationType() {
+ return deletes ? OperationType.REMOVE : OperationType.NOOP;
}
- /**
- * Copy of the row that was passed to {@link #call(DataRow)} method.
- *
- * @return Copy of data row.
- */
- @Nullable
- public DataRow row() {
- return row;
+ /** {@inheritDoc} */
+ @NotNull
+ @Override public Boolean result() {
+ return deletes;
}
}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleReadInvokeClosure.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/GetAndRemoveInvokeClosure.java
similarity index 56%
copy from modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleReadInvokeClosure.java
copy to modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/GetAndRemoveInvokeClosure.java
index 71f3b79..b40e9d7 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleReadInvokeClosure.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/GetAndRemoveInvokeClosure.java
@@ -20,38 +20,50 @@ package org.apache.ignite.internal.storage.basic;
import org.apache.ignite.internal.storage.DataRow;
import org.apache.ignite.internal.storage.InvokeClosure;
import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-/** Invoke closure implementation for read operation. */
-public class SimpleReadInvokeClosure implements InvokeClosure {
- /** Copy of the row that was passed to {@link #call(DataRow)} method. */
+/**
+ * Closure that removes a data row with a given key and exposes the removed row via {@link #oldRow()}.
+ */
+public class GetAndRemoveInvokeClosure implements InvokeClosure<Boolean> {
+ /** Row that will be removed. */
@Nullable
- private DataRow row;
+ private DataRow rowToRemove;
+
+ /** {@code true} if the row exists and will be deleted, {@code false} otherwise. */
+ private boolean deletes;
/** {@inheritDoc} */
- @Override public void call(@Nullable DataRow row) {
- this.row = row == null ? null : new SimpleDataRow(row.keyBytes(), row.valueBytes());
+ @Override public void call(@NotNull DataRow row) {
+ this.rowToRemove = row;
+
+ this.deletes = rowToRemove.hasValueBytes();
}
/** {@inheritDoc} */
- @Nullable
- @Override public DataRow newRow() {
+ @Override public @Nullable DataRow newRow() {
return null;
}
/** {@inheritDoc} */
- @Nullable
- @Override public OperationType operationType() {
- return OperationType.NOOP;
+ @Override public @Nullable OperationType operationType() {
+ return deletes ? OperationType.REMOVE : OperationType.NOOP;
}
/**
- * Copy of the row that was passed to {@link #call(DataRow)} method.
- *
- * @return Copy of data row.
+ * @return Row that is removed.
*/
- @Nullable
- public DataRow row() {
- return row;
+ @NotNull
+ public DataRow oldRow() {
+ assert rowToRemove != null;
+
+ return rowToRemove;
+ }
+
+ /** {@inheritDoc} */
+ @NotNull
+ @Override public Boolean result() {
+ return deletes;
}
}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/GetAndReplaceInvokeClosure.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/GetAndReplaceInvokeClosure.java
new file mode 100644
index 0000000..e3b18d3
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/GetAndReplaceInvokeClosure.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Closure that replaces a data row with a given key and exposes the previous row via {@link #oldRow()}.
+ */
+public class GetAndReplaceInvokeClosure implements InvokeClosure<Boolean> {
+ /** New row. */
+ @NotNull
+ private final DataRow newRow;
+
+ /** {@code true} if this closure should insert a new row only if a previous value exists. */
+ private final boolean onlyIfExists;
+
+ /** Previous data row. */
+ private DataRow oldRow;
+
+ /** {@code true} if this closure replaces a row, {@code false} otherwise. */
+ private boolean replaces;
+
+ /**
+ * Constructor.
+ *
+ * @param newRow New row.
+ * @param onlyIfExists Whether to insert a new row only if a previous one exists.
+ */
+ public GetAndReplaceInvokeClosure(@NotNull DataRow newRow, boolean onlyIfExists) {
+ this.newRow = newRow;
+ this.onlyIfExists = onlyIfExists;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void call(@NotNull DataRow row) {
+ oldRow = row;
+
+ replaces = row.hasValueBytes() || !onlyIfExists;
+ }
+
+ /** {@inheritDoc} */
+ @Override public @Nullable DataRow newRow() {
+ return newRow;
+ }
+
+ /** {@inheritDoc} */
+ @Override public @Nullable OperationType operationType() {
+ return replaces ? OperationType.WRITE : OperationType.NOOP;
+ }
+
+ /**
+ * @return Previous data row.
+ */
+ @NotNull
+ public DataRow oldRow() {
+ assert oldRow != null;
+
+ return oldRow;
+ }
+
+ /** {@inheritDoc} */
+ @NotNull
+ @Override public Boolean result() {
+ return replaces;
+ }
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleWriteInvokeClosure.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/InsertInvokeClosure.java
similarity index 61%
copy from modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleWriteInvokeClosure.java
copy to modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/InsertInvokeClosure.java
index 4749511..ee8dcbd 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleWriteInvokeClosure.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/InsertInvokeClosure.java
@@ -20,33 +20,47 @@ package org.apache.ignite.internal.storage.basic;
import org.apache.ignite.internal.storage.DataRow;
import org.apache.ignite.internal.storage.InvokeClosure;
import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-/** Invoke closure implementation for a write operation. */
-public class SimpleWriteInvokeClosure implements InvokeClosure {
- /** Data row to write into storage. */
+/**
+ * Closure that inserts a new data row.
+ */
+public class InsertInvokeClosure implements InvokeClosure<Boolean> {
+ /** New row. */
+ @NotNull
private final DataRow newRow;
+ /** {@code true} if this closure inserts a new row, {@code false} otherwise. */
+ private boolean inserts = false;
+
/**
- * @param newRow Data row to write into the storage.
+ * Constructor.
+ *
+ * @param newRow New row.
*/
- public SimpleWriteInvokeClosure(DataRow newRow) {
+ public InsertInvokeClosure(@NotNull DataRow newRow) {
this.newRow = newRow;
}
/** {@inheritDoc} */
- @Override public void call(@Nullable DataRow row) {
+ @Override public void call(@NotNull DataRow row) {
+ inserts = !row.hasValueBytes();
}
/** {@inheritDoc} */
- @Nullable
- @Override public DataRow newRow() {
+ @Override public @Nullable DataRow newRow() {
return newRow;
}
/** {@inheritDoc} */
- @Nullable
- @Override public OperationType operationType() {
- return OperationType.WRITE;
+ @Override public @Nullable OperationType operationType() {
+ return inserts ? OperationType.WRITE : OperationType.NOOP;
+ }
+
+ /** {@inheritDoc} */
+ @NotNull
+ @Override public Boolean result() {
+ return inserts;
}
}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleWriteInvokeClosure.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ReplaceExactInvokeClosure.java
similarity index 51%
copy from modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleWriteInvokeClosure.java
copy to modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ReplaceExactInvokeClosure.java
index 4749511..7b5905e 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleWriteInvokeClosure.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ReplaceExactInvokeClosure.java
@@ -17,36 +17,58 @@
package org.apache.ignite.internal.storage.basic;
+import java.util.Arrays;
import org.apache.ignite.internal.storage.DataRow;
import org.apache.ignite.internal.storage.InvokeClosure;
import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-/** Invoke closure implementation for a write operation. */
-public class SimpleWriteInvokeClosure implements InvokeClosure {
- /** Data row to write into storage. */
+/**
+ * Closure that replaces an exact row with a specified key and a specified value.
+ */
+public class ReplaceExactInvokeClosure implements InvokeClosure<Boolean> {
+ /** Expected data row. */
+ @NotNull
+ private final DataRow expectedRow;
+
+ /** New data row. */
+ @NotNull
private final DataRow newRow;
+ /** {@code true} if this closure replaces a row, {@code false} otherwise. */
+ private boolean replaces;
+
/**
- * @param newRow Data row to write into the storage.
+ * Constructor.
+ *
+ * @param expectedRow Expected row.
+ * @param newRow New row.
*/
- public SimpleWriteInvokeClosure(DataRow newRow) {
+ public ReplaceExactInvokeClosure(@NotNull DataRow expectedRow, @NotNull DataRow newRow) {
+ this.expectedRow = expectedRow;
this.newRow = newRow;
}
/** {@inheritDoc} */
- @Override public void call(@Nullable DataRow row) {
+ @Override public void call(@NotNull DataRow row) {
+ replaces = (!row.hasValueBytes() && !expectedRow.hasValueBytes())
+ || (Arrays.equals(row.valueBytes(), expectedRow.valueBytes()));
}
/** {@inheritDoc} */
- @Nullable
- @Override public DataRow newRow() {
+ @Override public @Nullable DataRow newRow() {
return newRow;
}
/** {@inheritDoc} */
- @Nullable
- @Override public OperationType operationType() {
- return OperationType.WRITE;
+ @Override public @Nullable OperationType operationType() {
+ return replaces ? OperationType.WRITE : OperationType.NOOP;
+ }
+
+ /** {@inheritDoc} */
+ @NotNull
+ @Override public Boolean result() {
+ return replaces;
}
}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleDataRow.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleDataRow.java
index f90ea1d..bda70b1 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleDataRow.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleDataRow.java
@@ -18,7 +18,9 @@
package org.apache.ignite.internal.storage.basic;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import org.apache.ignite.internal.storage.DataRow;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
@@ -37,12 +39,13 @@ public class SimpleDataRow implements DataRow {
}
/** {@inheritDoc} */
+ @NotNull
@Override public ByteBuffer key() {
return ByteBuffer.wrap(key);
}
/** {@inheritDoc} */
- @Override public byte[] keyBytes() {
+ @Override public byte @NotNull [] keyBytes() {
return key;
}
@@ -56,4 +59,24 @@ public class SimpleDataRow implements DataRow {
@Override public byte @Nullable [] valueBytes() {
return value;
}
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * This row is considered to have value bytes when its {@code value} array is not {@code null}.
+ */
+ @Override public boolean hasValueBytes() {
+ return value != null;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Two rows are equal when they are of the same class and both their key and value arrays have equal contents.
+ */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ SimpleDataRow that = (SimpleDataRow) o;
+
+ // Arrays are compared by content, not by reference.
+ boolean sameKey = Arrays.equals(key, that.key);
+
+ return sameKey && Arrays.equals(value, that.value);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The hash is derived from the contents of the key and value arrays.
+ */
+ @Override public int hashCode() {
+ return 31 * Arrays.hashCode(key) + Arrays.hashCode(value);
+ }
}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleReadInvokeClosure.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleReadInvokeClosure.java
index 71f3b79..e77cf52 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleReadInvokeClosure.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleReadInvokeClosure.java
@@ -20,17 +20,18 @@ package org.apache.ignite.internal.storage.basic;
import org.apache.ignite.internal.storage.DataRow;
import org.apache.ignite.internal.storage.InvokeClosure;
import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/** Invoke closure implementation for read operation. */
-public class SimpleReadInvokeClosure implements InvokeClosure {
+public class SimpleReadInvokeClosure implements InvokeClosure<Void> {
/** Copy of the row that was passed to {@link #call(DataRow)} method. */
@Nullable
private DataRow row;
/** {@inheritDoc} */
- @Override public void call(@Nullable DataRow row) {
- this.row = row == null ? null : new SimpleDataRow(row.keyBytes(), row.valueBytes());
+ @Override public void call(@NotNull DataRow row) {
+ this.row = new SimpleDataRow(row.keyBytes(), row.valueBytes());
}
/** {@inheritDoc} */
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleRemoveInvokeClosure.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleRemoveInvokeClosure.java
index abb3db8..dc6f80d 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleRemoveInvokeClosure.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleRemoveInvokeClosure.java
@@ -20,12 +20,13 @@ package org.apache.ignite.internal.storage.basic;
import org.apache.ignite.internal.storage.DataRow;
import org.apache.ignite.internal.storage.InvokeClosure;
import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/** Invoke closure implementation for a remove operation. */
-public class SimpleRemoveInvokeClosure implements InvokeClosure {
+public class SimpleRemoveInvokeClosure implements InvokeClosure<Void> {
/** {@inheritDoc} */
- @Override public void call(@Nullable DataRow row) {
+ @Override public void call(@NotNull DataRow row) {
}
/** {@inheritDoc} */
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleWriteInvokeClosure.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleWriteInvokeClosure.java
index 4749511..87fa289 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleWriteInvokeClosure.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleWriteInvokeClosure.java
@@ -20,10 +20,11 @@ package org.apache.ignite.internal.storage.basic;
import org.apache.ignite.internal.storage.DataRow;
import org.apache.ignite.internal.storage.InvokeClosure;
import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/** Invoke closure implementation for a write operation. */
-public class SimpleWriteInvokeClosure implements InvokeClosure {
+public class SimpleWriteInvokeClosure implements InvokeClosure<Void> {
/** Data row to write into storage. */
private final DataRow newRow;
@@ -35,7 +36,7 @@ public class SimpleWriteInvokeClosure implements InvokeClosure {
}
/** {@inheritDoc} */
- @Override public void call(@Nullable DataRow row) {
+ @Override public void call(@NotNull DataRow row) {
}
/** {@inheritDoc} */
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractStorageTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractStorageTest.java
index f225b58..1218d92 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractStorageTest.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractStorageTest.java
@@ -18,9 +18,21 @@
package org.apache.ignite.internal.storage;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
import java.util.stream.StreamSupport;
+import org.apache.ignite.internal.storage.basic.DeleteExactInvokeClosure;
+import org.apache.ignite.internal.storage.basic.GetAndRemoveInvokeClosure;
+import org.apache.ignite.internal.storage.basic.GetAndReplaceInvokeClosure;
+import org.apache.ignite.internal.storage.basic.InsertInvokeClosure;
+import org.apache.ignite.internal.storage.basic.ReplaceExactInvokeClosure;
import org.apache.ignite.internal.storage.basic.SimpleDataRow;
import org.apache.ignite.internal.storage.basic.SimpleReadInvokeClosure;
import org.apache.ignite.internal.storage.basic.SimpleRemoveInvokeClosure;
@@ -34,6 +46,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -41,48 +55,25 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
* Abstract test that covers basic scenarios of the storage API.
*/
public abstract class AbstractStorageTest {
- /** Storage instance. */
- protected Storage storage;
+ /** Test key. */
+ private static final String KEY = "key";
- /**
- * Wraps string key into a search row.
- *
- * @param key String key.
- * @return Search row.
- */
- private SearchRow searchRow(String key) {
- return new SimpleDataRow(
- key.getBytes(StandardCharsets.UTF_8),
- null
- );
- }
+ /** Test value. */
+ private static final String VALUE = "value";
- /**
- * Wraps string key/value pair into a data row.
- *
- * @param key String key.
- * @param value String value.
- * @return Data row.
- */
- private DataRow dataRow(String key, String value) {
- return new SimpleDataRow(
- key.getBytes(StandardCharsets.UTF_8),
- value.getBytes(StandardCharsets.UTF_8)
- );
- }
+ /** Storage instance. */
+ protected Storage storage;
/**
* Tests that read / write / remove work consistently on the same key.
- *
- * @throws Exception If failed.
*/
@Test
- public void readWriteRemove() throws Exception {
- SearchRow searchRow = searchRow("key");
+ public void readWriteRemove() {
+ SearchRow searchRow = searchRow(KEY);
assertNull(storage.read(searchRow).value());
- DataRow dataRow = dataRow("key", "value");
+ DataRow dataRow = dataRow(KEY, VALUE);
storage.write(dataRow);
@@ -96,12 +87,10 @@ public abstract class AbstractStorageTest {
/**
* Tests that invoke method works consistently with default read / write / remove closures implementations on the
* same key.
- *
- * @throws Exception If failed.
*/
@Test
- public void invoke() throws Exception {
- SearchRow searchRow = searchRow("key");
+ public void invoke() {
+ SearchRow searchRow = searchRow(KEY);
SimpleReadInvokeClosure readClosure = new SimpleReadInvokeClosure();
@@ -109,7 +98,7 @@ public abstract class AbstractStorageTest {
assertNull(readClosure.row().value());
- DataRow dataRow = dataRow("key", "value");
+ DataRow dataRow = dataRow(KEY, VALUE);
storage.invoke(searchRow, new SimpleWriteInvokeClosure(dataRow));
@@ -189,6 +178,510 @@ public abstract class AbstractStorageTest {
}
/**
+ * Tests that {@link InsertInvokeClosure} inserts a data row if there is no existing data row with the same key.
+ */
+ @Test
+ public void testInsertClosure() {
+ DataRow row = dataRow(KEY, VALUE);
+
+ InsertInvokeClosure insertClosure = new InsertInvokeClosure(row);
+
+ storage.invoke(row, insertClosure);
+
+ // The closure must report a successful insert ...
+ assertTrue(insertClosure.result());
+
+ // ... and the row must be readable from the storage afterwards.
+ checkHasSameEntry(row);
+ }
+
+ /**
+ * Tests that {@link InsertInvokeClosure} reports a failure and leaves the stored data row untouched when a row
+ * with the same key has already been inserted.
+ */
+ @Test
+ public void testInsertClosure_failureBranch() {
+ DataRow existingRow = dataRow(KEY, VALUE);
+
+ InsertInvokeClosure firstInsert = new InsertInvokeClosure(existingRow);
+
+ storage.invoke(existingRow, firstInsert);
+
+ assertTrue(firstInsert.result());
+
+ // Same key, different value: this insert must be rejected.
+ DataRow conflictingRow = dataRow(KEY, "test");
+
+ InsertInvokeClosure secondInsert = new InsertInvokeClosure(conflictingRow);
+
+ storage.invoke(conflictingRow, secondInsert);
+
+ assertFalse(secondInsert.result());
+
+ checkHasSameEntry(existingRow);
+ }
+
+ /**
+ * Tests that {@link DeleteExactInvokeClosure} deletes a stored data row when both its key and its value match
+ * the row passed to the closure.
+ */
+ @Test
+ public void testDeleteExactClosure() {
+ DataRow storedRow = dataRow(KEY, VALUE);
+
+ storage.write(storedRow);
+
+ checkHasSameEntry(storedRow);
+
+ DeleteExactInvokeClosure deleteClosure = new DeleteExactInvokeClosure(storedRow);
+
+ storage.invoke(storedRow, deleteClosure);
+
+ assertTrue(deleteClosure.result());
+
+ checkHasNoEntry(storedRow);
+ }
+
+ /**
+ * Tests that {@link DeleteExactInvokeClosure} keeps the stored data row and reports a failure when the value
+ * passed to the closure differs from the stored one.
+ */
+ @Test
+ public void testDeleteExactClosure_failureBranch() {
+ DataRow storedRow = dataRow(KEY, VALUE);
+
+ storage.write(storedRow);
+
+ checkHasSameEntry(storedRow);
+
+ // Same key as the stored row, but a value that is not in the storage.
+ DataRow mismatchedRow = dataRow(KEY, "test");
+
+ DeleteExactInvokeClosure deleteClosure = new DeleteExactInvokeClosure(mismatchedRow);
+
+ storage.invoke(storedRow, deleteClosure);
+
+ assertFalse(deleteClosure.result());
+
+ checkHasSameEntry(storedRow);
+ }
+
+ /**
+ * Tests that {@link GetAndRemoveInvokeClosure} returns the previously stored data row and removes it from
+ * the storage.
+ */
+ @Test
+ public void testGetAndRemoveClosure() {
+ DataRow storedRow = dataRow(KEY, VALUE);
+
+ storage.write(storedRow);
+
+ checkHasSameEntry(storedRow);
+
+ GetAndRemoveInvokeClosure removeClosure = new GetAndRemoveInvokeClosure();
+
+ storage.invoke(storedRow, removeClosure);
+
+ assertTrue(removeClosure.result());
+
+ // The closure must hand back exactly the row that was stored.
+ checkRowsEqual(storedRow, removeClosure.oldRow());
+
+ checkHasNoEntry(storedRow);
+ }
+
+ /**
+ * Tests that {@link GetAndRemoveInvokeClosure} reports a failure, yields a row without a value and removes nothing
+ * when it is invoked for a key that is not in the storage.
+ */
+ @Test
+ public void testGetAndRemoveClosure_failureBranch() {
+ DataRow storedRow = dataRow(KEY, VALUE);
+
+ storage.write(storedRow);
+
+ checkHasSameEntry(storedRow);
+
+ GetAndRemoveInvokeClosure removeClosure = new GetAndRemoveInvokeClosure();
+
+ // The closure is invoked for a key that was never written.
+ SearchRow missingKey = searchRow("test");
+
+ storage.invoke(missingKey, removeClosure);
+
+ assertFalse(removeClosure.result());
+
+ assertNull(removeClosure.oldRow().valueBytes());
+
+ checkHasSameEntry(storedRow);
+ }
+
+ /**
+ * Tests that {@link GetAndReplaceInvokeClosure} with the {@link GetAndReplaceInvokeClosure#onlyIfExists} set to
+ * {@code false} returns the existing entry and replaces it with the new one.
+ */
+ @Test
+ public void testGetAndReplaceClosureIfExistsFalse_entryExists() {
+ DataRow originalRow = dataRow(KEY, VALUE);
+
+ storage.write(originalRow);
+
+ checkHasSameEntry(originalRow);
+
+ DataRow replacementRow = dataRow(KEY, "newValue");
+
+ GetAndReplaceInvokeClosure replaceClosure = new GetAndReplaceInvokeClosure(replacementRow, false);
+
+ storage.invoke(originalRow, replaceClosure);
+
+ DataRow previousRow = replaceClosure.oldRow();
+
+ assertNotNull(previousRow);
+
+ assertTrue(replaceClosure.result());
+
+ // The closure must return the row that was stored before the replacement.
+ checkRowsEqual(originalRow, previousRow);
+
+ checkHasDifferentEntry(originalRow);
+ checkHasSameEntry(replacementRow);
+ }
+
+ /**
+ * Tests that {@link GetAndReplaceInvokeClosure} with the {@link GetAndReplaceInvokeClosure#onlyIfExists} set to
+ * {@code false} inserts the new data row and returns a row without a value when no row with the same key
+ * was stored before.
+ */
+ @Test
+ public void testGetAndReplaceClosureIfExistsFalse_entryNotExists() {
+ DataRow newRow = dataRow(KEY, VALUE);
+
+ GetAndReplaceInvokeClosure replaceClosure = new GetAndReplaceInvokeClosure(newRow, false);
+
+ storage.invoke(newRow, replaceClosure);
+
+ DataRow previousRow = replaceClosure.oldRow();
+
+ assertNotNull(previousRow);
+
+ assertTrue(replaceClosure.result());
+
+ // Nothing was stored before, so the returned row carries no value.
+ assertFalse(previousRow.hasValueBytes());
+
+ checkHasSameEntry(newRow);
+ }
+
+ /**
+ * Tests that {@link GetAndReplaceInvokeClosure} with the {@link GetAndReplaceInvokeClosure#onlyIfExists} set to
+ * {@code true} returns the existing entry and replaces it with the new one.
+ */
+ @Test
+ public void testGetAndReplaceClosureIfExistsTrue_entryExists() {
+ DataRow originalRow = dataRow(KEY, VALUE);
+
+ storage.write(originalRow);
+
+ checkHasSameEntry(originalRow);
+
+ DataRow replacementRow = dataRow(KEY, "test");
+
+ GetAndReplaceInvokeClosure replaceClosure = new GetAndReplaceInvokeClosure(replacementRow, true);
+
+ storage.invoke(originalRow, replaceClosure);
+
+ DataRow previousRow = replaceClosure.oldRow();
+
+ assertNotNull(previousRow);
+
+ assertTrue(replaceClosure.result());
+
+ assertTrue(previousRow.hasValueBytes());
+
+ checkHasDifferentEntry(originalRow);
+ checkHasSameEntry(replacementRow);
+ }
+
+ /**
+ * Tests that {@link GetAndReplaceInvokeClosure} with the {@link GetAndReplaceInvokeClosure#onlyIfExists} set to
+ * {@code true} does not insert the new entry when there is no previous entry with the same key.
+ */
+ @Test
+ public void testGetAndReplaceClosureIfExistsTrue_entryNotExists() {
+ DataRow newRow = dataRow(KEY, VALUE);
+
+ GetAndReplaceInvokeClosure replaceClosure = new GetAndReplaceInvokeClosure(newRow, true);
+
+ storage.invoke(newRow, replaceClosure);
+
+ DataRow previousRow = replaceClosure.oldRow();
+
+ assertNotNull(previousRow);
+
+ assertFalse(replaceClosure.result());
+
+ assertFalse(previousRow.hasValueBytes());
+
+ // The storage must still have no value for the key.
+ checkHasNoEntry(newRow);
+ }
+
+ /**
+ * Tests that {@link ReplaceExactInvokeClosure} replaces the stored data row when the expected row passed to
+ * the closure matches the row in the storage.
+ */
+ @Test
+ public void testReplaceExactClosure() {
+ DataRow storedRow = dataRow(KEY, VALUE);
+
+ storage.write(storedRow);
+
+ checkHasSameEntry(storedRow);
+
+ DataRow replacementRow = dataRow(KEY, "newValue");
+
+ // Expected row equals the stored one, so the replacement must go through.
+ ReplaceExactInvokeClosure replaceClosure = new ReplaceExactInvokeClosure(storedRow, replacementRow);
+
+ storage.invoke(storedRow, replaceClosure);
+
+ assertTrue(replaceClosure.result());
+
+ checkHasDifferentEntry(storedRow);
+ checkHasSameEntry(replacementRow);
+ }
+
+ /**
+ * Tests that {@link ReplaceExactInvokeClosure} leaves the stored data row untouched when the expected value passed
+ * to the closure differs from the value in the storage.
+ */
+ @Test
+ public void testReplaceExactClosure_failureBranch() {
+ DataRow storedRow = dataRow(KEY, VALUE);
+
+ storage.write(storedRow);
+
+ checkHasSameEntry(storedRow);
+
+ // Used as the "expected" row although its value is not the one in the storage.
+ DataRow unexpectedRow = dataRow(KEY, "newValue");
+
+ ReplaceExactInvokeClosure replaceClosure = new ReplaceExactInvokeClosure(unexpectedRow, storedRow);
+
+ storage.invoke(storedRow, replaceClosure);
+
+ assertFalse(replaceClosure.result());
+
+ checkHasSameEntry(storedRow);
+ }
+
+ /**
+ * Tests that the {@link Storage#readAll(Collection)} operation returns the same data rows that were inserted into
+ * the storage.
+ */
+ @Test
+ public void testReadAll() {
+ // Orders rows by key bytes first and by value bytes second so that both lists can be compared element-wise.
+ Comparator<DataRow> byKeyThenValue = Comparator.comparing(DataRow::keyBytes, Arrays::compare)
+ .thenComparing(DataRow::valueBytes, Arrays::compare);
+
+ List<DataRow> expectedRows = insertBulk(100);
+
+ List<DataRow> actualRows = new ArrayList<>(storage.readAll(expectedRows));
+
+ expectedRows.sort(byKeyThenValue);
+ actualRows.sort(byKeyThenValue);
+
+ assertEquals(expectedRows, actualRows);
+ }
+
+ /**
+ * Tests that the {@link Storage#writeAll(Collection)} operation writes every data row of the given collection into
+ * the storage.
+ */
+ @Test
+ public void testWriteAll() {
+ List<DataRow> rows = new ArrayList<>();
+
+ for (int i = 0; i < 100; i++)
+ rows.add(dataRow(KEY + i, VALUE + i));
+
+ storage.writeAll(rows);
+
+ for (DataRow row : rows)
+ checkHasSameEntry(row);
+ }
+
+ /**
+ * Tests that the {@link Storage#insertAll(Collection)} operation returns the data rows whose keys are already
+ * present in the storage when they are mixed with rows that have new keys.
+ */
+ @Test
+ public void testInsertAll() {
+ // First half of the rows that are already in the storage.
+ List<DataRow> alreadyStored = insertBulk(100).subList(0, 50);
+
+ List<DataRow> batch = new ArrayList<>(alreadyStored);
+
+ // Rows with keys that were not inserted before.
+ for (int i = 100; i < 150; i++)
+ batch.add(dataRow(KEY + "_" + i, VALUE + "_" + i));
+
+ Collection<DataRow> rejected = storage.insertAll(batch);
+
+ assertEquals(alreadyStored, rejected);
+ }
+
+ /**
+ * Tests that {@link Storage#removeAll(Collection)} operation successfully retrieves and removes a collection of
+ * {@link SearchRow}s.
+ *
+ * @throws Exception If the scan cursor fails to close.
+ */
+ @Test
+ public void testRemoveAll() throws Exception {
+ List<DataRow> rows = insertBulk(100);
+
+ // Only keys are passed to the storage, values are expected to come back in the result.
+ Collection<DataRow> removed = storage.removeAll(
+ rows.stream().map(r -> new SimpleDataRow(r.keyBytes(), null)).collect(Collectors.toList())
+ );
+
+ assertEquals(rows, removed);
+
+ // try-with-resources closes the cursor even when the assertion below fails.
+ try (Cursor<DataRow> scan = storage.scan(row -> true)) {
+ assertFalse(scan.hasNext());
+ }
+ }
+
+ /**
+ * Tests that {@link Storage#removeAll(Collection)} returns a non-null empty collection when it is called with
+ * a single key and no rows have been written by the test.
+ */
+ @Test
+ public void testRemoveAllKeyNotExists() throws Exception {
+ SearchRow missingKey = searchRow(KEY);
+
+ Collection<DataRow> removed = storage.removeAll(Collections.singleton(missingKey));
+
+ assertNotNull(removed);
+
+ assertTrue(removed.isEmpty());
+ }
+
+ /**
+ * Tests that {@link Storage#removeAllExact(Collection)} operation successfully removes and retrieves a collection
+ * of data rows with the given exact keys and values from the storage.
+ *
+ * @throws Exception If the scan cursor fails to close.
+ */
+ @Test
+ public void testRemoveAllExact() throws Exception {
+ List<DataRow> rows = insertBulk(100);
+
+ Collection<DataRow> removed = storage.removeAllExact(rows);
+
+ assertEquals(rows, removed);
+
+ // try-with-resources closes the cursor even when the assertion below fails.
+ try (Cursor<DataRow> scan = storage.scan(row -> true)) {
+ assertFalse(scan.hasNext());
+ }
+ }
+
+ /**
+ * Tests that {@link Storage#removeAllExact(Collection)} operation doesn't remove and retrieve a collection
+ * of data rows with the given exact keys and values from the storage if the value in the storage doesn't match
+ * the given value.
+ */
+ @Test
+ public void testRemoveAllExact_failureBranch() throws Exception {
+ List<DataRow> rows = insertBulk(100);
+
+ // Keys follow the same "KEY_i" pattern as insertBulk so that they exist in the storage,
+ // while the values are shifted by one so that none of them matches the stored value.
+ List<DataRow> notExactRows = IntStream.range(0, 100)
+ .mapToObj(i -> dataRow(KEY + "_" + i, VALUE + "_" + (i + 1)))
+ .collect(Collectors.toList());
+
+ Collection<DataRow> removed = storage.removeAllExact(notExactRows);
+
+ assertEquals(0, removed.size());
+
+ rows.forEach(this::checkHasSameEntry);
+ }
+
+ /**
+ * Inserts the requested number of data rows, using {@link #KEY}_i as a key and {@link #VALUE}_i as a value where
+ * i is the index of the row, and returns copies of the inserted rows.
+ *
+ * @param numberOfEntries Amount of entries to insert.
+ * @return List of copies of the inserted rows, backed by freshly cloned byte arrays.
+ */
+ private List<DataRow> insertBulk(int numberOfEntries) {
+ List<DataRow> inserted = new ArrayList<>();
+
+ for (int i = 0; i < numberOfEntries; i++)
+ inserted.add(dataRow(KEY + "_" + i, VALUE + "_" + i));
+
+ storage.insertAll(inserted);
+
+ for (DataRow row : inserted)
+ checkHasSameEntry(row);
+
+ // Clone key and value byte arrays so that returned rows have new references.
+ // This way we check that the ConcurrentHashMapStorage performs an actual array comparison.
+ List<DataRow> copies = new ArrayList<>();
+
+ for (DataRow row : inserted) {
+ byte[] valueBytes = row.valueBytes();
+
+ assert valueBytes != null;
+
+ copies.add(new SimpleDataRow(row.keyBytes().clone(), valueBytes.clone()));
+ }
+
+ return copies;
+ }
+
+ /**
+ * Asserts that the row read from the storage for the given search row carries no value bytes.
+ *
+ * @param row Search row.
+ */
+ private void checkHasNoEntry(SearchRow row) {
+ assertFalse(storage.read(row).hasValueBytes());
+ }
+
+ /**
+ * Asserts that the value stored under the key of the given row differs from the value of the given row.
+ *
+ * @param row Data row.
+ */
+ private void checkHasDifferentEntry(DataRow row) {
+ DataRow stored = storage.read(row);
+
+ boolean sameValue = Arrays.equals(row.valueBytes(), stored.valueBytes());
+
+ assertFalse(sameValue);
+ }
+
+ /**
+ * Asserts that the row read from the storage has the same key and value as the given data row.
+ *
+ * @param row Expected data row.
+ */
+ private void checkHasSameEntry(DataRow row) {
+ checkRowsEqual(row, storage.read(row));
+ }
+
+ /**
+ * Asserts that two rows have equal key bytes and equal value bytes.
+ *
+ * @param expected Expected data row.
+ * @param actual Actual data row.
+ */
+ private void checkRowsEqual(DataRow expected, DataRow actual) {
+ byte[] expectedKey = expected.keyBytes();
+ byte[] actualKey = actual.keyBytes();
+
+ assertArrayEquals(expectedKey, actualKey);
+
+ byte[] expectedValue = expected.valueBytes();
+ byte[] actualValue = actual.valueBytes();
+
+ assertArrayEquals(expectedValue, actualValue);
+ }
+
+ /**
+ * Creates a search row from a string key: the key is encoded as UTF-8 and the row has no value.
+ *
+ * @param key String key.
+ * @return Search row.
+ */
+ private SearchRow searchRow(String key) {
+ byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+
+ return new SimpleDataRow(keyBytes, null);
+ }
+
+ /**
+ * Creates a data row from a string key/value pair, both encoded as UTF-8.
+ *
+ * @param key String key.
+ * @param value String value.
+ * @return Data row.
+ */
+ private DataRow dataRow(String key, String value) {
+ byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+ byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
+
+ return new SimpleDataRow(keyBytes, valueBytes);
+ }
+
+ /**
* Converts cursor to list.
*
* @param cursor Cursor.
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
index e05cc76..fbb3652 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
@@ -19,12 +19,15 @@ package org.apache.ignite.internal.storage.rocksdb;
import java.nio.ByteBuffer;
import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
+import java.util.List;
import java.util.NoSuchElementException;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
import org.apache.ignite.internal.storage.DataRow;
import org.apache.ignite.internal.storage.InvokeClosure;
import org.apache.ignite.internal.storage.SearchRow;
@@ -32,19 +35,23 @@ import org.apache.ignite.internal.storage.Storage;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.basic.SimpleDataRow;
import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteInternalException;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
import org.rocksdb.AbstractComparator;
import org.rocksdb.ComparatorOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
/**
* Storage implementation based on a single RocksDB instance.
*/
-public class RocksDbStorage implements Storage, AutoCloseable {
+public class RocksDbStorage implements Storage {
static {
RocksDB.loadLibrary();
}
@@ -61,9 +68,6 @@ public class RocksDbStorage implements Storage, AutoCloseable {
/** RocksDb instance. */
private final RocksDB db;
- /** RW lock. */
- private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
-
/**
* @param dbPath Path to the folder to store data.
* @param comparator Keys comparator.
@@ -94,7 +98,12 @@ public class RocksDbStorage implements Storage, AutoCloseable {
this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString());
}
catch (RocksDBException e) {
- close();
+ try {
+ close();
+ }
+ catch (Exception ex) {
+ e.addSuppressed(ex);
+ }
throw new StorageException("Failed to start the storage", e);
}
@@ -113,39 +122,160 @@ public class RocksDbStorage implements Storage, AutoCloseable {
}
/** {@inheritDoc} */
- @Override public void write(DataRow row) throws StorageException {
- rwLock.readLock().lock();
+ @Override public Collection<DataRow> readAll(Collection<? extends SearchRow> keys) throws StorageException {
+ List<byte[]> keyBytesList = keys.stream().map(SearchRow::keyBytes).collect(Collectors.toList());
+
+ List<byte[]> valueBytesList;
+
+ try {
+ // Single multi-get call for all the keys.
+ valueBytesList = db.multiGetAsList(keyBytesList);
+ }
+ catch (RocksDBException e) {
+ throw new StorageException("Failed to read data from the storage", e);
+ }
+
+ assert keys.size() == valueBytesList.size();
+
+ List<DataRow> rows = new ArrayList<>();
+
+ // Values are paired with keys by position.
+ for (int idx = 0; idx < keyBytesList.size(); idx++)
+ rows.add(new SimpleDataRow(keyBytesList.get(idx), valueBytesList.get(idx)));
+
+ return rows;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(DataRow row) throws StorageException {
try {
db.put(row.keyBytes(), row.valueBytes());
}
catch (RocksDBException e) {
throw new StorageException("Filed to write data to the storage", e);
}
- finally {
- rwLock.readLock().unlock();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeAll(Collection<? extends DataRow> rows) throws StorageException {
+ // All puts are collected into one batch and handed to the database with a single write call.
+ try (WriteBatch batch = new WriteBatch();
+ WriteOptions opts = new WriteOptions()) {
+ for (DataRow row : rows)
+ batch.put(row.keyBytes(), row.valueBytes());
+
+ db.write(opts, batch);
+ }
+ catch (RocksDBException e) {
+ throw new StorageException("Failed to write data to the storage", e);
 }
 }
/** {@inheritDoc} */
- @Override public void remove(SearchRow key) throws StorageException {
- rwLock.readLock().lock();
+ @Override public Collection<DataRow> insertAll(Collection<? extends DataRow> rows) throws StorageException {
+ // Rows whose keys are already present in the database; they are returned to the caller instead of written.
+ List<DataRow> cantInsert = new ArrayList<>();
+
+ try (WriteBatch batch = new WriteBatch();
+ WriteOptions opts = new WriteOptions()) {
+
+ for (DataRow row : rows) {
+ // NOTE(review): the existence check reads the database, not the pending batch, so two rows with
+ // the same key inside "rows" are both batched - confirm this is the intended behavior.
+ if (db.get(row.keyBytes()) == null)
+ batch.put(row.keyBytes(), row.valueBytes());
+ else
+ cantInsert.add(row);
+ }
+
+ db.write(opts, batch);
+ }
+ catch (RocksDBException e) {
+ throw new StorageException("Failed to write data to the storage", e);
+ }
+
+ return cantInsert;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove(SearchRow key) throws StorageException {
try {
db.delete(key.keyBytes());
}
catch (RocksDBException e) {
throw new StorageException("Failed to remove data from the storage", e);
}
- finally {
- rwLock.readLock().unlock();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<DataRow> removeAll(Collection<? extends SearchRow> keys) {
+ List<DataRow> removedRows = new ArrayList<>();
+
+ try (WriteBatch batch = new WriteBatch();
+ WriteOptions opts = new WriteOptions()) {
+
+ for (SearchRow key : keys) {
+ byte[] keyBytes = key.keyBytes();
+ byte[] storedValue = db.get(keyBytes);
+
+ // Nothing is stored under this key, so there is nothing to report or delete.
+ if (storedValue == null)
+ continue;
+
+ removedRows.add(new SimpleDataRow(keyBytes, storedValue));
+
+ batch.delete(keyBytes);
+ }
+
+ // All deletions are applied with a single write call.
+ db.write(opts, batch);
+ }
+ catch (RocksDBException e) {
+ throw new StorageException("Failed to remove data from the storage", e);
+ }
+
+ return removedRows;
 }
/** {@inheritDoc} */
- @Override public void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
- rwLock.writeLock().lock();
+ @Override public Collection<DataRow> removeAllExact(Collection<? extends DataRow> keyValues) {
+ // Rows that were found with exactly the expected value and therefore scheduled for deletion.
+ List<DataRow> res = new ArrayList<>();
+
+ try (WriteBatch batch = new WriteBatch();
+ WriteOptions opts = new WriteOptions()) {
+
+ List<byte[]> keys = new ArrayList<>(keyValues.size());
+ List<byte[]> expectedValues = new ArrayList<>(keyValues.size());
+
+ for (DataRow keyValue : keyValues) {
+ keys.add(keyValue.keyBytes());
+ expectedValues.add(keyValue.valueBytes());
+ }
+
+ // Values are returned in the order of the keys.
+ List<byte[]> values = db.multiGetAsList(keys);
+
+ assert values.size() == expectedValues.size();
+
+ for (int i = 0; i < keys.size(); i++) {
+ byte[] key = keys.get(i);
+ byte[] expectedValue = expectedValues.get(i);
+ byte[] value = values.get(i);
+
+ // A null value means there is nothing to remove. The explicit check is required because
+ // Arrays.equals(null, null) is true and a row without a value would otherwise be reported as removed.
+ if (value != null && Arrays.equals(value, expectedValue)) {
+ res.add(new SimpleDataRow(key, value));
+
+ batch.delete(key);
+ }
+ }
+
+ db.write(opts, batch);
+ }
+ catch (RocksDBException e) {
+ throw new StorageException("Failed to remove data from the storage", e);
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable
+ @Override public <T> T invoke(SearchRow key, InvokeClosure<T> clo) throws StorageException {
try {
byte[] keyBytes = key.keyBytes();
byte[] existingDataBytes = db.get(keyBytes);
@@ -166,13 +296,12 @@ public class RocksDbStorage implements Storage, AutoCloseable {
case NOOP:
break;
}
+
+ return clo.result();
}
catch (RocksDBException e) {
throw new StorageException("Failed to access data in the storage", e);
}
- finally {
- rwLock.writeLock().unlock();
- }
}
/** {@inheritDoc} */
@@ -181,10 +310,8 @@ public class RocksDbStorage implements Storage, AutoCloseable {
}
/** {@inheritDoc} */
- @Override public void close() {
- try (comparatorOptions; comparator; options) {
- db.close();
- }
+ @Override public void close() throws Exception {
+ IgniteUtils.closeAll(comparatorOptions, comparator, options, db);
}
/** Cusror wrapper over the RocksIterator object with custom filter. */
diff --git a/modules/table/pom.xml b/modules/table/pom.xml
index 6f9f25e..d005e3b 100644
--- a/modules/table/pom.xml
+++ b/modules/table/pom.xml
@@ -83,6 +83,16 @@
<artifactId>ignite-metastorage-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-storage-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-storage-rocksdb</artifactId>
+ </dependency>
+
<!-- Test dependencies -->
<dependency>
<groupId>org.apache.ignite</groupId>
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
index cd57072..6857db8 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.distributed;
+import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
@@ -38,6 +39,7 @@ import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.schema.row.RowAssembler;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorage;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.command.GetCommand;
import org.apache.ignite.internal.table.distributed.command.InsertCommand;
@@ -169,7 +171,11 @@ public class ITDistributedTableTest {
List<Peer> conf = List.of(new Peer(cluster.get(0).topologyService().localMember().address()));
- partSrv.startRaftGroup(grpId, new PartitionListener(), conf);
+ partSrv.startRaftGroup(
+ grpId,
+ new PartitionListener(new RocksDbStorage(dataPath.resolve("db"), ByteBuffer::compareTo)),
+ conf
+ );
RaftGroupService partRaftGrp = new RaftGroupServiceImpl(grpId, client, FACTORY, 10_000, conf, true, 200);
@@ -252,7 +258,11 @@ public class ITDistributedTableTest {
List<Peer> conf = List.of(new Peer(partNodes.get(0).address()));
- rs.startRaftGroup(grpId, new PartitionListener(), conf);
+ rs.startRaftGroup(
+ grpId,
+ new PartitionListener(new RocksDbStorage(dataPath.resolve("part" + p), ByteBuffer::compareTo)),
+ conf
+ );
partMap.put(p, new RaftGroupServiceImpl(grpId, client, FACTORY, 10_000, conf, true, 200));
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 4485a6d..c99dd43 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -17,7 +17,11 @@
package org.apache.ignite.internal.table.distributed;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -58,6 +62,7 @@ import org.apache.ignite.internal.schema.SchemaModificationException;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.event.SchemaEvent;
import org.apache.ignite.internal.schema.event.SchemaEventParameters;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorage;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
@@ -69,6 +74,7 @@ import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.lang.LoggerMessageHelper;
import org.apache.ignite.network.ClusterNode;
@@ -107,6 +113,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
/** Affinity manager. */
private final AffinityManager affMgr;
+ /** Partitions store directory. */
+ private final Path partitionsStoreDir;
+
/** Tables. */
private final Map<String, TableImpl> tables = new ConcurrentHashMap<>();
@@ -121,19 +130,22 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
* @param schemaMgr Schema manager.
* @param affMgr Affinity manager.
* @param raftMgr Raft manager.
+ * @param partitionsStoreDir Partitions store directory.
*/
public TableManager(
ConfigurationManager configurationMgr,
MetaStorageManager metaStorageMgr,
SchemaManager schemaMgr,
AffinityManager affMgr,
- Loza raftMgr
+ Loza raftMgr,
+ Path partitionsStoreDir
) {
this.configurationMgr = configurationMgr;
this.metaStorageMgr = metaStorageMgr;
this.affMgr = affMgr;
this.raftMgr = raftMgr;
this.schemaMgr = schemaMgr;
+ this.partitionsStoreDir = partitionsStoreDir;
}
/** {@inheritDoc} */
@@ -167,11 +179,27 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
HashMap<Integer, RaftGroupService> partitionMap = new HashMap<>(partitions);
+ Path storageDir = partitionsStoreDir.resolve(name);
+
+ try {
+ Files.createDirectories(storageDir);
+ } catch (IOException e) {
+ throw new IgniteInternalException(
+ "Failed to create partitions store directory for " + name + ": " + e.getMessage(),
+ e
+ );
+ }
+
for (int p = 0; p < partitions; p++) {
+ RocksDbStorage storage = new RocksDbStorage(
+ storageDir.resolve(String.valueOf(p)),
+ ByteBuffer::compareTo
+ );
+
partitionMap.put(p, raftMgr.prepareRaftGroup(
raftGroupName(tblId, p),
assignment.get(p),
- new PartitionListener()
+ new PartitionListener(storage)
));
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/ReplaceCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/ReplaceCommand.java
index 298484f..8f6d662 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/ReplaceCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/ReplaceCommand.java
@@ -68,6 +68,7 @@ public class ReplaceCommand implements WriteCommand {
*
* @return Binary row.
*/
+ @NotNull
public BinaryRow getRow() {
if (row == null)
row = new ByteBufferRow(rowBytes);
@@ -80,6 +81,7 @@ public class ReplaceCommand implements WriteCommand {
*
* @return Binary row.
*/
+ @NotNull
public BinaryRow getOldRow() {
if (oldRow == null)
oldRow = new ByteBufferRow(oldRowBytes);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 4f34d0d..dd37cb9 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -18,14 +18,22 @@
package org.apache.ignite.internal.table.distributed.raft;
import java.nio.file.Path;
-import java.util.Arrays;
import java.util.Iterator;
-import java.util.Objects;
+import java.util.List;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.basic.DeleteExactInvokeClosure;
+import org.apache.ignite.internal.storage.basic.GetAndRemoveInvokeClosure;
+import org.apache.ignite.internal.storage.basic.GetAndReplaceInvokeClosure;
+import org.apache.ignite.internal.storage.basic.InsertInvokeClosure;
+import org.apache.ignite.internal.storage.basic.ReplaceExactInvokeClosure;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
@@ -43,6 +51,7 @@ import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
+import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.raft.client.ReadCommand;
import org.apache.ignite.raft.client.WriteCommand;
import org.apache.ignite.raft.client.service.CommandClosure;
@@ -53,12 +62,17 @@ import org.jetbrains.annotations.NotNull;
* Partition command handler.
*/
public class PartitionListener implements RaftGroupListener {
+ /** Partition storage. */
+ private final Storage storage;
+
/**
- * Storage.
- * This is a temporary solution, it will apply until persistence layer would not be implemented.
- * TODO: IGNITE-14790.
+ * Constructor.
+ *
+ * @param storage Storage.
*/
- private ConcurrentHashMap<KeyWrapper, BinaryRow> storage = new ConcurrentHashMap<>();
+ public PartitionListener(Storage storage) {
+ this.storage = storage;
+ }
/** {@inheritDoc} */
@Override public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
@@ -66,21 +80,29 @@ public class PartitionListener implements RaftGroupListener {
CommandClosure<ReadCommand> clo = iterator.next();
if (clo.command() instanceof GetCommand) {
- clo.result(new SingleRowResponse(storage.get(
- extractAndWrapKey(((GetCommand)clo.command()).getKeyRow())
- )));
+ DataRow readValue = storage.read(extractAndWrapKey(((GetCommand) clo.command()).getKeyRow()));
+
+ ByteBufferRow responseRow = null;
+
+ if (readValue.hasValueBytes())
+ responseRow = new ByteBufferRow(readValue.valueBytes());
+
+ clo.result(new SingleRowResponse(responseRow));
}
else if (clo.command() instanceof GetAllCommand) {
Set<BinaryRow> keyRows = ((GetAllCommand)clo.command()).getKeyRows();
assert keyRows != null && !keyRows.isEmpty();
- final Set<BinaryRow> res = keyRows.stream()
- .map(this::extractAndWrapKey)
- .map(storage::get)
- .filter(Objects::nonNull)
- .filter(BinaryRow::hasValue)
- .collect(Collectors.toSet());
+ List<SearchRow> keys = keyRows.stream().map(PartitionListener::extractAndWrapKey)
+ .collect(Collectors.toList());
+
+ List<BinaryRow> res = storage
+ .readAll(keys)
+ .stream()
+ .filter(DataRow::hasValueBytes)
+ .map(read -> new ByteBufferRow(read.valueBytes()))
+ .collect(Collectors.toList());
clo.result(new MultiRowsResponse(res));
}
@@ -99,41 +121,41 @@ public class PartitionListener implements RaftGroupListener {
assert row.hasValue() : "Insert command should have a value.";
- BinaryRow previous = storage.putIfAbsent(extractAndWrapKey(row), row);
+ DataRow newRow = extractAndWrapKeyValue(row);
+
+ InsertInvokeClosure writeIfAbsent = new InsertInvokeClosure(newRow);
- clo.result(previous == null);
+ storage.invoke(newRow, writeIfAbsent);
+
+ clo.result(writeIfAbsent.result());
}
else if (clo.command() instanceof DeleteCommand) {
- BinaryRow deleted = storage.remove(
- extractAndWrapKey(((DeleteCommand)clo.command()).getKeyRow())
- );
+ SearchRow newRow = extractAndWrapKey(((DeleteCommand)clo.command()).getKeyRow());
+
+ var getAndRemoveClosure = new GetAndRemoveInvokeClosure();
+
+ storage.invoke(newRow, getAndRemoveClosure);
- clo.result(deleted != null);
+ clo.result(getAndRemoveClosure.result());
}
else if (clo.command() instanceof ReplaceCommand) {
ReplaceCommand cmd = ((ReplaceCommand)clo.command());
- BinaryRow expected = cmd.getOldRow();
+ DataRow expected = extractAndWrapKeyValue(cmd.getOldRow());
+ DataRow newRow = extractAndWrapKeyValue(cmd.getRow());
- KeyWrapper key = extractAndWrapKey(expected);
+ var replaceClosure = new ReplaceExactInvokeClosure(expected, newRow);
- BinaryRow current = storage.get(key);
+ storage.invoke(expected, replaceClosure);
- if ((current == null && !expected.hasValue()) ||
- equalValues(current, expected)) {
- storage.put(key, cmd.getRow());
-
- clo.result(true);
- }
- else
- clo.result(false);
+ clo.result(replaceClosure.result());
}
else if (clo.command() instanceof UpsertCommand) {
BinaryRow row = ((UpsertCommand)clo.command()).getRow();
assert row.hasValue() : "Upsert command should have a value.";
- storage.put(extractAndWrapKey(row), row);
+ storage.write(extractAndWrapKeyValue(row));
clo.result(null);
}
@@ -142,11 +164,14 @@ public class PartitionListener implements RaftGroupListener {
assert rows != null && !rows.isEmpty();
- final Set<BinaryRow> res = rows.stream()
- .map(k -> storage.putIfAbsent(extractAndWrapKey(k), k) == null ? null : k)
- .filter(Objects::nonNull)
+ List<DataRow> keyValues = rows.stream().map(PartitionListener::extractAndWrapKeyValue)
+ .collect(Collectors.toList());
+
+ List<BinaryRow> res = storage.insertAll(keyValues).stream()
+ .filter(DataRow::hasValueBytes)
+ .map(inserted -> new ByteBufferRow(inserted.valueBytes()))
.filter(BinaryRow::hasValue)
- .collect(Collectors.toSet());
+ .collect(Collectors.toList());
clo.result(new MultiRowsResponse(res));
}
@@ -155,7 +180,7 @@ public class PartitionListener implements RaftGroupListener {
assert rows != null && !rows.isEmpty();
- rows.forEach(k -> storage.put(extractAndWrapKey(k), k));
+ storage.writeAll(rows.stream().map(PartitionListener::extractAndWrapKeyValue).collect(Collectors.toList()));
clo.result(null);
}
@@ -164,16 +189,14 @@ public class PartitionListener implements RaftGroupListener {
assert rows != null && !rows.isEmpty();
- final Set<BinaryRow> res = rows.stream()
- .map(k -> {
- if (k.hasValue())
- return null;
- else
- return storage.remove(extractAndWrapKey(k));
- })
- .filter(Objects::nonNull)
+ List<SearchRow> keys = rows.stream().map(PartitionListener::extractAndWrapKey)
+ .collect(Collectors.toList());
+
+ List<BinaryRow> res = storage.removeAll(keys).stream()
+ .filter(DataRow::hasValueBytes)
+ .map(removed -> new ByteBufferRow(removed.valueBytes()))
.filter(BinaryRow::hasValue)
- .collect(Collectors.toSet());
+ .collect(Collectors.toList());
clo.result(new MultiRowsResponse(res));
}
@@ -183,32 +206,27 @@ public class PartitionListener implements RaftGroupListener {
assert row != null;
assert row.hasValue();
- final KeyWrapper key = extractAndWrapKey(row);
- final BinaryRow old = storage.get(key);
+ DataRow keyValue = extractAndWrapKeyValue(row);
- if (old == null || !old.hasValue())
- clo.result(false);
- else
- clo.result(equalValues(row, old) && storage.remove(key) != null);
+ var deleteExact = new DeleteExactInvokeClosure(keyValue);
+
+ storage.invoke(keyValue, deleteExact);
+
+ clo.result(deleteExact.result());
}
else if (clo.command() instanceof DeleteExactAllCommand) {
Set<BinaryRow> rows = ((DeleteExactAllCommand)clo.command()).getRows();
assert rows != null && !rows.isEmpty();
- final Set<BinaryRow> res = rows.stream()
- .map(k -> {
- final KeyWrapper key = extractAndWrapKey(k);
- final BinaryRow old = storage.get(key);
-
- if (old == null || !old.hasValue() || !equalValues(k, old))
- return null;
+ List<DataRow> keyValues = rows.stream().map(PartitionListener::extractAndWrapKeyValue)
+ .collect(Collectors.toList());
- return storage.remove(key);
- })
- .filter(Objects::nonNull)
+ List<BinaryRow> res = storage.removeAllExact(keyValues).stream()
+ .filter(DataRow::hasValueBytes)
+ .map(inserted -> new ByteBufferRow(inserted.valueBytes()))
.filter(BinaryRow::hasValue)
- .collect(Collectors.toSet());
+ .collect(Collectors.toList());
clo.result(new MultiRowsResponse(res));
}
@@ -217,51 +235,64 @@ public class PartitionListener implements RaftGroupListener {
assert row != null;
- final KeyWrapper key = extractAndWrapKey(row);
- final BinaryRow oldRow = storage.get(key);
+ DataRow keyValue = extractAndWrapKeyValue(row);
- if (oldRow == null || !oldRow.hasValue())
- clo.result(false);
- else
- clo.result(storage.put(key, row) == oldRow);
+ var replaceIfExists = new GetAndReplaceInvokeClosure(keyValue, true);
+
+ storage.invoke(keyValue, replaceIfExists);
+
+ clo.result(replaceIfExists.result());
}
else if (clo.command() instanceof GetAndDeleteCommand) {
BinaryRow row = ((GetAndDeleteCommand)clo.command()).getKeyRow();
assert row != null;
- BinaryRow oldRow = storage.remove(extractAndWrapKey(row));
+ SearchRow keyRow = extractAndWrapKey(row);
- if (oldRow == null || !oldRow.hasValue())
- clo.result(new SingleRowResponse(null));
+ var getAndRemoveClosure = new GetAndRemoveInvokeClosure();
+
+ storage.invoke(keyRow, getAndRemoveClosure);
+
+ if (getAndRemoveClosure.result())
+ clo.result(new SingleRowResponse(new ByteBufferRow(getAndRemoveClosure.oldRow().valueBytes())));
else
- clo.result(new SingleRowResponse(oldRow));
+ clo.result(new SingleRowResponse(null));
}
else if (clo.command() instanceof GetAndReplaceCommand) {
BinaryRow row = ((GetAndReplaceCommand)clo.command()).getRow();
assert row != null && row.hasValue();
- BinaryRow oldRow = storage.get(extractAndWrapKey(row));
+ DataRow keyValue = extractAndWrapKeyValue(row);
- storage.computeIfPresent(extractAndWrapKey(row), (key, val) -> row);
+ var getAndReplace = new GetAndReplaceInvokeClosure(keyValue, true);
- if (oldRow == null || !oldRow.hasValue())
- clo.result(new SingleRowResponse(null));
- else
- clo.result(new SingleRowResponse(oldRow));
+ storage.invoke(keyValue, getAndReplace);
+
+ DataRow oldRow = getAndReplace.oldRow();
+
+ BinaryRow res = oldRow.hasValueBytes() ? new ByteBufferRow(oldRow.valueBytes()) : null;
+
+ clo.result(new SingleRowResponse(res));
}
else if (clo.command() instanceof GetAndUpsertCommand) {
BinaryRow row = ((GetAndUpsertCommand)clo.command()).getKeyRow();
assert row != null && row.hasValue();
- BinaryRow oldRow = storage.put(extractAndWrapKey(row), row);
+ DataRow keyValue = extractAndWrapKeyValue(row);
- if (oldRow == null || !oldRow.hasValue())
- clo.result(new SingleRowResponse(null));
+ var getAndReplace = new GetAndReplaceInvokeClosure(keyValue, false);
+
+ storage.invoke(keyValue, getAndReplace);
+
+ DataRow oldRow = getAndReplace.oldRow();
+
+ if (oldRow.hasValueBytes())
+ clo.result(new SingleRowResponse(new ByteBufferRow(oldRow.valueBytes())));
else
- clo.result(new SingleRowResponse(oldRow));
+ clo.result(new SingleRowResponse(null));
}
else
assert false : "Command was not found [cmd=" + clo.command() + ']';
@@ -281,79 +312,37 @@ public class PartitionListener implements RaftGroupListener {
/** {@inheritDoc} */
@Override public void onShutdown() {
- // No-op.
- }
-
- /**
- * Wrapper provides correct byte[] comparison.
- */
- private static class KeyWrapper {
- /** Data. */
- private final byte[] data;
-
- /** Hash. */
- private final int hash;
-
- /**
- * Constructor.
- *
- * @param data Wrapped data.
- */
- KeyWrapper(byte[] data, int hash) {
- assert data != null;
-
- this.data = data;
- this.hash = hash;
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
-
- if (o == null || getClass() != o.getClass())
- return false;
-
- KeyWrapper wrapper = (KeyWrapper)o;
- return Arrays.equals(data, wrapper.data);
+ try {
+ storage.close();
}
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return hash;
+ catch (Exception e) {
+ throw new IgniteInternalException("Failed to close storage: " + e.getMessage(), e);
}
}
/**
- * Compares two rows.
+ * Extracts a key and a value from the {@link BinaryRow} and wraps it in a {@link DataRow}.
*
- * @param row Row to compare.
- * @param row2 Row to compare.
- * @return True if these rows is equivalent, false otherwise.
+ * @param row Binary row.
+ * @return Data row.
*/
- private boolean equalValues(BinaryRow row, BinaryRow row2) {
- if (row == row2)
- return true;
-
- if (row == null || row2 == null)
- return false;
-
- if (row.hasValue() ^ row2.hasValue())
- return false;
+ @NotNull private static DataRow extractAndWrapKeyValue(@NotNull BinaryRow row) {
+ byte[] key = new byte[row.keySlice().capacity()];
+ row.keySlice().get(key);
- return row.valueSlice().compareTo(row2.valueSlice()) == 0;
+ return new SimpleDataRow(key, row.bytes());
}
/**
- * Makes a wrapped key from a table row.
+ * Extracts a key from the {@link BinaryRow} and wraps it in a {@link SearchRow}.
*
- * @param row Row.
- * @return Extracted key.
+ * @param row Binary row.
+ * @return Search row.
*/
- @NotNull private KeyWrapper extractAndWrapKey(@NotNull BinaryRow row) {
- final byte[] bytes = new byte[row.keySlice().capacity()];
- row.keySlice().get(bytes);
+ @NotNull private static SearchRow extractAndWrapKey(@NotNull BinaryRow row) {
+ byte[] key = new byte[row.keySlice().capacity()];
+ row.keySlice().get(key);
- return new KeyWrapper(bytes, row.hash());
+ return new SimpleDataRow(key, null);
}
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
index bb0d1b5..0676f11 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.table;
import java.lang.reflect.Method;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -51,6 +52,8 @@ import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConver
import org.apache.ignite.internal.schema.event.SchemaEvent;
import org.apache.ignite.internal.schema.event.SchemaEventParameters;
import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.ByteArray;
@@ -91,7 +94,7 @@ import static org.mockito.Mockito.when;
/**
* Tests scenarios for table manager.
*/
-@ExtendWith(MockitoExtension.class)
+@ExtendWith({MockitoExtension.class, WorkDirectoryExtension.class})
@MockitoSettings(strictness = Strictness.LENIENT)
public class TableManagerTest {
/** The logger. */
@@ -137,6 +140,9 @@ public class TableManagerTest {
@Mock(lenient = true)
private Loza rm;
+ @WorkDirectory
+ private Path workDir;
+
/** Test node. */
private final ClusterNode node = new ClusterNode(
UUID.randomUUID().toString(),
@@ -220,7 +226,7 @@ public class TableManagerTest {
@Disabled("https://issues.apache.org/jira/browse/IGNITE-14578")
@Test
public void testStaticTableConfigured() {
- TableManager tableManager = new TableManager(cfrMgr, mm, sm, am, rm);
+ TableManager tableManager = new TableManager(cfrMgr, mm, sm, am, rm, workDir);
assertEquals(1, tableManager.tables().size());
@@ -453,7 +459,7 @@ public class TableManagerTest {
return null;
}).when(am).listen(same(AffinityEvent.CALCULATED), any());
- TableManager tableManager = new TableManager(cfrMgr, mm, sm, am, rm);
+ TableManager tableManager = new TableManager(cfrMgr, mm, sm, am, rm, workDir);
TableImpl tbl2;
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index 5050f76..d861eea 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.table.distributed.raft;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
@@ -30,6 +32,7 @@ import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.schema.row.RowAssembler;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorage;
import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
@@ -47,11 +50,14 @@ import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.raft.client.Command;
import org.apache.ignite.raft.client.service.CommandClosure;
import org.jetbrains.annotations.NotNull;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -63,13 +69,17 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
- * There are a tests for a table command listener.
- * All rows should be removed before returning form each test.
+ * Tests for the table command listener.
*/
+@ExtendWith(WorkDirectoryExtension.class)
public class PartitionCommandListenerTest {
/** Key count. */
public static final int KEY_COUNT = 100;
+ /** Work directory. */
+ @WorkDirectory
+ private Path dataPath;
+
/** Schema. */
public static SchemaDescriptor SCHEMA = new SchemaDescriptor(UUID.randomUUID(),
1,
@@ -78,19 +88,18 @@ public class PartitionCommandListenerTest {
);
/** Table command listener. */
- private static PartitionListener commandListener;
+ private PartitionListener commandListener;
/**
* Initializes a table listener before tests.
*/
- @BeforeAll
- public static void before() {
- commandListener = new PartitionListener();
+ @BeforeEach
+ public void before() {
+ commandListener = new PartitionListener(new RocksDbStorage(dataPath.resolve("db"), ByteBuffer::compareTo));
}
/**
* Inserts rows and checks them.
- * All rows are removed before returning.
*/
@Test
public void testInsertCommands() {
@@ -109,7 +118,6 @@ public class PartitionCommandListenerTest {
/**
* Upserts rows and checks them.
- * All rows are removed before returning.
*/
@Test
public void testUpsertValues() {
@@ -126,7 +134,6 @@ public class PartitionCommandListenerTest {
/**
* Adds rows, replaces and checks them.
- * All rows are removed before returning.
*/
@Test
public void testReplaceCommand() {
@@ -149,7 +156,6 @@ public class PartitionCommandListenerTest {
/**
* The test checks PutIfExist command.
- * All rows are removed before returning.
*/
@Test
public void testPutIfExistCommand() {
@@ -172,7 +178,6 @@ public class PartitionCommandListenerTest {
/**
* The test checks GetAndReplace command.
- * All rows are removed before returning.
*/
@Test
public void testGetAndReplaceCommand() {
@@ -201,7 +206,6 @@ public class PartitionCommandListenerTest {
/**
* The test checks a batch upsert command.
- * All rows are removed before returning.
*/
@Test
public void testUpsertRowsBatchedAndCheck() {
@@ -220,7 +224,6 @@ public class PartitionCommandListenerTest {
/**
* The test checks a batch insert command.
- * All rows are removed before returning.
*/
@Test
public void testInsertRowsBatchedAndCheck() {