You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2023/01/19 13:18:36 UTC
[ignite-3] branch main updated: IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver (#1530)
This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 2cb222a716 IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver (#1530)
2cb222a716 is described below
commit 2cb222a71685c500558844084f339a4af71d093a
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Thu Jan 19 16:18:29 2023 +0300
IGNITE-18027 Implementation of a full rebalance for RocksDbMvPartitionStorage on receiver (#1530)
---
.../internal/storage/impl/TestMvTableStorage.java | 15 +-
.../ignite/internal/storage/rocksdb/HashIndex.java | 10 ++
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 171 ++++++++++++++++++---
.../storage/rocksdb/RocksDbTableStorage.java | 154 ++++++++++++++++---
.../internal/storage/rocksdb/SortedIndex.java | 10 ++
.../rocksdb/index/RocksDbHashIndexStorage.java | 110 +++++++++++--
.../rocksdb/index/RocksDbSortedIndexStorage.java | 87 ++++++++++-
.../storage/rocksdb/RocksDbMvTableStorageTest.java | 21 ---
8 files changed, 482 insertions(+), 96 deletions(-)
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
index 45e59cfa97..dac39a6201 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.storage.impl;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.storage.util.StorageUtils.createMissingMvPartitionErrorMessage;
import java.util.Map;
import java.util.Objects;
@@ -163,7 +164,7 @@ public class TestMvTableStorage implements MvTableStorage {
@Override
public SortedIndexStorage getOrCreateSortedIndex(int partitionId, UUID indexId) {
if (!partitions.containsKey(partitionId)) {
- throw new StorageException(createPartitionDoesNotExistsErrorMessage(partitionId));
+ throw new StorageException(createMissingMvPartitionErrorMessage(partitionId));
}
SortedIndices sortedIndices = sortedIndicesById.computeIfAbsent(
@@ -177,7 +178,7 @@ public class TestMvTableStorage implements MvTableStorage {
@Override
public HashIndexStorage getOrCreateHashIndex(int partitionId, UUID indexId) {
if (!partitions.containsKey(partitionId)) {
- throw new StorageException(createPartitionDoesNotExistsErrorMessage(partitionId));
+ throw new StorageException(createMissingMvPartitionErrorMessage(partitionId));
}
HashIndices sortedIndices = hashIndicesById.computeIfAbsent(
@@ -249,7 +250,7 @@ public class TestMvTableStorage implements MvTableStorage {
TestMvPartitionStorage partitionStorage = partitions.get(partitionId);
if (partitionStorage == null) {
- throw new StorageRebalanceException(createPartitionDoesNotExistsErrorMessage(partitionId));
+ throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
}
assert !destroyFutureByPartitionId.containsKey(partitionId) : partitionId;
@@ -292,7 +293,7 @@ public class TestMvTableStorage implements MvTableStorage {
TestMvPartitionStorage partitionStorage = partitions.get(partitionId);
if (partitionStorage == null) {
- throw new StorageRebalanceException(createPartitionDoesNotExistsErrorMessage(partitionId));
+ throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
}
return rebalanceFuture
@@ -318,7 +319,7 @@ public class TestMvTableStorage implements MvTableStorage {
TestMvPartitionStorage partitionStorage = partitions.get(partitionId);
if (partitionStorage == null) {
- throw new StorageRebalanceException(createPartitionDoesNotExistsErrorMessage(partitionId));
+ throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
}
return rebalanceFuture
@@ -344,10 +345,6 @@ public class TestMvTableStorage implements MvTableStorage {
}
}
- private static String createPartitionDoesNotExistsErrorMessage(int partitionId) {
- return "Partition ID " + partitionId + " does not exist";
- }
-
private Stream<TestHashIndexStorage> testHashIndexStorageStream(Integer partitionId) {
return hashIndicesById.values().stream()
.map(hashIndices -> hashIndices.storageByPartitionId.get(partitionId))
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
index f055bc6bcc..1d15af522f 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.rocksdb.ColumnFamily;
import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
import org.apache.ignite.internal.storage.index.HashIndexStorage;
import org.apache.ignite.internal.storage.rocksdb.index.RocksDbHashIndexStorage;
+import org.jetbrains.annotations.Nullable;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
@@ -73,4 +74,13 @@ class HashIndex {
hashIndex.destroyData(writeBatch);
}
}
+
+ /**
+ * Returns hash index storage for partition.
+ *
+ * @param partitionId Partition ID.
+ * @return Whatever {@code storages} holds for the given partition ID; may be {@code null}, as the annotation on this method states.
+ */
+ @Nullable RocksDbHashIndexStorage get(int partitionId) {
+ return storages.get(partitionId);
+ }
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 2dbc8e4991..ccd6a421c5 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -22,9 +22,14 @@ import static java.nio.ByteBuffer.allocateDirect;
import static java.util.Arrays.copyOf;
import static java.util.Arrays.copyOfRange;
import static org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.partitionIdKey;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageState;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageStateOnRebalance;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
import static org.apache.ignite.internal.util.ByteUtils.bytesToUuid;
import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
import static org.apache.ignite.internal.util.ByteUtils.putUuidToBytes;
import static org.rocksdb.ReadTier.PERSISTED_TIER;
@@ -35,10 +40,10 @@ import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.rocksdb.BusyRocksIteratorAdapter;
+import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
import org.apache.ignite.internal.rocksdb.RocksUtils;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
@@ -47,14 +52,17 @@ import org.apache.ignite.internal.storage.PartitionTimestampCursor;
import org.apache.ignite.internal.storage.RaftGroupConfiguration;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.internal.storage.StorageClosedException;
import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
import org.apache.ignite.internal.storage.TxIdMismatchException;
+import org.apache.ignite.internal.storage.util.StorageState;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteStringFormatter;
import org.jetbrains.annotations.Nullable;
+import org.rocksdb.AbstractWriteBatch;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
@@ -210,11 +218,11 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
/** The value of {@link #lastAppliedIndex} persisted to the device at this moment. */
private volatile long persistedIndex;
- /** Busy lock to stop synchronously. */
+ /** Busy lock. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
- /** Prevents double stopping the component. */
- private final AtomicBoolean stopGuard = new AtomicBoolean();
+ /** Current state of the storage. */
+ private final AtomicReference<StorageState> state = new AtomicReference<>(StorageState.RUNNABLE);
/**
* Constructor.
@@ -245,7 +253,6 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
persistedIndex = lastAppliedIndex;
}
- /** {@inheritDoc} */
@Override
public <V> V runConsistently(WriteClosure<V> closure) throws StorageException {
if (threadLocalWriteBatch.get() != null) {
@@ -314,14 +321,10 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
@Override
public void lastApplied(long lastAppliedIndex, long lastAppliedTerm) throws StorageException {
busy(() -> {
- WriteBatchWithIndex writeBatch = requireWriteBatch();
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
try {
- writeBatch.put(meta, lastAppliedIndexKey, ByteUtils.longToBytes(lastAppliedIndex));
- writeBatch.put(meta, lastAppliedTermKey, ByteUtils.longToBytes(lastAppliedTerm));
-
- pendingAppliedIndex = lastAppliedIndex;
- pendingAppliedTerm = lastAppliedTerm;
+ saveLastApplied(requireWriteBatch(), lastAppliedIndex, lastAppliedTerm);
return null;
} catch (RocksDBException e) {
@@ -330,6 +333,14 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
});
}
+ private void saveLastApplied(AbstractWriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {
+ writeBatch.put(meta, lastAppliedIndexKey, longToBytes(lastAppliedIndex)); // Index goes into the batch under the meta handle.
+ writeBatch.put(meta, lastAppliedTermKey, longToBytes(lastAppliedTerm)); // Term is stored next to it in the same batch.
+
+ pendingAppliedIndex = lastAppliedIndex; // Only the pending fields change in this method.
+ pendingAppliedTerm = lastAppliedTerm;
+ }
+
@Override
public long persistedIndex() {
return busy(() -> persistedIndex);
@@ -344,6 +355,8 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
@Override
public void committedGroupConfiguration(RaftGroupConfiguration config) {
busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
WriteBatchWithIndex writeBatch = requireWriteBatch();
try {
@@ -512,6 +525,8 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
@Override
public @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException {
return busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
WriteBatchWithIndex writeBatch = requireWriteBatch();
ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
@@ -593,6 +608,8 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
@Override
public ReadResult read(RowId rowId, HybridTimestamp timestamp) throws StorageException {
return busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
if (rowId.partitionId() != partitionId) {
throw new IllegalArgumentException(
String.format("RowId partition [%d] is not equal to storage partition [%d].", rowId.partitionId(), partitionId));
@@ -810,6 +827,8 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
@Override
public Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException {
return busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
byte[] lowerBound = copyOf(keyBuf.array(), ROW_PREFIX_SIZE);
@@ -830,7 +849,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
it.seek(lowerBound);
- return new BusyRocksIteratorAdapter<ReadResult>(busyLock, it) {
+ return new RocksIteratorAdapter<ReadResult>(it) {
@Override
protected ReadResult decodeEntry(byte[] key, byte[] value) {
int keyLength = key.length;
@@ -841,8 +860,21 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
}
@Override
- protected void handleBusyFail() {
- throw new StorageClosedException();
+ public boolean hasNext() {
+ return busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), RocksDbMvPartitionStorage.this::createStorageInfo);
+
+ return super.hasNext();
+ });
+ }
+
+ @Override
+ public ReadResult next() {
+ return busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), RocksDbMvPartitionStorage.this::createStorageInfo);
+
+ return super.next();
+ });
}
@Override
@@ -861,6 +893,8 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
Objects.requireNonNull(timestamp, "timestamp is null");
return busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
if (lookingForLatestVersions(timestamp)) {
return new ScanLatestVersionsCursor();
} else {
@@ -883,6 +917,8 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
@Override
public @Nullable RowId closestRowId(RowId lowerBound) throws StorageException {
return busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
ByteBuffer keyBuf = prepareHeapKeyBuf(lowerBound).position(0).limit(ROW_PREFIX_SIZE);
try (RocksIterator it = db.newIterator(cf, scanReadOptions)) {
@@ -940,6 +976,8 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
@Override
public long rowsCount() {
return busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
try (
var upperBound = new Slice(partitionEndPrefix());
var options = new ReadOptions().setIterateUpperBound(upperBound);
@@ -967,15 +1005,18 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
writeBatch.delete(meta, lastAppliedTermKey);
writeBatch.delete(meta, lastGroupConfigKey);
- writeBatch.delete(meta, RocksDbMetaStorage.partitionIdKey(partitionId));
+ writeBatch.delete(meta, partitionIdKey(partitionId));
writeBatch.deleteRange(cf, partitionStartPrefix(), partitionEndPrefix());
}
- /** {@inheritDoc} */
@Override
public void close() {
- if (!stopGuard.compareAndSet(false, true)) {
+ if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {
+ StorageState state = this.state.get();
+
+ assert state == StorageState.CLOSED : state;
+
return;
}
@@ -1176,12 +1217,18 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
@Override
public boolean hasNext() {
- return busy(this::hasNextBusy);
+ return busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), RocksDbMvPartitionStorage.this::createStorageInfo);
+
+ return hasNextBusy();
+ });
}
@Override
public @Nullable BinaryRow committed(HybridTimestamp timestamp) {
return busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), RocksDbMvPartitionStorage.this::createStorageInfo);
+
Objects.requireNonNull(timestamp, "timestamp is null");
if (currentRowId == null) {
@@ -1205,6 +1252,8 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
@Override
public final ReadResult next() {
return busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), RocksDbMvPartitionStorage.this::createStorageInfo);
+
if (!hasNextBusy()) {
throw new NoSuchElementException();
}
@@ -1396,7 +1445,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
private <V> V busy(Supplier<V> supplier) {
if (!busyLock.enterBusy()) {
- throw new StorageClosedException();
+ throwExceptionDependingOnStorageState(state.get(), createStorageInfo());
}
try {
@@ -1405,4 +1454,84 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
busyLock.leaveBusy();
}
}
+
+ /**
+ * Creates a summary info of the storage in the format "table=user, partitionId=1".
+ *
+ * <p>The table name is taken from {@code tableStorage.getTableName()} and the partition ID from this storage's {@code partitionId}.
+ *
+ * @return Formatted summary string; this class appends it to exception messages.
+ */
+ String createStorageInfo() {
+ return IgniteStringFormatter.format("table={}, partitionId={}", tableStorage.getTableName(), partitionId);
+ }
+
+ /**
+ * Prepares the storage for rebalancing.
+ *
+ * <p>Tries to move the state from {@code RUNNABLE} to {@code REBALANCE}, then blocks the busy lock while the clean-up of the
+ * partition is added to the batch with the last applied index and term set to {@code REBALANCE_IN_PROGRESS}. The busy lock is
+ * unblocked again before returning, also when adding to the batch fails. The batch itself is not written here.
+ *
+ * @param writeBatch Write batch that collects the clean-up operations.
+ * @throws StorageRebalanceException If there was an error when starting the rebalance.
+ */
+ void startRebalance(WriteBatch writeBatch) {
+ if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE)) {
+ throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+ }
+
+ // Changed storage states and expect all storage operations to stop soon.
+ busyLock.block();
+
+ try {
+ clearStorageOnRebalance(writeBatch, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
+ } catch (RocksDBException e) {
+ throw new StorageRebalanceException("Error when trying to start rebalancing storage: " + createStorageInfo(), e);
+ } finally {
+ busyLock.unblock();
+ }
+ }
+
+ /**
+ * Aborts storage rebalancing.
+ *
+ * <p>Tries to move the state from {@code REBALANCE} back to {@code RUNNABLE} and adds a clean-up of the partition to the batch
+ * with the last applied index and term reset to {@code 0}. The batch itself is not written here.
+ *
+ * <p>NOTE(review): the method name is spelled {@code abortReblance}; callers use this exact spelling, so it is kept as is.
+ *
+ * @param writeBatch Write batch that collects the clean-up operations.
+ * @throws StorageRebalanceException If there was an error when aborting the rebalance.
+ */
+ void abortReblance(WriteBatch writeBatch) {
+ if (!state.compareAndSet(StorageState.REBALANCE, StorageState.RUNNABLE)) {
+ throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+ }
+
+ try {
+ clearStorageOnRebalance(writeBatch, 0, 0);
+ } catch (RocksDBException e) {
+ throw new StorageRebalanceException("Error when trying to abort rebalancing storage: " + createStorageInfo(), e);
+ }
+ }
+
+ /**
+ * Completes storage rebalancing.
+ *
+ * <p>Tries to move the state from {@code REBALANCE} back to {@code RUNNABLE} and adds the final last applied index and term to
+ * the batch. The batch itself is not written here.
+ *
+ * @param writeBatch Write batch that collects the updates.
+ * @param lastAppliedIndex Last applied index to store.
+ * @param lastAppliedTerm Last applied term to store.
+ * @throws StorageRebalanceException If there was an error when finishing the rebalance.
+ */
+ void finishRebalance(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) {
+ if (!state.compareAndSet(StorageState.REBALANCE, StorageState.RUNNABLE)) {
+ throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+ }
+
+ try {
+ saveLastAppliedOnRebalance(writeBatch, lastAppliedIndex, lastAppliedTerm);
+ } catch (RocksDBException e) {
+ // Report the operation that actually failed: this is the finish path, not the abort path.
+ throw new StorageRebalanceException("Error when trying to finish rebalancing storage: " + createStorageInfo(), e);
+ }
+ }
+
+ private void clearStorageOnRebalance(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {
+ saveLastAppliedOnRebalance(writeBatch, lastAppliedIndex, lastAppliedTerm); // Store the supplied index and term first.
+
+ writeBatch.delete(meta, lastGroupConfigKey); // Same meta keys that the destroy path deletes, minus index and term.
+ writeBatch.delete(meta, partitionIdKey(partitionId));
+ writeBatch.deleteRange(cf, partitionStartPrefix(), partitionEndPrefix()); // Range delete between the partition prefixes.
+ }
+
+ private void saveLastAppliedOnRebalance(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {
+ saveLastApplied(writeBatch, lastAppliedIndex, lastAppliedTerm); // Batch entries plus the pending fields.
+
+ this.lastAppliedIndex = lastAppliedIndex; // Unlike saveLastApplied, the current values are overwritten immediately.
+ this.lastAppliedTerm = lastAppliedTerm;
+
+ persistedIndex = lastAppliedIndex; // NOTE(review): set before the caller writes the batch - confirm this is intended.
+ }
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index dfbf279ebf..811a30dcc0 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -26,6 +26,7 @@ import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.META_
import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.PARTITION_CF_NAME;
import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.sortedIndexCfName;
import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.sortedIndexId;
+import static org.apache.ignite.internal.storage.util.StorageUtils.createMissingMvPartitionErrorMessage;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import java.io.IOException;
@@ -34,6 +35,8 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -49,6 +52,7 @@ import org.apache.ignite.internal.schema.configuration.TableView;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
import org.apache.ignite.internal.storage.index.HashIndexStorage;
@@ -57,9 +61,10 @@ import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.ColumnFamilyType;
import org.apache.ignite.internal.storage.rocksdb.index.RocksDbBinaryTupleComparator;
import org.apache.ignite.internal.storage.rocksdb.index.RocksDbHashIndexStorage;
-import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.storage.rocksdb.index.RocksDbSortedIndexStorage;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteStringFormatter;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
@@ -127,7 +132,9 @@ public class RocksDbTableStorage implements MvTableStorage {
/** Prevents double stopping of the component. */
private final AtomicBoolean stopGuard = new AtomicBoolean();
- private final ConcurrentMap<Integer, CompletableFuture<Void>> partitionIdDestroyFutureMap = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Integer, CompletableFuture<Void>> destroyFutureByPartitionId = new ConcurrentHashMap<>();
+
+ private final Set<Integer> rebalancePartitions = ConcurrentHashMap.newKeySet();
/**
* Constructor.
@@ -252,7 +259,7 @@ public class RocksDbTableStorage implements MvTableStorage {
default:
throw new StorageException("Unidentified column family [name=" + cf.name() + ", table="
- + tableCfg.value().name() + ']');
+ + getTableName() + ']');
}
}
@@ -407,9 +414,12 @@ public class RocksDbTableStorage implements MvTableStorage {
return inBusyLock(busyLock, () -> {
checkPartitionId(partitionId);
+ assert !rebalancePartitions.contains(partitionId)
+ : IgniteStringFormatter.format("table={}, partitionId={}", getTableName(), partitionId);
+
CompletableFuture<Void> destroyPartitionFuture = new CompletableFuture<>();
- CompletableFuture<Void> previousDestroyPartitionFuture = partitionIdDestroyFutureMap.putIfAbsent(
+ CompletableFuture<Void> previousDestroyPartitionFuture = destroyFutureByPartitionId.putIfAbsent(
partitionId,
destroyPartitionFuture
);
@@ -442,16 +452,16 @@ public class RocksDbTableStorage implements MvTableStorage {
flushFuture.whenComplete((unused, throwable) -> {
if (throwable == null) {
- partitionIdDestroyFutureMap.remove(partitionId).complete(null);
+ destroyFutureByPartitionId.remove(partitionId).complete(null);
} else {
- partitionIdDestroyFutureMap.remove(partitionId).completeExceptionally(throwable);
+ destroyFutureByPartitionId.remove(partitionId).completeExceptionally(throwable);
}
});
} catch (Throwable throwable) {
- partitionIdDestroyFutureMap.remove(partitionId).completeExceptionally(throwable);
+ destroyFutureByPartitionId.remove(partitionId).completeExceptionally(throwable);
}
} else {
- partitionIdDestroyFutureMap.remove(partitionId).complete(null);
+ destroyFutureByPartitionId.remove(partitionId).complete(null);
}
return destroyPartitionFuture;
@@ -466,7 +476,7 @@ public class RocksDbTableStorage implements MvTableStorage {
RocksDbMvPartitionStorage partitionStorage = getMvPartitionBusy(partitionId);
if (partitionStorage == null) {
- throw new StorageException(String.format("Partition ID %d does not exist", partitionId));
+ throw new StorageException(createMissingMvPartitionErrorMessage(partitionId));
}
return storages.getOrCreateStorage(partitionStorage);
@@ -545,15 +555,15 @@ public class RocksDbTableStorage implements MvTableStorage {
/**
* Checks that a passed partition id is within the proper bounds.
*
- * @param partId Partition id.
+ * @param partitionId Partition id.
*/
- private void checkPartitionId(int partId) {
- if (partId < 0 || partId >= partitions.length()) {
- throw new IllegalArgumentException(S.toString(
- "Unable to access partition with id outside of configured range",
- "table", tableCfg.value().name(), false,
- "partitionId", partId, false,
- "partitions", partitions.length(), false
+ private void checkPartitionId(int partitionId) {
+ if (partitionId < 0 || partitionId >= partitions.length()) {
+ throw new IllegalArgumentException(IgniteStringFormatter.format(
+ "Unable to access partition with id outside of configured range [table={}, partitionId={}, partitions={}]",
+ getTableName(),
+ partitionId,
+ partitions.length()
));
}
}
@@ -618,7 +628,7 @@ public class RocksDbTableStorage implements MvTableStorage {
return sortedIndexCfDescriptor(cfName, indexDescriptor);
default:
- throw new StorageException("Unidentified column family [name=" + cfName + ", table=" + tableCfg.value().name() + ']');
+ throw new StorageException("Unidentified column family [name=" + cfName + ", table=" + getTableName() + ']');
}
}
@@ -635,19 +645,115 @@ public class RocksDbTableStorage implements MvTableStorage {
@Override
public CompletableFuture<Void> startRebalancePartition(int partitionId) {
- // TODO: IGNITE-18027 Implement
- throw new UnsupportedOperationException();
+ return inBusyLock(busyLock, () -> {
+ RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+ if (mvPartitionStorage == null) {
+ throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
+ }
+
+ assert !destroyFutureByPartitionId.containsKey(partitionId) : mvPartitionStorage.createStorageInfo();
+
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ mvPartitionStorage.startRebalance(writeBatch);
+
+ getHashIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+ getSortedIndexStorages(partitionId).forEach(index -> index.startRebalance(writeBatch));
+
+ db.write(writeOptions, writeBatch);
+
+ boolean added = rebalancePartitions.add(partitionId);
+
+ assert added : mvPartitionStorage.createStorageInfo();
+
+ return completedFuture(null);
+ } catch (RocksDBException e) {
+ throw new StorageRebalanceException(
+ "Error when trying to start rebalancing storage: " + mvPartitionStorage.createStorageInfo(),
+ e
+ );
+ }
+ });
}
@Override
public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
- // TODO: IGNITE-18027 Implement
- throw new UnsupportedOperationException();
+ return inBusyLock(busyLock, () -> {
+ RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+ if (mvPartitionStorage == null) {
+ throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
+ }
+
+ boolean removed = rebalancePartitions.remove(partitionId);
+
+ if (!removed) {
+ return completedFuture(null);
+ }
+
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ mvPartitionStorage.abortReblance(writeBatch);
+
+ getHashIndexStorages(partitionId).forEach(index -> index.abortReblance(writeBatch));
+ getSortedIndexStorages(partitionId).forEach(index -> index.abortReblance(writeBatch));
+
+ db.write(writeOptions, writeBatch);
+
+ return completedFuture(null);
+ } catch (RocksDBException e) {
+ throw new StorageRebalanceException(
+ "Error when trying to abort rebalancing storage: " + mvPartitionStorage.createStorageInfo(),
+ e
+ );
+ }
+ });
}
@Override
public CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm) {
- // TODO: IGNITE-18027 Implement
- throw new UnsupportedOperationException();
+ return inBusyLock(busyLock, () -> {
+ RocksDbMvPartitionStorage mvPartitionStorage = getMvPartitionBusy(partitionId);
+
+ if (mvPartitionStorage == null) {
+ throw new StorageRebalanceException(createMissingMvPartitionErrorMessage(partitionId));
+ }
+
+ boolean removed = rebalancePartitions.remove(partitionId);
+
+ if (!removed) {
+ throw new StorageRebalanceException("Rebalance for partition did not start: " + mvPartitionStorage.createStorageInfo());
+ }
+
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ mvPartitionStorage.finishRebalance(writeBatch, lastAppliedIndex, lastAppliedTerm);
+
+ getHashIndexStorages(partitionId).forEach(RocksDbHashIndexStorage::finishRebalance);
+ getSortedIndexStorages(partitionId).forEach(RocksDbSortedIndexStorage::finishRebalance);
+
+ db.write(writeOptions, writeBatch);
+
+ return completedFuture(null);
+ } catch (RocksDBException e) {
+ throw new StorageRebalanceException(
+ "Error when trying to finish rebalancing storage: " + mvPartitionStorage.createStorageInfo(),
+ e
+ );
+ }
+ });
+ }
+
+ /**
+ * Returns table name.
+ *
+ * @return Value of the {@code name} property read from {@code tableCfg}.
+ */
+ String getTableName() {
+ return tableCfg.name().value();
+ }
+
+ private List<RocksDbHashIndexStorage> getHashIndexStorages(int partitionId) { // Indexes without a storage here are skipped.
+ return hashIndices.values().stream().map(indexes -> indexes.get(partitionId)).filter(Objects::nonNull).collect(toList());
+ }
+
+ private List<RocksDbSortedIndexStorage> getSortedIndexStorages(int partitionId) { // Indexes without a storage here are skipped.
+ return sortedIndices.values().stream().map(indexes -> indexes.get(partitionId)).filter(Objects::nonNull).collect(toList());
}
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
index a6dfd44671..9c7f425119 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.internal.storage.rocksdb.index.RocksDbSortedIndexStorage;
+import org.jetbrains.annotations.Nullable;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
@@ -91,4 +92,13 @@ class SortedIndex implements ManuallyCloseable {
public void close() {
indexCf.handle().close();
}
+
+ /**
+ * Returns sorted index storage for partition.
+ *
+ * @param partitionId Partition ID.
+ * @return Whatever {@code storages} holds for the given partition ID; may be {@code null}, as the annotation on this method states.
+ */
+ @Nullable RocksDbSortedIndexStorage get(int partitionId) {
+ return storages.get(partitionId);
+ }
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
index cb43a95e52..c9c80e02af 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
@@ -18,28 +18,33 @@
package org.apache.ignite.internal.storage.rocksdb.index;
import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementArray;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageState;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageStateOnRebalance;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
-import org.apache.ignite.internal.rocksdb.BusyRocksIteratorAdapter;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
import org.apache.ignite.internal.rocksdb.RocksUtils;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.internal.storage.StorageClosedException;
import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
import org.apache.ignite.internal.storage.index.HashIndexStorage;
import org.apache.ignite.internal.storage.index.IndexRow;
import org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage;
+import org.apache.ignite.internal.storage.util.StorageState;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.HashUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteStringFormatter;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
@@ -79,11 +84,11 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
*/
private final byte[] constantPrefix;
- /** Busy lock to stop synchronously. */
+ /** Busy lock. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
- /** Prevents double stopping the component. */
- private final AtomicBoolean stopGuard = new AtomicBoolean();
+ /** Current state of the storage. */
+ private final AtomicReference<StorageState> state = new AtomicReference<>(StorageState.RUNNABLE);
/**
* Creates a new Hash Index storage.
@@ -119,6 +124,8 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
@Override
public Cursor<RowId> get(BinaryTuple key) {
return busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
byte[] rangeStart = rocksPrefix(key);
byte[] rangeEnd = incrementArray(rangeStart);
@@ -131,12 +138,7 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
it.seek(rangeStart);
- return new BusyRocksIteratorAdapter<RowId>(busyLock, it) {
- @Override
- protected void handleBusyFail() {
- throw new StorageClosedException();
- }
-
+ return new RocksIteratorAdapter<RowId>(it) {
@Override
protected RowId decodeEntry(byte[] key, byte[] value) {
// RowId UUID is located at the last 16 bytes of the key
@@ -146,6 +148,24 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
return new RowId(partitionStorage.partitionId(), mostSignificantBits, leastSignificantBits);
}
+ @Override
+ public boolean hasNext() {
+ return busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), RocksDbHashIndexStorage.this::createStorageInfo);
+
+ return super.hasNext();
+ });
+ }
+
+ @Override
+ public RowId next() {
+ return busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), RocksDbHashIndexStorage.this::createStorageInfo);
+
+ return super.next();
+ });
+ }
+
@Override
public void close() {
super.close();
@@ -174,6 +194,8 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
@Override
public void remove(IndexRow row) {
busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
try {
WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
@@ -189,6 +211,8 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
@Override
public void destroy() {
busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
byte[] rangeEnd = incrementArray(constantPrefix);
assert rangeEnd != null;
@@ -231,7 +255,11 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
* Closes the hash index storage.
*/
public void close() {
- if (!stopGuard.compareAndSet(false, true)) {
+ if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {
+ StorageState state = this.state.get();
+
+ assert state == StorageState.CLOSED : state;
+
return;
}
@@ -253,7 +281,7 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
private <V> V busy(Supplier<V> supplier) {
if (!busyLock.enterBusy()) {
- throw new StorageClosedException();
+ throwExceptionDependingOnStorageState(state.get(), createStorageInfo());
}
try {
@@ -262,4 +290,58 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
busyLock.leaveBusy();
}
}
+
+ private String createStorageInfo() {
+ return IgniteStringFormatter.format("indexId={}, partitionId={}", descriptor.id(), partitionStorage.partitionId());
+ }
+
+ /**
+ * Prepares the storage for rebalancing.
+ *
+ * @throws StorageRebalanceException If there was an error when starting the rebalance.
+ */
+ public void startRebalance(WriteBatch writeBatch) {
+ if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE)) {
+ throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+ }
+
+ // The storage state has been changed, so all in-flight storage operations are expected to stop soon.
+ busyLock.block();
+
+ try {
+ destroyData(writeBatch);
+ } catch (RocksDBException e) {
+ throw new StorageRebalanceException("Error when trying to start rebalancing storage: " + createStorageInfo(), e);
+ } finally {
+ busyLock.unblock();
+ }
+ }
+
+ /**
+ * Aborts storage rebalancing.
+ *
+ * @throws StorageRebalanceException If there was an error when aborting the rebalance.
+ */
+ public void abortReblance(WriteBatch writeBatch) {
+ if (!state.compareAndSet(StorageState.REBALANCE, StorageState.RUNNABLE)) {
+ throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+ }
+
+ try {
+ destroyData(writeBatch);
+ } catch (RocksDBException e) {
+ throw new StorageRebalanceException("Error when trying to abort rebalancing storage: " + createStorageInfo(), e);
+ }
+ }
+
+ /**
+ * Completes storage rebalancing.
+ *
+ * @throws StorageRebalanceException If there was an error when finishing the rebalance.
+ */
+ public void finishRebalance() {
+ if (!state.compareAndSet(StorageState.REBALANCE, StorageState.RUNNABLE)) {
+ throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+ }
+ }
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
index a66568f2cd..bedad0045e 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
@@ -18,13 +18,16 @@
package org.apache.ignite.internal.storage.rocksdb.index;
import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementArray;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageState;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageStateOnRebalance;
+import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.NoSuchElementException;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
@@ -33,16 +36,18 @@ import org.apache.ignite.internal.rocksdb.RocksUtils;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.internal.storage.StorageClosedException;
import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
import org.apache.ignite.internal.storage.index.IndexRow;
import org.apache.ignite.internal.storage.index.IndexRowImpl;
import org.apache.ignite.internal.storage.index.PeekCursor;
import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage;
+import org.apache.ignite.internal.storage.util.StorageState;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteStringFormatter;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
@@ -74,11 +79,11 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
private final RocksDbMvPartitionStorage partitionStorage;
- /** Busy lock to stop synchronously. */
+ /** Busy lock. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
- /** Prevents double stopping the component. */
- private final AtomicBoolean stopGuard = new AtomicBoolean();
+ /** Current state of the storage. */
+ private final AtomicReference<StorageState> state = new AtomicReference<>(StorageState.RUNNABLE);
/**
* Creates a storage.
@@ -105,6 +110,8 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
@Override
public Cursor<RowId> get(BinaryTuple key) throws StorageException {
return busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
BinaryTuplePrefix keyPrefix = BinaryTuplePrefix.fromBinaryTuple(key);
return scan(keyPrefix, keyPrefix, true, true, this::decodeRowId);
@@ -129,6 +136,8 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
@Override
public void remove(IndexRow row) {
busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
try {
WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
@@ -144,6 +153,8 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
@Override
public PeekCursor<IndexRow> scan(@Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags) {
return busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
@@ -242,6 +253,8 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
@Override
public @Nullable T peek() {
return busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), RocksDbSortedIndexStorage.this::createStorageInfo);
+
if (hasNext != null) {
if (hasNext) {
return mapper.apply(ByteBuffer.wrap(key).order(ORDER));
@@ -263,6 +276,8 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
}
private void advanceIfNeeded() throws StorageException {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), RocksDbSortedIndexStorage.this::createStorageInfo);
+
if (hasNext != null) {
return;
}
@@ -363,7 +378,11 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
* Closes the sorted index storage.
*/
public void close() {
- if (!stopGuard.compareAndSet(false, true)) {
+ if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {
+ StorageState state = this.state.get();
+
+ assert state == StorageState.CLOSED : state;
+
return;
}
@@ -387,7 +406,7 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
private <V> V busy(Supplier<V> supplier) {
if (!busyLock.enterBusy()) {
- throw new StorageClosedException();
+ throwExceptionDependingOnStorageState(state.get(), createStorageInfo());
}
try {
@@ -396,4 +415,58 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
busyLock.leaveBusy();
}
}
+
+ private String createStorageInfo() {
+ return IgniteStringFormatter.format("indexId={}, partitionId={}", descriptor.id(), partitionStorage.partitionId());
+ }
+
+ /**
+ * Prepares the storage for rebalancing.
+ *
+ * @throws StorageRebalanceException If there was an error when starting the rebalance.
+ */
+ public void startRebalance(WriteBatch writeBatch) {
+ if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE)) {
+ throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+ }
+
+ // The storage state has been changed, so all in-flight storage operations are expected to stop soon.
+ busyLock.block();
+
+ try {
+ destroyData(writeBatch);
+ } catch (RocksDBException e) {
+ throw new StorageRebalanceException("Error when trying to start rebalancing storage: " + createStorageInfo(), e);
+ } finally {
+ busyLock.unblock();
+ }
+ }
+
+ /**
+ * Aborts storage rebalancing.
+ *
+ * @throws StorageRebalanceException If there was an error when aborting the rebalance.
+ */
+ public void abortReblance(WriteBatch writeBatch) {
+ if (!state.compareAndSet(StorageState.REBALANCE, StorageState.RUNNABLE)) {
+ throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+ }
+
+ try {
+ destroyData(writeBatch);
+ } catch (RocksDBException e) {
+ throw new StorageRebalanceException("Error when trying to abort rebalancing storage: " + createStorageInfo(), e);
+ }
+ }
+
+ /**
+ * Completes storage rebalancing.
+ *
+ * @throws StorageRebalanceException If there was an error when finishing the rebalance.
+ */
+ public void finishRebalance() {
+ if (!state.compareAndSet(StorageState.REBALANCE, StorageState.RUNNABLE)) {
+ throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+ }
+ }
}
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
index 3a56c297de..952a104876 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
@@ -126,27 +126,6 @@ public class RocksDbMvTableStorageTest extends AbstractMvTableStorageTest {
assertThat(tableStorage.isVolatile(), is(false));
}
- @Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18027")
- @Override
- public void testSuccessRebalance() throws Exception {
- super.testSuccessRebalance();
- }
-
- @Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18027")
- @Override
- public void testFailRebalance() throws Exception {
- super.testFailRebalance();
- }
-
- @Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18027")
- @Override
- public void testStartRebalanceForClosedPartition() {
- super.testStartRebalanceForClosedPartition();
- }
-
@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-18523")
@Override