You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2023/01/10 11:24:27 UTC
[ignite-3] branch main updated: IGNITE-18180 RocksDB partition deletion operation completed (#1505)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 896acf64a9 IGNITE-18180 RocksDB partition deletion operation completed (#1505)
896acf64a9 is described below
commit 896acf64a99e45918ed1f91454cb84aeeb870aee
Author: Ivan Bessonov <be...@gmail.com>
AuthorDate: Tue Jan 10 14:24:21 2023 +0300
IGNITE-18180 RocksDB partition deletion operation completed (#1505)
---
.../java/org/apache/ignite/lang/ErrorGroups.java | 8 +-
.../apache/ignite/internal/rocksdb/RocksUtils.java | 2 +-
.../internal/storage/StorageClosedException.java | 27 +--
.../ignite/internal/storage/StorageException.java | 23 ++-
.../storage/AbstractMvPartitionStorageTest.java | 3 +-
.../storage/AbstractMvTableStorageTest.java | 11 +-
.../storage/impl/TestMvPartitionStorage.java | 2 +-
.../storage/index/impl/TestHashIndexStorage.java | 2 +-
.../storage/index/impl/TestSortedIndexStorage.java | 2 +-
.../index/hash/PageMemoryHashIndexStorage.java | 2 +-
.../index/sorted/PageMemorySortedIndexStorage.java | 2 +-
.../mv/AbstractPageMemoryMvPartitionStorage.java | 2 +-
.../ignite/internal/storage/rocksdb/HashIndex.java | 20 +-
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 220 +++++++++++++++------
.../storage/rocksdb/RocksDbTableStorage.java | 27 ++-
.../internal/storage/rocksdb/SortedIndex.java | 19 +-
.../rocksdb/index/RocksDbHashIndexStorage.java | 65 +++++-
.../rocksdb/index/RocksDbSortedIndexStorage.java | 139 ++++++++++---
.../storage/rocksdb/RocksDbMvTableStorageTest.java | 12 --
.../state/rocksdb/TxStateRocksDbStorage.java | 7 +-
20 files changed, 445 insertions(+), 150 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index cdf5b0b696..514889e335 100755
--- a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -323,8 +323,14 @@ public class ErrorGroups {
/** Storage error group. */
public static final ErrorGroup STORAGE_ERR_GROUP = ErrorGroup.newGroup("STORAGE", 9);
+ /** Default error code when nothing else is specified. */
+ public static final int GENERIC_ERR = STORAGE_ERR_GROUP.registerErrorCode(1);
+
/** Failed to create a directory. */
- public static final int DIRECTORY_CREATION_ERR = STORAGE_ERR_GROUP.registerErrorCode(1);
+ public static final int DIRECTORY_CREATION_ERR = STORAGE_ERR_GROUP.registerErrorCode(2);
+
+ /** Operation on closed storage. */
+ public static final int ALREADY_CLOSED_ERR = STORAGE_ERR_GROUP.registerErrorCode(3);
}
/** Distribution zones error group. */
diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java
index d6fa91a3f3..1b12b3085a 100644
--- a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java
+++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java
@@ -132,7 +132,7 @@ public class RocksUtils {
*
* @param references References to close.
*/
- public static void closeAll(Collection<AbstractNativeReference> references) {
+ public static void closeAll(Collection<? extends AbstractNativeReference> references) {
RuntimeException exception = null;
for (AbstractNativeReference reference : references) {
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageClosedException.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageClosedException.java
index 6abccd774c..6bbd058587 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageClosedException.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageClosedException.java
@@ -17,35 +17,18 @@
package org.apache.ignite.internal.storage;
+import org.apache.ignite.lang.ErrorGroups.Storage;
+
/**
* Exception that will be thrown when the storage is closed.
*/
public class StorageClosedException extends StorageException {
- /**
- * Constructor.
- *
- * @param message Error message.
- */
- public StorageClosedException(String message) {
- super(message);
- }
-
- /**
- * Constructor.
- *
- * @param message Error message.
- * @param cause The cause.
- */
- public StorageClosedException(String message, Throwable cause) {
- super(message, cause);
- }
+ private static final long serialVersionUID = -7988332521347221109L;
/**
* Constructor.
- *
- * @param cause The cause.
*/
- public StorageClosedException(Throwable cause) {
- super(cause);
+ public StorageClosedException() {
+ super(Storage.ALREADY_CLOSED_ERR, "Storage is already closed");
}
}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
index a8e227bf52..7f19415bb1 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
@@ -17,17 +17,22 @@
package org.apache.ignite.internal.storage;
+import org.apache.ignite.lang.ErrorGroups.Storage;
+import org.apache.ignite.lang.IgniteInternalException;
+
/**
* Exception thrown by the storage.
*/
-public class StorageException extends RuntimeException {
+public class StorageException extends IgniteInternalException {
+ private static final long serialVersionUID = 8705275268121031742L;
+
/**
* Constructor.
*
* @param message Error message.
*/
public StorageException(String message) {
- super(message);
+ super(Storage.GENERIC_ERR, message);
}
/**
@@ -37,7 +42,7 @@ public class StorageException extends RuntimeException {
* @param cause The cause.
*/
public StorageException(String message, Throwable cause) {
- super(message, cause);
+ super(Storage.GENERIC_ERR, message, cause);
}
/**
@@ -46,6 +51,16 @@ public class StorageException extends RuntimeException {
* @param cause The cause.
*/
public StorageException(Throwable cause) {
- super(cause);
+ super(Storage.GENERIC_ERR, cause);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param code Full error code.
+ * @param message Error message.
+ */
+ protected StorageException(int code, String message) {
+ super(code, message);
}
}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 1c7bf67cdd..7e52388a0d 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -21,6 +21,7 @@ import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -1144,7 +1145,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
RowId rowId = insert(binaryRow, txId);
StorageException ex = assertThrows(StorageException.class, () -> addWriteCommitted(rowId, binaryRow2, clock.now()));
- assertThat(ex.getMessage(), is("Write intent exists for " + rowId));
+ assertThat(ex.getMessage(), containsString("Write intent exists for " + rowId));
}
@Test
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index 9c7789baa1..324b4dae42 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -24,6 +24,7 @@ import static org.apache.ignite.internal.testframework.matchers.CompletableFutur
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -276,14 +277,14 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
() -> tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id())
);
- assertThat(e.getMessage(), is("Partition ID " + PARTITION_ID + " does not exist"));
+ assertThat(e.getMessage(), containsString("Partition ID " + PARTITION_ID + " does not exist"));
e = assertThrows(
StorageException.class,
() -> tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id())
);
- assertThat(e.getMessage(), is("Partition ID " + PARTITION_ID + " does not exist"));
+ assertThat(e.getMessage(), containsString("Partition ID " + PARTITION_ID + " does not exist"));
tableStorage.getOrCreateMvPartition(PARTITION_ID);
@@ -294,7 +295,7 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
() -> tableStorage.getOrCreateHashIndex(PARTITION_ID, invalidUuid)
);
- assertThat(e.getMessage(), is(String.format("Index configuration for \"%s\" could not be found", invalidUuid)));
+ assertThat(e.getMessage(), containsString(String.format("Index configuration for \"%s\" could not be found", invalidUuid)));
e = assertThrows(
StorageException.class,
@@ -303,7 +304,7 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
assertThat(
e.getMessage(),
- is(String.format("Index \"%s\" is not configured as a Hash Index. Actual type: SORTED", sortedIdx.id()))
+ containsString(String.format("Index \"%s\" is not configured as a Hash Index. Actual type: SORTED", sortedIdx.id()))
);
e = assertThrows(
@@ -313,7 +314,7 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
assertThat(
e.getMessage(),
- is(String.format("Index \"%s\" is not configured as a Sorted Index. Actual type: HASH", hashIdx.id()))
+ containsString(String.format("Index \"%s\" is not configured as a Sorted Index. Actual type: HASH", hashIdx.id()))
);
}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index b379217843..6cc5214bb7 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -557,7 +557,7 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
private void checkStorageClosed() {
if (closed) {
- throw new StorageClosedException("Storage is already closed");
+ throw new StorageClosedException();
}
}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
index 4bc9f6b2d9..d66773b94e 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
@@ -143,7 +143,7 @@ public class TestHashIndexStorage implements HashIndexStorage {
private void checkStorageClosed() {
if (closed) {
- throw new StorageClosedException("Storage is already closed");
+ throw new StorageClosedException();
}
}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
index fa7f567c7c..ba93ff06d1 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
@@ -317,7 +317,7 @@ public class TestSortedIndexStorage implements SortedIndexStorage {
private void checkStorageClosed() {
if (closed) {
- throw new StorageClosedException("Storage is already closed");
+ throw new StorageClosedException();
}
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
index ef2e22f1b6..41b3e768e1 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
@@ -216,6 +216,6 @@ public class PageMemoryHashIndexStorage implements HashIndexStorage {
* Throws an exception that the storage is already closed.
*/
private void throwStorageClosedException() {
- throw new StorageClosedException("Storage is already closed");
+ throw new StorageClosedException();
}
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
index c29dab554c..9d9180f1ed 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
@@ -228,7 +228,7 @@ public class PageMemorySortedIndexStorage implements SortedIndexStorage {
* Throws an exception that the storage is already closed.
*/
private void throwStorageClosedException() {
- throw new StorageClosedException("Storage is already closed");
+ throw new StorageClosedException();
}
/**
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index cd24c83611..c516e31ff4 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -1005,6 +1005,6 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
* Throws an exception that the storage is already closed.
*/
protected void throwStorageClosedException() {
- throw new StorageClosedException("Storage is already closed");
+ throw new StorageClosedException();
}
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
index 5a61b7c992..f055bc6bcc 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
@@ -23,6 +23,8 @@ import org.apache.ignite.internal.rocksdb.ColumnFamily;
import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
import org.apache.ignite.internal.storage.index.HashIndexStorage;
import org.apache.ignite.internal.storage.rocksdb.index.RocksDbHashIndexStorage;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
/**
* Class that represents a Hash Index defined for all partitions of a Table.
@@ -32,7 +34,7 @@ class HashIndex {
private final HashIndexDescriptor descriptor;
- private final ConcurrentMap<Integer, HashIndexStorage> storages = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Integer, RocksDbHashIndexStorage> storages = new ConcurrentHashMap<>();
HashIndex(ColumnFamily indexCf, HashIndexDescriptor descriptor) {
this.indexCf = indexCf;
@@ -55,4 +57,20 @@ class HashIndex {
void destroy() {
storages.forEach((partitionId, storage) -> storage.destroy());
}
+
+ /**
+ * Deletes the data associated with the partition in the index, using passed write batch for the operation.
+ * Index storage instance is closed after this method, if it ever existed.
+ *
+ * @throws RocksDBException If failed to delete data.
+ */
+ void destroy(int partitionId, WriteBatch writeBatch) throws RocksDBException {
+ RocksDbHashIndexStorage hashIndex = storages.remove(partitionId);
+
+ if (hashIndex != null) {
+ hashIndex.close();
+
+ hashIndex.destroyData(writeBatch);
+ }
+ }
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index a609aa3719..3523066ce6 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -35,22 +35,25 @@ import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
+import org.apache.ignite.internal.rocksdb.BusyRocksIteratorAdapter;
import org.apache.ignite.internal.rocksdb.RocksUtils;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.internal.schema.configuration.TableConfiguration;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.PartitionTimestampCursor;
import org.apache.ignite.internal.storage.RaftGroupConfiguration;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageClosedException;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.TxIdMismatchException;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
@@ -134,14 +137,14 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
/** Thread-local direct buffer instance to read keys from RocksDB. */
private static final ThreadLocal<ByteBuffer> MV_KEY_BUFFER = withInitial(() -> allocateDirect(MAX_KEY_SIZE).order(KEY_BYTE_ORDER));
- /** Thread-local write batch for {@link #runConsistently(WriteClosure)}. */
- private static final ThreadLocal<WriteBatchWithIndex> WRITE_BATCH = new ThreadLocal<>();
-
/** Thread-local on-heap byte buffer instance to use for key manipulations. */
private static final ThreadLocal<ByteBuffer> HEAP_KEY_BUFFER = withInitial(
() -> ByteBuffer.allocate(MAX_KEY_SIZE).order(KEY_BYTE_ORDER)
);
+ /** Thread-local write batch for {@link #runConsistently(WriteClosure)}. */
+ private final ThreadLocal<WriteBatchWithIndex> threadLocalWriteBatch = new ThreadLocal<>();
+
/** Table storage instance. */
private final RocksDbTableStorage tableStorage;
@@ -173,6 +176,8 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
/** Upper bound for scans and reads. */
private final Slice upperBound;
+ private final ReadOptions upperBoundReadOpts;
+
/** Read options for scan iterators. */
private final ReadOptions scanReadOptions;
@@ -205,6 +210,12 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
/** The value of {@link #lastAppliedIndex} persisted to the device at this moment. */
private volatile long persistedIndex;
+ /** Busy lock to stop synchronously. */
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ /** Prevents double stopping the component. */
+ private final AtomicBoolean stopGuard = new AtomicBoolean();
+
/**
* Constructor.
*
@@ -220,6 +231,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
upperBound = new Slice(partitionEndPrefix());
+ upperBoundReadOpts = new ReadOptions().setIterateUpperBound(upperBound);
scanReadOptions = new ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
lastAppliedIndexKey = ("index" + partitionId).getBytes(StandardCharsets.UTF_8);
@@ -236,11 +248,15 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
/** {@inheritDoc} */
@Override
public <V> V runConsistently(WriteClosure<V> closure) throws StorageException {
- if (WRITE_BATCH.get() != null) {
+ if (threadLocalWriteBatch.get() != null) {
return closure.execute();
} else {
+ if (!busyLock.enterBusy()) {
+ throw new StorageClosedException();
+ }
+
try (var writeBatch = new WriteBatchWithIndex()) {
- WRITE_BATCH.set(writeBatch);
+ threadLocalWriteBatch.set(writeBatch);
pendingAppliedIndex = lastAppliedIndex;
pendingAppliedTerm = lastAppliedTerm;
@@ -249,7 +265,9 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
V res = closure.execute();
try {
- db.write(writeOpts, writeBatch);
+ if (writeBatch.count() > 0) {
+ db.write(writeOpts, writeBatch);
+ }
} catch (RocksDBException e) {
throw new StorageException("Unable to apply a write batch to RocksDB instance.", e);
}
@@ -260,7 +278,9 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
return res;
} finally {
- WRITE_BATCH.set(null);
+ threadLocalWriteBatch.set(null);
+
+ busyLock.leaveBusy();
}
}
}
@@ -268,7 +288,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
/** {@inheritDoc} */
@Override
public CompletableFuture<Void> flush() {
- return tableStorage.awaitFlush(true);
+ return busy(() -> tableStorage.awaitFlush(true));
}
/**
@@ -286,20 +306,22 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
return requireWriteBatch();
}
- /** {@inheritDoc} */
@Override
public long lastAppliedIndex() {
- return WRITE_BATCH.get() == null ? lastAppliedIndex : pendingAppliedIndex;
+ return busy(() -> threadLocalWriteBatch.get() == null ? lastAppliedIndex : pendingAppliedIndex);
}
@Override
public long lastAppliedTerm() {
- return WRITE_BATCH.get() == null ? lastAppliedTerm : pendingAppliedTerm;
+ return busy(() -> threadLocalWriteBatch.get() == null ? lastAppliedTerm : pendingAppliedTerm);
}
- /** {@inheritDoc} */
@Override
public void lastApplied(long lastAppliedIndex, long lastAppliedTerm) throws StorageException {
+ busy(() -> lastAppliedBusy(lastAppliedIndex, lastAppliedTerm));
+ }
+
+ private Void lastAppliedBusy(long lastAppliedIndex, long lastAppliedTerm) throws StorageException {
WriteBatchWithIndex writeBatch = requireWriteBatch();
try {
@@ -308,31 +330,38 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
pendingAppliedIndex = lastAppliedIndex;
pendingAppliedTerm = lastAppliedTerm;
+
+ return null;
} catch (RocksDBException e) {
throw new StorageException(e);
}
}
- /** {@inheritDoc} */
@Override
public long persistedIndex() {
- return persistedIndex;
+ return busy(() -> persistedIndex);
}
@Override
@Nullable
public RaftGroupConfiguration committedGroupConfiguration() {
- return WRITE_BATCH.get() == null ? lastGroupConfig : pendingGroupConfig;
+ return busy(() -> threadLocalWriteBatch.get() == null ? lastGroupConfig : pendingGroupConfig);
}
@Override
public void committedGroupConfiguration(RaftGroupConfiguration config) {
+ busy(() -> committedGroupConfigurationBusy(config));
+ }
+
+ private Void committedGroupConfigurationBusy(RaftGroupConfiguration config) {
WriteBatchWithIndex writeBatch = requireWriteBatch();
try {
writeBatch.put(meta, lastGroupConfigKey, ByteUtils.toBytes(config));
pendingGroupConfig = config;
+
+ return null;
} catch (RocksDBException e) {
throw new StorageException(e);
}
@@ -345,7 +374,16 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
* @throws StorageException If failed to read index from the storage.
*/
public void refreshPersistedIndex() throws StorageException {
- persistedIndex = readLastAppliedIndex(persistedTierReadOpts);
+ if (!busyLock.enterBusy()) {
+ // Don't throw the exception, there's no point in that.
+ return;
+ }
+
+ try {
+ persistedIndex = readLastAppliedIndex(persistedTierReadOpts);
+ } finally {
+ busyLock.leaveBusy();
+ }
}
/**
@@ -398,10 +436,14 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
return bytes == null ? null : fromBytes(bytes);
}
- /** {@inheritDoc} */
@Override
public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, UUID commitTableId, int commitPartitionId)
throws TxIdMismatchException, StorageException {
+ return busy(() -> addWriteBusy(rowId, row, txId, commitTableId, commitPartitionId));
+ }
+
+ private @Nullable BinaryRow addWriteBusy(RowId rowId, @Nullable BinaryRow row, UUID txId, UUID commitTableId, int commitPartitionId)
+ throws TxIdMismatchException, StorageException {
@SuppressWarnings("resource") WriteBatchWithIndex writeBatch = requireWriteBatch();
ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
@@ -479,9 +521,12 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
return row.bytes();
}
- /** {@inheritDoc} */
@Override
public @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException {
+ return busy(() -> abortWriteBusy(rowId));
+ }
+
+ private @Nullable BinaryRow abortWriteBusy(RowId rowId) throws StorageException {
WriteBatchWithIndex writeBatch = requireWriteBatch();
ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
@@ -505,9 +550,12 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
}
}
- /** {@inheritDoc} */
@Override
public void commitWrite(RowId rowId, HybridTimestamp timestamp) throws StorageException {
+ busy(() -> commitWriteBusy(rowId, timestamp));
+ }
+
+ private Void commitWriteBusy(RowId rowId, HybridTimestamp timestamp) throws StorageException {
WriteBatchWithIndex writeBatch = requireWriteBatch();
ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
@@ -520,7 +568,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
if (valueBytes == null) {
//the chain doesn't contain an uncommitted write intent
- return;
+ return null;
}
// Delete pending write.
@@ -530,6 +578,8 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
putTimestamp(keyBuf, timestamp);
writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), copyOfRange(valueBytes, VALUE_HEADER_SIZE, valueBytes.length));
+
+ return null;
} catch (RocksDBException e) {
throw new StorageException("Failed to commit row into storage", e);
}
@@ -537,6 +587,10 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
@Override
public void addWriteCommitted(RowId rowId, @Nullable BinaryRow row, HybridTimestamp commitTimestamp) throws StorageException {
+ busy(() -> addWriteCommittedBusy(rowId, row, commitTimestamp));
+ }
+
+ private Void addWriteCommittedBusy(RowId rowId, @Nullable BinaryRow row, HybridTimestamp commitTimestamp) throws StorageException {
@SuppressWarnings("resource") WriteBatchWithIndex writeBatch = requireWriteBatch();
ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
@@ -547,26 +601,30 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
try {
writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), rowBytes);
+
+ return null;
} catch (RocksDBException e) {
throw new StorageException("Failed to update a row in storage", e);
}
}
- /** {@inheritDoc} */
@Override
public ReadResult read(RowId rowId, HybridTimestamp timestamp) throws StorageException {
+ return busy(() -> readBusy(rowId, timestamp));
+ }
+
+ private ReadResult readBusy(RowId rowId, HybridTimestamp timestamp) throws StorageException {
if (rowId.partitionId() != partitionId) {
throw new IllegalArgumentException(
String.format("RowId partition [%d] is not equal to storage partition [%d].", rowId.partitionId(), partitionId));
}
// We can read data outside of consistency closure. Batch is not required.
- WriteBatchWithIndex writeBatch = WRITE_BATCH.get();
+ WriteBatchWithIndex writeBatch = threadLocalWriteBatch.get();
try (
// Set next partition as an upper bound.
- var readOpts1 = new ReadOptions().setIterateUpperBound(upperBound);
- RocksIterator baseIterator = db.newIterator(cf, readOpts1);
+ RocksIterator baseIterator = db.newIterator(cf, upperBoundReadOpts);
// "count()" check is mandatory. Write batch iterator without any updates just crashes everything.
// It's not documented, but this is exactly how it should be used.
RocksIterator seekIterator = writeBatch != null && writeBatch.count() > 0
@@ -581,7 +639,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
}
}
- private boolean lookingForLatestVersions(HybridTimestamp timestamp) {
+ private static boolean lookingForLatestVersions(HybridTimestamp timestamp) {
return timestamp == HybridTimestamp.MAX_VALUE;
}
@@ -659,7 +717,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
* @param keyBuf Buffer with a key in it: partition id + row id + timestamp.
* @return Read result.
*/
- private ReadResult handleReadByTimestampIterator(RocksIterator seekIterator, RowId rowId, HybridTimestamp timestamp,
+ private static ReadResult handleReadByTimestampIterator(RocksIterator seekIterator, RowId rowId, HybridTimestamp timestamp,
ByteBuffer keyBuf) {
// There's no guarantee that required key even exists. If it doesn't, then "seek" will point to a different key.
// To avoid returning its value, we have to check that actual key matches what we need.
@@ -771,6 +829,10 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
@Override
public Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException {
+ return busy(() -> scanVersionsBusy(rowId));
+ }
+
+ private Cursor<ReadResult> scanVersionsBusy(RowId rowId) throws StorageException {
ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
byte[] lowerBound = copyOf(keyBuf.array(), ROW_PREFIX_SIZE);
@@ -783,9 +845,15 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
RocksIterator it = db.newIterator(cf, options);
+ WriteBatchWithIndex writeBatch = threadLocalWriteBatch.get();
+
+ if (writeBatch != null && writeBatch.count() > 0) {
+ it = writeBatch.newIteratorWithBase(cf, it);
+ }
+
it.seek(lowerBound);
- return new RocksIteratorAdapter<>(it) {
+ return new BusyRocksIteratorAdapter<>(busyLock, it) {
@Override
protected ReadResult decodeEntry(byte[] key, byte[] value) {
int keyLength = key.length;
@@ -795,6 +863,11 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
return readResultFromKeyAndValue(isWriteIntent, ByteBuffer.wrap(key).order(KEY_BYTE_ORDER), value);
}
+ @Override
+ protected void handleBusyFail() {
+ throw new StorageClosedException();
+ }
+
@Override
public void close() {
super.close();
@@ -810,11 +883,13 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
public PartitionTimestampCursor scan(HybridTimestamp timestamp) throws StorageException {
Objects.requireNonNull(timestamp, "timestamp is null");
- if (lookingForLatestVersions(timestamp)) {
- return new ScanLatestVersionsCursor();
- } else {
- return new ScanByTimestampCursor(timestamp);
- }
+ return busy(() -> {
+ if (lookingForLatestVersions(timestamp)) {
+ return new ScanLatestVersionsCursor();
+ } else {
+ return new ScanByTimestampCursor(timestamp);
+ }
+ });
}
private void setKeyBuffer(ByteBuffer keyBuf, RowId rowId, @Nullable HybridTimestamp timestamp) {
@@ -830,6 +905,10 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
@Override
public @Nullable RowId closestRowId(RowId lowerBound) throws StorageException {
+ return busy(() -> closestRowIdBusy(lowerBound));
+ }
+
+ private @Nullable RowId closestRowIdBusy(RowId lowerBound) throws StorageException {
ByteBuffer keyBuf = prepareHeapKeyBuf(lowerBound).position(0).limit(ROW_PREFIX_SIZE);
try (RocksIterator it = db.newIterator(cf, scanReadOptions)) {
@@ -851,7 +930,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
}
}
- private void incrementRowId(ByteBuffer buf) {
+ private static void incrementRowId(ByteBuffer buf) {
long lsb = 1 + buf.getLong(ROW_ID_OFFSET + Long.BYTES);
buf.putLong(ROW_ID_OFFSET + Long.BYTES, lsb);
@@ -885,6 +964,10 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
@Override
public long rowsCount() {
+ return busy(this::rowsCountBusy);
+ }
+
+ private long rowsCountBusy() {
try (
var upperBound = new Slice(partitionEndPrefix());
var options = new ReadOptions().setIterateUpperBound(upperBound);
@@ -904,34 +987,32 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
}
/**
- * Deletes partition data from the storage.
+ * Deletes partition data from the storage, using write batch to perform the operation.
*/
- public void destroy() {
- try (WriteBatch writeBatch = new WriteBatch()) {
- writeBatch.delete(meta, lastAppliedIndexKey);
- writeBatch.delete(meta, lastAppliedTermKey);
- writeBatch.delete(meta, lastGroupConfigKey);
-
- writeBatch.delete(meta, RocksDbMetaStorage.partitionIdKey(partitionId));
+ public void destroyData(WriteBatch writeBatch) throws RocksDBException {
+ writeBatch.delete(meta, lastAppliedIndexKey);
+ writeBatch.delete(meta, lastAppliedTermKey);
+ writeBatch.delete(meta, lastGroupConfigKey);
- writeBatch.deleteRange(cf, partitionStartPrefix(), partitionEndPrefix());
+ writeBatch.delete(meta, RocksDbMetaStorage.partitionIdKey(partitionId));
- db.write(writeOpts, writeBatch);
- } catch (RocksDBException e) {
- TableConfiguration tableCfg = tableStorage.configuration();
-
- throw new StorageException("Failed to destroy partition " + partitionId + " of table " + tableCfg.name(), e);
- }
+ writeBatch.deleteRange(cf, partitionStartPrefix(), partitionEndPrefix());
}
/** {@inheritDoc} */
@Override
public void close() {
- RocksUtils.closeAll(persistedTierReadOpts, readOpts, writeOpts, scanReadOptions, upperBound);
+ if (!stopGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+
+ RocksUtils.closeAll(persistedTierReadOpts, readOpts, writeOpts, scanReadOptions, upperBoundReadOpts, upperBound);
}
- private static WriteBatchWithIndex requireWriteBatch() {
- WriteBatchWithIndex writeBatch = WRITE_BATCH.get();
+ private WriteBatchWithIndex requireWriteBatch() {
+ WriteBatchWithIndex writeBatch = threadLocalWriteBatch.get();
if (writeBatch == null) {
throw new StorageException("Attempting to write data outside of data access closure.");
@@ -1118,8 +1199,19 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
/** Cached value for {@link #next()} method. Also optimizes the code of {@link #hasNext()}. */
protected ReadResult next;
+ protected abstract boolean hasNextBusy();
+
+ @Override
+ public boolean hasNext() {
+ return busy(this::hasNextBusy);
+ }
+
@Override
public @Nullable BinaryRow committed(HybridTimestamp timestamp) {
+ return busy(() -> committedBusy(timestamp));
+ }
+
+ private @Nullable BinaryRow committedBusy(HybridTimestamp timestamp) {
Objects.requireNonNull(timestamp, "timestamp is null");
if (currentRowId == null) {
@@ -1141,7 +1233,11 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
@Override
public final ReadResult next() {
- if (!hasNext()) {
+ return busy(this::nextBusy);
+ }
+
+ private ReadResult nextBusy() {
+ if (!hasNextBusy()) {
throw new NoSuchElementException();
}
@@ -1160,7 +1256,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
private final class ScanLatestVersionsCursor extends BasePartitionTimestampCursor {
@Override
- public boolean hasNext() {
+ public boolean hasNextBusy() {
// Fast-path for consecutive invocations.
if (next != null) {
return true;
@@ -1278,7 +1374,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
}
@Override
- public boolean hasNext() {
+ public boolean hasNextBusy() {
// Fast-path for consecutive invocations.
if (next != null) {
return true;
@@ -1328,4 +1424,16 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
}
}
}
+
+ private <V> V busy(Supplier<V> supplier) {
+ if (!busyLock.enterBusy()) {
+ throw new StorageClosedException();
+ }
+
+ try {
+ return supplier.get();
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index f0c1c948bc..66ab805e79 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -68,6 +68,7 @@ import org.rocksdb.FlushOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
/**
@@ -410,16 +411,32 @@ public class RocksDbTableStorage implements MvTableStorage {
RocksDbMvPartitionStorage mvPartition = partitions.getAndSet(partitionId, null);
if (mvPartition != null) {
- try {
- //TODO IGNITE-17626 Destroy indexes as well...
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ mvPartition.close();
// Operation to delete partition data should be fast, since we will write only the range of keys for deletion, and the
// RocksDB itself will then destroy the data on flash.
- mvPartition.destroy();
+ mvPartition.destroyData(writeBatch);
- mvPartition.close();
+ for (HashIndex hashIndex : hashIndices.values()) {
+ hashIndex.destroy(partitionId, writeBatch);
+ }
+
+ for (SortedIndex sortedIndex : sortedIndices.values()) {
+ sortedIndex.destroy(partitionId, writeBatch);
+ }
+
+ db.write(writeOptions, writeBatch);
- partitionIdDestroyFutureMap.remove(partitionId).complete(null);
+ CompletableFuture<?> flushFuture = awaitFlush(true);
+
+ flushFuture.whenComplete((unused, throwable) -> {
+ if (throwable == null) {
+ partitionIdDestroyFutureMap.remove(partitionId).complete(null);
+ } else {
+ partitionIdDestroyFutureMap.remove(partitionId).completeExceptionally(throwable);
+ }
+ });
} catch (Throwable throwable) {
partitionIdDestroyFutureMap.remove(partitionId).completeExceptionally(throwable);
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
index 50d4e12103..a6dfd44671 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.internal.storage.rocksdb.index.RocksDbSortedIndexStorage;
import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
/**
* Class that represents a Sorted Index defined for all partitions of a Table.
@@ -35,7 +36,7 @@ class SortedIndex implements ManuallyCloseable {
private final ColumnFamily indexCf;
- private final ConcurrentMap<Integer, SortedIndexStorage> storages = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Integer, RocksDbSortedIndexStorage> storages = new ConcurrentHashMap<>();
SortedIndex(ColumnFamily indexCf, SortedIndexDescriptor descriptor) {
this.descriptor = descriptor;
@@ -70,6 +71,22 @@ class SortedIndex implements ManuallyCloseable {
}
}
+ /**
+ * Deletes the data associated with the partition in the index, using passed write batch for the operation.
+ * Index storage instance is closed after this method, if it ever existed.
+ *
+ * @throws RocksDBException If failed to delete data.
+ */
+ void destroy(int partitionId, WriteBatch writeBatch) throws RocksDBException {
+ RocksDbSortedIndexStorage sortedIndex = storages.remove(partitionId);
+
+ if (sortedIndex != null) {
+ sortedIndex.close();
+
+ sortedIndex.destroyData(writeBatch);
+ }
+ }
+
@Override
public void close() {
indexCf.handle().close();
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
index 0a3cba372f..ce14828c2c 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
@@ -24,11 +24,13 @@ import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.rocksdb.BusyRocksIteratorAdapter;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
-import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
import org.apache.ignite.internal.rocksdb.RocksUtils;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageClosedException;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
import org.apache.ignite.internal.storage.index.HashIndexStorage;
@@ -36,10 +38,12 @@ import org.apache.ignite.internal.storage.index.IndexRow;
import org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.HashUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Slice;
+import org.rocksdb.WriteBatch;
import org.rocksdb.WriteBatchWithIndex;
import org.rocksdb.WriteOptions;
@@ -74,6 +78,12 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
*/
private final byte[] constantPrefix;
+ /** Busy lock to stop synchronously. */
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ /** Prevents double stopping the component. */
+ private final AtomicBoolean stopGuard = new AtomicBoolean();
+
/**
* Creates a new Hash Index storage.
*
@@ -107,6 +117,10 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
@Override
public Cursor<RowId> get(BinaryTuple key) {
+ if (!busyLock.enterBusy()) {
+ throw new StorageClosedException();
+ }
+
byte[] rangeStart = rocksPrefix(key);
byte[] rangeEnd = incrementArray(rangeStart);
@@ -119,7 +133,14 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
it.seek(rangeStart);
- return new RocksIteratorAdapter<>(it) {
+ busyLock.leaveBusy();
+
+ return new BusyRocksIteratorAdapter<>(busyLock, it) {
+ @Override
+ protected void handleBusyFail() {
+ throw new StorageClosedException();
+ }
+
@Override
protected RowId decodeEntry(byte[] key, byte[] value) {
// RowId UUID is located at the last 16 bytes of the key
@@ -140,23 +161,35 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
@Override
public void put(IndexRow row) {
- WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
+ if (!busyLock.enterBusy()) {
+ throw new StorageClosedException();
+ }
try {
+ WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
+
writeBatch.put(indexCf.handle(), rocksKey(row), BYTE_EMPTY_ARRAY);
} catch (RocksDBException e) {
throw new StorageException("Unable to insert data into hash index. Index ID: " + descriptor.id(), e);
+ } finally {
+ busyLock.leaveBusy();
}
}
@Override
public void remove(IndexRow row) {
- WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
+ if (!busyLock.enterBusy()) {
+ throw new StorageClosedException();
+ }
try {
+ WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
+
writeBatch.delete(indexCf.handle(), rocksKey(row));
} catch (RocksDBException e) {
throw new StorageException("Unable to remove data from hash index. Index ID: " + descriptor.id(), e);
+ } finally {
+ busyLock.leaveBusy();
}
}
@@ -196,4 +229,28 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
.putLong(rowId.leastSignificantBits())
.array();
}
+
+ /**
+ * Closes the hash index storage.
+ */
+ public void close() {
+ if (!stopGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+ }
+
+ /**
+ * Deletes the data associated with the index, using passed write batch for the operation.
+ *
+ * @throws RocksDBException If failed to delete data.
+ */
+ public void destroyData(WriteBatch writeBatch) throws RocksDBException {
+ byte[] rangeEnd = incrementArray(constantPrefix);
+
+ assert rangeEnd != null;
+
+ writeBatch.deleteRange(indexCf.handle(), constantPrefix, rangeEnd);
+ }
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
index d28f2f41f9..99fc1b84dc 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
@@ -17,12 +17,14 @@
package org.apache.ignite.internal.storage.rocksdb.index;
+import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementArray;
import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
@@ -30,6 +32,7 @@ import org.apache.ignite.internal.rocksdb.RocksUtils;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageClosedException;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.index.IndexRow;
import org.apache.ignite.internal.storage.index.IndexRowImpl;
@@ -38,11 +41,13 @@ import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage;
import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Slice;
+import org.rocksdb.WriteBatch;
import org.rocksdb.WriteBatchWithIndex;
/**
@@ -68,6 +73,12 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
private final RocksDbMvPartitionStorage partitionStorage;
+ /** Busy lock to stop synchronously. */
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ /** Prevents double stopping the component. */
+ private final AtomicBoolean stopGuard = new AtomicBoolean();
+
/**
* Creates a storage.
*
@@ -92,39 +103,67 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
@Override
public Cursor<RowId> get(BinaryTuple key) throws StorageException {
+ if (!busyLock.enterBusy()) {
+ throw new StorageClosedException();
+ }
- BinaryTuplePrefix keyPrefix = BinaryTuplePrefix.fromBinaryTuple(key);
- return scan(keyPrefix, keyPrefix, true, true, this::decodeRowId);
+ try {
+ // Convert the key inside the guarded section: if the conversion throws, "finally" still releases the busy lock.
+ BinaryTuplePrefix keyPrefix = BinaryTuplePrefix.fromBinaryTuple(key);
+ return scan(keyPrefix, keyPrefix, true, true, this::decodeRowId);
+ } finally {
+ busyLock.leaveBusy();
+ }
}
@Override
public void put(IndexRow row) {
- WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
+ if (!busyLock.enterBusy()) {
+ throw new StorageClosedException();
+ }
try {
+ WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
+
writeBatch.put(indexCf.handle(), rocksKey(row), BYTE_EMPTY_ARRAY);
} catch (RocksDBException e) {
throw new StorageException("Unable to insert data into sorted index. Index ID: " + descriptor.id(), e);
+ } finally {
+ busyLock.leaveBusy();
}
}
@Override
public void remove(IndexRow row) {
- WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
+ if (!busyLock.enterBusy()) {
+ throw new StorageClosedException();
+ }
try {
+ WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
+
writeBatch.delete(indexCf.handle(), rocksKey(row));
} catch (RocksDBException e) {
throw new StorageException("Unable to remove data from sorted index. Index ID: " + descriptor.id(), e);
+ } finally {
+ busyLock.leaveBusy();
}
}
@Override
public PeekCursor<IndexRow> scan(@Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags) {
+ if (!busyLock.enterBusy()) {
+ throw new StorageClosedException();
+ }
+
boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
- return scan(lowerBound, upperBound, includeLower, includeUpper, this::decodeRow);
+ try {
+ return scan(lowerBound, upperBound, includeLower, includeUpper, this::decodeRow);
+ } finally {
+ busyLock.leaveBusy();
+ }
}
private <T> PeekCursor<T> scan(
@@ -191,44 +230,68 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
@Override
public boolean hasNext() {
- advanceIfNeeded();
+ if (!busyLock.enterBusy()) {
+ throw new StorageClosedException();
+ }
+
+ try {
+ advanceIfNeeded();
- return hasNext;
+ return hasNext;
+ } finally {
+ busyLock.leaveBusy();
+ }
}
@Override
public T next() {
- advanceIfNeeded();
+ if (!busyLock.enterBusy()) {
+ throw new StorageClosedException();
+ }
- boolean hasNext = this.hasNext;
+ try {
+ advanceIfNeeded();
- if (!hasNext) {
- throw new NoSuchElementException();
- }
+ boolean hasNext = this.hasNext;
+
+ if (!hasNext) {
+ throw new NoSuchElementException();
+ }
- this.hasNext = null;
+ this.hasNext = null;
- return mapper.apply(ByteBuffer.wrap(key).order(ORDER));
+ return mapper.apply(ByteBuffer.wrap(key).order(ORDER));
+ } finally {
+ busyLock.leaveBusy();
+ }
}
@Override
public @Nullable T peek() {
- if (hasNext != null) {
- if (hasNext) {
- return mapper.apply(ByteBuffer.wrap(key).order(ORDER));
- }
-
- return null;
+ if (!busyLock.enterBusy()) {
+ throw new StorageClosedException();
}
- refreshAndPrepareRocksIterator();
+ try {
+ if (hasNext != null) {
+ if (hasNext) {
+ return mapper.apply(ByteBuffer.wrap(key).order(ORDER));
+ }
- if (!it.isValid()) {
- RocksUtils.checkIterator(it);
+ return null;
+ }
- return null;
- } else {
- return mapper.apply(ByteBuffer.wrap(it.key()).order(ORDER));
+ refreshAndPrepareRocksIterator();
+
+ if (!it.isValid()) {
+ RocksUtils.checkIterator(it);
+
+ return null;
+ } else {
+ return mapper.apply(ByteBuffer.wrap(it.key()).order(ORDER));
+ }
+ } finally {
+ busyLock.leaveBusy();
}
}
@@ -328,4 +391,30 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
.slice()
.order(ByteOrder.LITTLE_ENDIAN);
}
+
+ /**
+ * Closes the sorted index storage.
+ */
+ public void close() {
+ if (!stopGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+ }
+
+ /**
+ * Deletes the data associated with the index, using passed write batch for the operation.
+ *
+ * @throws RocksDBException If failed to delete data.
+ */
+ public void destroyData(WriteBatch writeBatch) throws RocksDBException {
+ byte[] constantPrefix = ByteBuffer.allocate(2).order(ByteOrder.BIG_ENDIAN).putShort((short) partitionStorage.partitionId()).array();
+
+ byte[] rangeEnd = incrementArray(constantPrefix);
+
+ assert rangeEnd != null;
+
+ writeBatch.deleteRange(indexCf.handle(), constantPrefix, rangeEnd);
+ }
}
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
index fe2ca44c67..e857c368a6 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
@@ -153,18 +153,6 @@ public class RocksDbMvTableStorageTest extends AbstractMvTableStorageTest {
assertThat(tableStorage.isVolatile(), is(false));
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18180")
- @Override
- public void testDestroyPartition() throws Exception {
- super.testDestroyPartition();
- }
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18180")
- @Override
- public void testReCreatePartition() throws Exception {
- super.testReCreatePartition();
- }
-
@Disabled("https://issues.apache.org/jira/browse/IGNITE-18027")
@Override
public void testSuccessRebalance() throws Exception {
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
index e5be339094..aa34d1d7a8 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
@@ -30,8 +30,6 @@ import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
@@ -48,7 +46,6 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteStringFormatter;
import org.jetbrains.annotations.Nullable;
-import org.rocksdb.AbstractNativeReference;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
@@ -655,9 +652,7 @@ public class TxStateRocksDbStorage implements TxStateStorage {
busyLock.block();
- List<AbstractNativeReference> resources = new ArrayList<>(iterators);
-
- RocksUtils.closeAll(resources);
+ RocksUtils.closeAll(iterators);
iterators.clear();