Posted to commits@ignite.apache.org by tk...@apache.org on 2023/01/18 10:53:08 UTC
[ignite-3] branch main updated: IGNITE-18566 Fix busyLock usage in RocksDbTableStorage and related (#1543)
This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 2995285758 IGNITE-18566 Fix busyLock usage in RocksDbTableStorage and related (#1543)
2995285758 is described below
commit 29952857580e4eb4d24a902af03584bebf7083c0
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Wed Jan 18 13:53:02 2023 +0300
IGNITE-18566 Fix busyLock usage in RocksDbTableStorage and related (#1543)
---
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 489 ++++++++++-----------
.../storage/rocksdb/RocksDbTableStorage.java | 287 ++++++------
.../rocksdb/index/RocksDbHashIndexStorage.java | 127 +++---
.../rocksdb/index/RocksDbSortedIndexStorage.java | 109 ++---
4 files changed, 493 insertions(+), 519 deletions(-)
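The common thread in all four files: ad-hoc enterBusy()/leaveBusy() call pairs are replaced with a busy(...) wrapper, so every storage operation checks the busy lock the same way and always releases it, even when the guarded code throws. A minimal, self-contained sketch of the idiom follows; the real code uses org.apache.ignite.internal.util.IgniteSpinBusyLock and throws StorageClosedException, while the GateLock class below is an illustrative stand-in:

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.Supplier;

    class BusyGateSketch {
        /** Illustrative stand-in for IgniteSpinBusyLock: refuses new entries once blocked. */
        static final class GateLock {
            // >= 0: threads currently inside the busy section; -1: blocked (storage closed).
            private final AtomicInteger state = new AtomicInteger();

            boolean enterBusy() {
                int s;
                do {
                    s = state.get();
                    if (s < 0) {
                        return false; // storage already closed
                    }
                } while (!state.compareAndSet(s, s + 1));
                return true;
            }

            void leaveBusy() {
                state.decrementAndGet();
            }

            boolean block() {
                // The real lock waits for active threads to leave; simplified to one attempt here.
                return state.compareAndSet(0, -1);
            }
        }

        private final GateLock busyLock = new GateLock();

        /** Mirrors the busy() helper the commit adds: enter, run, always leave. */
        <V> V busy(Supplier<V> supplier) {
            if (!busyLock.enterBusy()) {
                throw new IllegalStateException("Storage closed"); // StorageClosedException in the real code
            }

            try {
                return supplier.get();
            } finally {
                busyLock.leaveBusy();
            }
        }
    }

Void operations reuse the same Supplier-based helper as Supplier<Void>, which is why the inlined lambdas in the diff below end with "return null;".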
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 3523066ce6..2dbc8e4991 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -251,41 +251,36 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
if (threadLocalWriteBatch.get() != null) {
return closure.execute();
} else {
- if (!busyLock.enterBusy()) {
- throw new StorageClosedException();
- }
-
- try (var writeBatch = new WriteBatchWithIndex()) {
- threadLocalWriteBatch.set(writeBatch);
+ return busy(() -> {
+ try (var writeBatch = new WriteBatchWithIndex()) {
+ threadLocalWriteBatch.set(writeBatch);
- pendingAppliedIndex = lastAppliedIndex;
- pendingAppliedTerm = lastAppliedTerm;
- pendingGroupConfig = lastGroupConfig;
+ pendingAppliedIndex = lastAppliedIndex;
+ pendingAppliedTerm = lastAppliedTerm;
+ pendingGroupConfig = lastGroupConfig;
- V res = closure.execute();
+ V res = closure.execute();
- try {
- if (writeBatch.count() > 0) {
- db.write(writeOpts, writeBatch);
+ try {
+ if (writeBatch.count() > 0) {
+ db.write(writeOpts, writeBatch);
+ }
+ } catch (RocksDBException e) {
+ throw new StorageException("Unable to apply a write batch to RocksDB instance.", e);
}
- } catch (RocksDBException e) {
- throw new StorageException("Unable to apply a write batch to RocksDB instance.", e);
- }
- lastAppliedIndex = pendingAppliedIndex;
- lastAppliedTerm = pendingAppliedTerm;
- lastGroupConfig = pendingGroupConfig;
+ lastAppliedIndex = pendingAppliedIndex;
+ lastAppliedTerm = pendingAppliedTerm;
+ lastGroupConfig = pendingGroupConfig;
- return res;
- } finally {
- threadLocalWriteBatch.set(null);
-
- busyLock.leaveBusy();
- }
+ return res;
+ } finally {
+ threadLocalWriteBatch.set(null);
+ }
+ });
}
}
- /** {@inheritDoc} */
@Override
public CompletableFuture<Void> flush() {
return busy(() -> tableStorage.awaitFlush(true));
@@ -318,23 +313,21 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
@Override
public void lastApplied(long lastAppliedIndex, long lastAppliedTerm) throws StorageException {
- busy(() -> lastAppliedBusy(lastAppliedIndex, lastAppliedTerm));
- }
-
- private Void lastAppliedBusy(long lastAppliedIndex, long lastAppliedTerm) throws StorageException {
- WriteBatchWithIndex writeBatch = requireWriteBatch();
+ busy(() -> {
+ WriteBatchWithIndex writeBatch = requireWriteBatch();
- try {
- writeBatch.put(meta, lastAppliedIndexKey, ByteUtils.longToBytes(lastAppliedIndex));
- writeBatch.put(meta, lastAppliedTermKey, ByteUtils.longToBytes(lastAppliedTerm));
+ try {
+ writeBatch.put(meta, lastAppliedIndexKey, ByteUtils.longToBytes(lastAppliedIndex));
+ writeBatch.put(meta, lastAppliedTermKey, ByteUtils.longToBytes(lastAppliedTerm));
- pendingAppliedIndex = lastAppliedIndex;
- pendingAppliedTerm = lastAppliedTerm;
+ pendingAppliedIndex = lastAppliedIndex;
+ pendingAppliedTerm = lastAppliedTerm;
- return null;
- } catch (RocksDBException e) {
- throw new StorageException(e);
- }
+ return null;
+ } catch (RocksDBException e) {
+ throw new StorageException(e);
+ }
+ });
}
@Override
@@ -350,21 +343,19 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
@Override
public void committedGroupConfiguration(RaftGroupConfiguration config) {
- busy(() -> committedGroupConfigurationBusy(config));
- }
+ busy(() -> {
+ WriteBatchWithIndex writeBatch = requireWriteBatch();
- private Void committedGroupConfigurationBusy(RaftGroupConfiguration config) {
- WriteBatchWithIndex writeBatch = requireWriteBatch();
-
- try {
- writeBatch.put(meta, lastGroupConfigKey, ByteUtils.toBytes(config));
+ try {
+ writeBatch.put(meta, lastGroupConfigKey, ByteUtils.toBytes(config));
- pendingGroupConfig = config;
+ pendingGroupConfig = config;
- return null;
- } catch (RocksDBException e) {
- throw new StorageException(e);
- }
+ return null;
+ } catch (RocksDBException e) {
+ throw new StorageException(e);
+ }
+ });
}
/**
@@ -439,54 +430,51 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
@Override
public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, UUID commitTableId, int commitPartitionId)
throws TxIdMismatchException, StorageException {
- return busy(() -> addWriteBusy(rowId, row, txId, commitTableId, commitPartitionId));
- }
-
- private @Nullable BinaryRow addWriteBusy(RowId rowId, @Nullable BinaryRow row, UUID txId, UUID commitTableId, int commitPartitionId)
- throws TxIdMismatchException, StorageException {
- @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = requireWriteBatch();
+ return busy(() -> {
+ @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = requireWriteBatch();
- ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
+ ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
- BinaryRow res = null;
+ BinaryRow res = null;
- try {
- // Check concurrent transaction data.
- byte[] keyBufArray = keyBuf.array();
+ try {
+ // Check concurrent transaction data.
+ byte[] keyBufArray = keyBuf.array();
- byte[] keyBytes = copyOf(keyBufArray, ROW_PREFIX_SIZE);
+ byte[] keyBytes = copyOf(keyBufArray, ROW_PREFIX_SIZE);
- byte[] previousValue = writeBatch.getFromBatchAndDB(db, cf, readOpts, keyBytes);
+ byte[] previousValue = writeBatch.getFromBatchAndDB(db, cf, readOpts, keyBytes);
- // Previous value must belong to the same transaction.
- if (previousValue != null) {
- validateTxId(previousValue, txId);
+ // Previous value must belong to the same transaction.
+ if (previousValue != null) {
+ validateTxId(previousValue, txId);
- res = wrapValueIntoBinaryRow(previousValue, true);
- }
+ res = wrapValueIntoBinaryRow(previousValue, true);
+ }
- if (row == null) {
- // Write empty value as a tombstone.
- if (previousValue != null) {
- // Reuse old array with transaction id already written to it.
- writeBatch.put(cf, keyBytes, copyOf(previousValue, VALUE_HEADER_SIZE));
- } else {
- byte[] valueHeaderBytes = new byte[VALUE_HEADER_SIZE];
+ if (row == null) {
+ // Write empty value as a tombstone.
+ if (previousValue != null) {
+ // Reuse old array with transaction id already written to it.
+ writeBatch.put(cf, keyBytes, copyOf(previousValue, VALUE_HEADER_SIZE));
+ } else {
+ byte[] valueHeaderBytes = new byte[VALUE_HEADER_SIZE];
- putUuidToBytes(txId, valueHeaderBytes, TX_ID_OFFSET);
- putUuidToBytes(commitTableId, valueHeaderBytes, TABLE_ID_OFFSET);
- putShort(valueHeaderBytes, PARTITION_ID_OFFSET, (short) commitPartitionId);
+ putUuidToBytes(txId, valueHeaderBytes, TX_ID_OFFSET);
+ putUuidToBytes(commitTableId, valueHeaderBytes, TABLE_ID_OFFSET);
+ putShort(valueHeaderBytes, PARTITION_ID_OFFSET, (short) commitPartitionId);
- writeBatch.put(cf, keyBytes, valueHeaderBytes);
+ writeBatch.put(cf, keyBytes, valueHeaderBytes);
+ }
+ } else {
+ writeUnversioned(keyBufArray, row, txId, commitTableId, commitPartitionId);
}
- } else {
- writeUnversioned(keyBufArray, row, txId, commitTableId, commitPartitionId);
+ } catch (RocksDBException e) {
+ throw new StorageException("Failed to update a row in storage", e);
}
- } catch (RocksDBException e) {
- throw new StorageException("Failed to update a row in storage", e);
- }
- return res;
+ return res;
+ });
}
/**
@@ -523,120 +511,112 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
@Override
public @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException {
- return busy(() -> abortWriteBusy(rowId));
- }
-
- private @Nullable BinaryRow abortWriteBusy(RowId rowId) throws StorageException {
- WriteBatchWithIndex writeBatch = requireWriteBatch();
+ return busy(() -> {
+ WriteBatchWithIndex writeBatch = requireWriteBatch();
- ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
+ ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
- try {
- byte[] keyBytes = copyOf(keyBuf.array(), ROW_PREFIX_SIZE);
+ try {
+ byte[] keyBytes = copyOf(keyBuf.array(), ROW_PREFIX_SIZE);
- byte[] previousValue = writeBatch.getFromBatchAndDB(db, cf, readOpts, keyBytes);
+ byte[] previousValue = writeBatch.getFromBatchAndDB(db, cf, readOpts, keyBytes);
- if (previousValue == null) {
- //the chain doesn't contain an uncommitted write intent
- return null;
- }
+ if (previousValue == null) {
+ //the chain doesn't contain an uncommitted write intent
+ return null;
+ }
- // Perform unconditional remove for the key without associated timestamp.
- writeBatch.delete(cf, keyBytes);
+ // Perform unconditional remove for the key without associated timestamp.
+ writeBatch.delete(cf, keyBytes);
- return wrapValueIntoBinaryRow(previousValue, true);
- } catch (RocksDBException e) {
- throw new StorageException("Failed to roll back insert/update", e);
- }
+ return wrapValueIntoBinaryRow(previousValue, true);
+ } catch (RocksDBException e) {
+ throw new StorageException("Failed to roll back insert/update", e);
+ }
+ });
}
@Override
public void commitWrite(RowId rowId, HybridTimestamp timestamp) throws StorageException {
- busy(() -> commitWriteBusy(rowId, timestamp));
- }
-
- private Void commitWriteBusy(RowId rowId, HybridTimestamp timestamp) throws StorageException {
- WriteBatchWithIndex writeBatch = requireWriteBatch();
+ busy(() -> {
+ WriteBatchWithIndex writeBatch = requireWriteBatch();
- ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
+ ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
- try {
- // Read a value associated with pending write.
- byte[] uncommittedKeyBytes = copyOf(keyBuf.array(), ROW_PREFIX_SIZE);
+ try {
+ // Read a value associated with pending write.
+ byte[] uncommittedKeyBytes = copyOf(keyBuf.array(), ROW_PREFIX_SIZE);
- byte[] valueBytes = writeBatch.getFromBatchAndDB(db, cf, readOpts, uncommittedKeyBytes);
+ byte[] valueBytes = writeBatch.getFromBatchAndDB(db, cf, readOpts, uncommittedKeyBytes);
- if (valueBytes == null) {
- //the chain doesn't contain an uncommitted write intent
- return null;
- }
+ if (valueBytes == null) {
+ //the chain doesn't contain an uncommitted write intent
+ return null;
+ }
- // Delete pending write.
- writeBatch.delete(cf, uncommittedKeyBytes);
+ // Delete pending write.
+ writeBatch.delete(cf, uncommittedKeyBytes);
- // Add timestamp to the key, and put the value back into the storage.
- putTimestamp(keyBuf, timestamp);
+ // Add timestamp to the key, and put the value back into the storage.
+ putTimestamp(keyBuf, timestamp);
- writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), copyOfRange(valueBytes, VALUE_HEADER_SIZE, valueBytes.length));
+ writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), copyOfRange(valueBytes, VALUE_HEADER_SIZE, valueBytes.length));
- return null;
- } catch (RocksDBException e) {
- throw new StorageException("Failed to commit row into storage", e);
- }
+ return null;
+ } catch (RocksDBException e) {
+ throw new StorageException("Failed to commit row into storage", e);
+ }
+ });
}
@Override
public void addWriteCommitted(RowId rowId, @Nullable BinaryRow row, HybridTimestamp commitTimestamp) throws StorageException {
- busy(() -> addWriteCommittedBusy(rowId, row, commitTimestamp));
- }
+ busy(() -> {
+ @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = requireWriteBatch();
- private Void addWriteCommittedBusy(RowId rowId, @Nullable BinaryRow row, HybridTimestamp commitTimestamp) throws StorageException {
- @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = requireWriteBatch();
-
- ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
- putTimestamp(keyBuf, commitTimestamp);
+ ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
+ putTimestamp(keyBuf, commitTimestamp);
- //TODO IGNITE-16913 Add proper way to write row bytes into array without allocations.
- byte[] rowBytes = rowBytes(row);
+ //TODO IGNITE-16913 Add proper way to write row bytes into array without allocations.
+ byte[] rowBytes = rowBytes(row);
- try {
- writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), rowBytes);
+ try {
+ writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), rowBytes);
- return null;
- } catch (RocksDBException e) {
- throw new StorageException("Failed to update a row in storage", e);
- }
+ return null;
+ } catch (RocksDBException e) {
+ throw new StorageException("Failed to update a row in storage", e);
+ }
+ });
}
@Override
public ReadResult read(RowId rowId, HybridTimestamp timestamp) throws StorageException {
- return busy(() -> readBusy(rowId, timestamp));
- }
-
- private ReadResult readBusy(RowId rowId, HybridTimestamp timestamp) throws StorageException {
- if (rowId.partitionId() != partitionId) {
- throw new IllegalArgumentException(
- String.format("RowId partition [%d] is not equal to storage partition [%d].", rowId.partitionId(), partitionId));
- }
-
- // We can read data outside of consistency closure. Batch is not required.
- WriteBatchWithIndex writeBatch = threadLocalWriteBatch.get();
+ return busy(() -> {
+ if (rowId.partitionId() != partitionId) {
+ throw new IllegalArgumentException(
+ String.format("RowId partition [%d] is not equal to storage partition [%d].", rowId.partitionId(), partitionId));
+ }
- try (
- // Set next partition as an upper bound.
- RocksIterator baseIterator = db.newIterator(cf, upperBoundReadOpts);
- // "count()" check is mandatory. Write batch iterator without any updates just crashes everything.
- // It's not documented, but this is exactly how it should be used.
- RocksIterator seekIterator = writeBatch != null && writeBatch.count() > 0
- ? writeBatch.newIteratorWithBase(cf, baseIterator)
- : baseIterator
- ) {
- if (lookingForLatestVersions(timestamp)) {
- return readLatestVersion(rowId, seekIterator);
- } else {
- return readByTimestamp(seekIterator, rowId, timestamp);
+ // We can read data outside of consistency closure. Batch is not required.
+ WriteBatchWithIndex writeBatch = threadLocalWriteBatch.get();
+
+ try (
+ // Set next partition as an upper bound.
+ RocksIterator baseIterator = db.newIterator(cf, upperBoundReadOpts);
+ // "count()" check is mandatory. Write batch iterator without any updates just crashes everything.
+ // It's not documented, but this is exactly how it should be used.
+ RocksIterator seekIterator = writeBatch != null && writeBatch.count() > 0
+ ? writeBatch.newIteratorWithBase(cf, baseIterator)
+ : baseIterator
+ ) {
+ if (lookingForLatestVersions(timestamp)) {
+ return readLatestVersion(rowId, seekIterator);
+ } else {
+ return readByTimestamp(seekIterator, rowId, timestamp);
+ }
}
- }
+ });
}
private static boolean lookingForLatestVersions(HybridTimestamp timestamp) {
@@ -829,56 +809,53 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
@Override
public Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException {
- return busy(() -> scanVersionsBusy(rowId));
- }
-
- private Cursor<ReadResult> scanVersionsBusy(RowId rowId) throws StorageException {
- ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
+ return busy(() -> {
+ ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
- byte[] lowerBound = copyOf(keyBuf.array(), ROW_PREFIX_SIZE);
+ byte[] lowerBound = copyOf(keyBuf.array(), ROW_PREFIX_SIZE);
- incrementRowId(keyBuf);
+ incrementRowId(keyBuf);
- Slice upperBound = new Slice(copyOf(keyBuf.array(), ROW_PREFIX_SIZE));
+ Slice upperBound = new Slice(copyOf(keyBuf.array(), ROW_PREFIX_SIZE));
- var options = new ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
+ var options = new ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
- RocksIterator it = db.newIterator(cf, options);
+ RocksIterator it = db.newIterator(cf, options);
- WriteBatchWithIndex writeBatch = threadLocalWriteBatch.get();
+ WriteBatchWithIndex writeBatch = threadLocalWriteBatch.get();
- if (writeBatch != null && writeBatch.count() > 0) {
- it = writeBatch.newIteratorWithBase(cf, it);
- }
+ if (writeBatch != null && writeBatch.count() > 0) {
+ it = writeBatch.newIteratorWithBase(cf, it);
+ }
- it.seek(lowerBound);
+ it.seek(lowerBound);
- return new BusyRocksIteratorAdapter<>(busyLock, it) {
- @Override
- protected ReadResult decodeEntry(byte[] key, byte[] value) {
- int keyLength = key.length;
+ return new BusyRocksIteratorAdapter<ReadResult>(busyLock, it) {
+ @Override
+ protected ReadResult decodeEntry(byte[] key, byte[] value) {
+ int keyLength = key.length;
- boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
+ boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
- return readResultFromKeyAndValue(isWriteIntent, ByteBuffer.wrap(key).order(KEY_BYTE_ORDER), value);
- }
+ return readResultFromKeyAndValue(isWriteIntent, ByteBuffer.wrap(key).order(KEY_BYTE_ORDER), value);
+ }
- @Override
- protected void handleBusyFail() {
- throw new StorageClosedException();
- }
+ @Override
+ protected void handleBusyFail() {
+ throw new StorageClosedException();
+ }
- @Override
- public void close() {
- super.close();
+ @Override
+ public void close() {
+ super.close();
- RocksUtils.closeAll(options, upperBound);
- }
- };
+ RocksUtils.closeAll(options, upperBound);
+ }
+ };
+ });
}
// TODO: IGNITE-16914 Play with prefix settings and benchmark results.
- /** {@inheritDoc} */
@Override
public PartitionTimestampCursor scan(HybridTimestamp timestamp) throws StorageException {
Objects.requireNonNull(timestamp, "timestamp is null");
@@ -905,29 +882,27 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
@Override
public @Nullable RowId closestRowId(RowId lowerBound) throws StorageException {
- return busy(() -> closestRowIdBusy(lowerBound));
- }
-
- private @Nullable RowId closestRowIdBusy(RowId lowerBound) throws StorageException {
- ByteBuffer keyBuf = prepareHeapKeyBuf(lowerBound).position(0).limit(ROW_PREFIX_SIZE);
+ return busy(() -> {
+ ByteBuffer keyBuf = prepareHeapKeyBuf(lowerBound).position(0).limit(ROW_PREFIX_SIZE);
- try (RocksIterator it = db.newIterator(cf, scanReadOptions)) {
- it.seek(keyBuf);
+ try (RocksIterator it = db.newIterator(cf, scanReadOptions)) {
+ it.seek(keyBuf);
- if (!it.isValid()) {
- RocksUtils.checkIterator(it);
+ if (!it.isValid()) {
+ RocksUtils.checkIterator(it);
- return null;
- }
+ return null;
+ }
- ByteBuffer readKeyBuf = MV_KEY_BUFFER.get().position(0).limit(ROW_PREFIX_SIZE);
+ ByteBuffer readKeyBuf = MV_KEY_BUFFER.get().position(0).limit(ROW_PREFIX_SIZE);
- it.key(readKeyBuf);
+ it.key(readKeyBuf);
- return getRowId(readKeyBuf);
- } finally {
- keyBuf.limit(MAX_KEY_SIZE);
- }
+ return getRowId(readKeyBuf);
+ } finally {
+ keyBuf.limit(MAX_KEY_SIZE);
+ }
+ });
}
private static void incrementRowId(ByteBuffer buf) {
@@ -964,26 +939,24 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
@Override
public long rowsCount() {
- return busy(this::rowsCountBusy);
- }
+ return busy(() -> {
+ try (
+ var upperBound = new Slice(partitionEndPrefix());
+ var options = new ReadOptions().setIterateUpperBound(upperBound);
+ RocksIterator it = db.newIterator(cf, options)
+ ) {
+ it.seek(partitionStartPrefix());
- private long rowsCountBusy() {
- try (
- var upperBound = new Slice(partitionEndPrefix());
- var options = new ReadOptions().setIterateUpperBound(upperBound);
- RocksIterator it = db.newIterator(cf, options)
- ) {
- it.seek(partitionStartPrefix());
+ long size = 0;
- long size = 0;
+ while (it.isValid()) {
+ ++size;
+ it.next();
+ }
- while (it.isValid()) {
- ++size;
- it.next();
+ return size;
}
-
- return size;
- }
+ });
}
/**
@@ -1208,44 +1181,40 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
@Override
public @Nullable BinaryRow committed(HybridTimestamp timestamp) {
- return busy(() -> committedBusy(timestamp));
- }
-
- private @Nullable BinaryRow committedBusy(HybridTimestamp timestamp) {
- Objects.requireNonNull(timestamp, "timestamp is null");
+ return busy(() -> {
+ Objects.requireNonNull(timestamp, "timestamp is null");
- if (currentRowId == null) {
- throw new IllegalStateException("currentRowId is null");
- }
+ if (currentRowId == null) {
+ throw new IllegalStateException("currentRowId is null");
+ }
- setKeyBuffer(seekKeyBuf, currentRowId, timestamp);
+ setKeyBuffer(seekKeyBuf, currentRowId, timestamp);
- it.seek(seekKeyBuf.array());
+ it.seek(seekKeyBuf.array());
- ReadResult readResult = handleReadByTimestampIterator(it, currentRowId, timestamp, seekKeyBuf);
+ ReadResult readResult = handleReadByTimestampIterator(it, currentRowId, timestamp, seekKeyBuf);
- if (readResult.isEmpty()) {
- return null;
- }
+ if (readResult.isEmpty()) {
+ return null;
+ }
- return readResult.binaryRow();
+ return readResult.binaryRow();
+ });
}
@Override
public final ReadResult next() {
- return busy(this::nextBusy);
- }
-
- private ReadResult nextBusy() {
- if (!hasNextBusy()) {
- throw new NoSuchElementException();
- }
+ return busy(() -> {
+ if (!hasNextBusy()) {
+ throw new NoSuchElementException();
+ }
- ReadResult res = next;
+ ReadResult res = next;
- next = null;
+ next = null;
- return res;
+ return res;
+ });
}
@Override
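Every hunk in RocksDbMvPartitionStorage above follows one shape: the former pair of a public method plus a private *Busy twin is collapsed into a single method whose body is a lambda handed to busy(...). A minimal sketch of the before/after, with illustrative names and the lock handling elided:

    import java.util.function.Supplier;

    class InlineBusySketch {
        private long storedRows = 42; // stand-in for the RocksDB row scan

        // Lock handling elided; see the busy() sketch after the diffstat above.
        private <V> V busy(Supplier<V> supplier) {
            return supplier.get();
        }

        // Before: a public wrapper plus a private *Busy twin. Nothing prevented
        // new code from calling rowsCountBusy() directly, outside the busy lock:
        //
        //     public long rowsCount() { return busy(this::rowsCountBusy); }
        //     private long rowsCountBusy() { return storedRows; }

        // After: the guarded body is a lambda, so it cannot be invoked unguarded.
        public long rowsCount() {
            return busy(() -> storedRows);
        }
    }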
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 66ab805e79..dfbf279ebf 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -26,6 +26,7 @@ import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.META_
import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.PARTITION_CF_NAME;
import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.sortedIndexCfName;
import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.sortedIndexId;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import java.io.IOException;
import java.nio.file.Files;
@@ -190,84 +191,86 @@ public class RocksDbTableStorage implements MvTableStorage {
@Override
public void start() throws StorageException {
- flusher = new RocksDbFlusher(
- busyLock,
- engine.scheduledPool(),
- engine().threadPool(),
- engine.configuration().flushDelayMillis()::value,
- this::refreshPersistedIndexes
- );
+ inBusyLock(busyLock, () -> {
+ flusher = new RocksDbFlusher(
+ busyLock,
+ engine.scheduledPool(),
+ engine().threadPool(),
+ engine.configuration().flushDelayMillis()::value,
+ this::refreshPersistedIndexes
+ );
- try {
- Files.createDirectories(tablePath);
- } catch (IOException e) {
- throw new StorageException("Failed to create a directory for the table storage", e);
- }
+ try {
+ Files.createDirectories(tablePath);
+ } catch (IOException e) {
+ throw new StorageException("Failed to create a directory for the table storage", e);
+ }
- List<ColumnFamilyDescriptor> cfDescriptors = getExistingCfDescriptors();
+ List<ColumnFamilyDescriptor> cfDescriptors = getExistingCfDescriptors();
- List<ColumnFamilyHandle> cfHandles = new ArrayList<>(cfDescriptors.size());
+ List<ColumnFamilyHandle> cfHandles = new ArrayList<>(cfDescriptors.size());
- DBOptions dbOptions = new DBOptions()
- .setCreateIfMissing(true)
- .setCreateMissingColumnFamilies(true)
- // Atomic flush must be enabled to guarantee consistency between different column families when WAL is disabled.
- .setAtomicFlush(true)
- .setListeners(List.of(flusher.listener()))
- .setWriteBufferManager(dataRegion.writeBufferManager());
+ DBOptions dbOptions = new DBOptions()
+ .setCreateIfMissing(true)
+ .setCreateMissingColumnFamilies(true)
+ // Atomic flush must be enabled to guarantee consistency between different column families when WAL is disabled.
+ .setAtomicFlush(true)
+ .setListeners(List.of(flusher.listener()))
+ .setWriteBufferManager(dataRegion.writeBufferManager());
- try {
- db = RocksDB.open(dbOptions, tablePath.toAbsolutePath().toString(), cfDescriptors, cfHandles);
+ try {
+ db = RocksDB.open(dbOptions, tablePath.toAbsolutePath().toString(), cfDescriptors, cfHandles);
- // read all existing Column Families from the db and parse them according to type: meta, partition data or index.
- for (ColumnFamilyHandle cfHandle : cfHandles) {
- ColumnFamily cf = ColumnFamily.wrap(db, cfHandle);
+ // read all existing Column Families from the db and parse them according to type: meta, partition data or index.
+ for (ColumnFamilyHandle cfHandle : cfHandles) {
+ ColumnFamily cf = ColumnFamily.wrap(db, cfHandle);
- switch (ColumnFamilyType.fromCfName(cf.name())) {
- case META:
- meta = new RocksDbMetaStorage(cf);
+ switch (ColumnFamilyType.fromCfName(cf.name())) {
+ case META:
+ meta = new RocksDbMetaStorage(cf);
- break;
+ break;
- case PARTITION:
- partitionCf = cf;
+ case PARTITION:
+ partitionCf = cf;
- break;
+ break;
- case HASH_INDEX:
- hashIndexCf = cf;
+ case HASH_INDEX:
+ hashIndexCf = cf;
- break;
+ break;
- case SORTED_INDEX:
- UUID indexId = sortedIndexId(cf.name());
+ case SORTED_INDEX:
+ UUID indexId = sortedIndexId(cf.name());
- var indexDescriptor = new SortedIndexDescriptor(indexId, tablesCfg.value());
+ var indexDescriptor = new SortedIndexDescriptor(indexId, tablesCfg.value());
- sortedIndices.put(indexId, new SortedIndex(cf, indexDescriptor));
+ sortedIndices.put(indexId, new SortedIndex(cf, indexDescriptor));
- break;
+ break;
- default:
- throw new StorageException("Unidentified column family [name=" + cf.name() + ", table="
- + tableCfg.value().name() + ']');
+ default:
+ throw new StorageException("Unidentified column family [name=" + cf.name() + ", table="
+ + tableCfg.value().name() + ']');
+ }
}
- }
- assert meta != null;
- assert partitionCf != null;
- assert hashIndexCf != null;
+ assert meta != null;
+ assert partitionCf != null;
+ assert hashIndexCf != null;
- flusher.init(db, cfHandles);
- } catch (RocksDBException e) {
- throw new StorageException("Failed to initialize RocksDB instance", e);
- }
+ flusher.init(db, cfHandles);
+ } catch (RocksDBException e) {
+ throw new StorageException("Failed to initialize RocksDB instance", e);
+ }
- partitions = new AtomicReferenceArray<>(tableCfg.value().partitions());
+ partitions = new AtomicReferenceArray<>(tableCfg.value().partitions());
- for (int partId : meta.getPartitionIds()) {
- partitions.set(partId, new RocksDbMvPartitionStorage(this, partId));
- }
+ for (int partId : meta.getPartitionIds()) {
+ partitions.set(partId, new RocksDbMvPartitionStorage(this, partId));
+ }
+ });
}
/**
@@ -277,7 +280,7 @@ public class RocksDbTableStorage implements MvTableStorage {
* @param schedule {@code true} if {@link RocksDB#flush(FlushOptions)} should be explicitly triggered in the near future.
*/
public CompletableFuture<Void> awaitFlush(boolean schedule) {
- return flusher.awaitFlush(schedule);
+ return inBusyLock(busyLock, () -> flusher.awaitFlush(schedule));
}
private void refreshPersistedIndexes() {
@@ -289,7 +292,7 @@ public class RocksDbTableStorage implements MvTableStorage {
TableView tableCfgView = configuration().value();
for (int partitionId = 0; partitionId < tableCfgView.partitions(); partitionId++) {
- RocksDbMvPartitionStorage partition = getMvPartition(partitionId);
+ RocksDbMvPartitionStorage partition = getMvPartitionBusy(partitionId);
if (partition != null) {
try {
@@ -371,23 +374,29 @@ public class RocksDbTableStorage implements MvTableStorage {
@Override
public RocksDbMvPartitionStorage getOrCreateMvPartition(int partitionId) throws StorageException {
- RocksDbMvPartitionStorage partition = getMvPartition(partitionId);
+ return inBusyLock(busyLock, () -> {
+ RocksDbMvPartitionStorage partition = getMvPartitionBusy(partitionId);
- if (partition != null) {
- return partition;
- }
+ if (partition != null) {
+ return partition;
+ }
- partition = new RocksDbMvPartitionStorage(this, partitionId);
+ partition = new RocksDbMvPartitionStorage(this, partitionId);
- partitions.set(partitionId, partition);
+ partitions.set(partitionId, partition);
- meta.putPartitionId(partitionId);
+ meta.putPartitionId(partitionId);
- return partition;
+ return partition;
+ });
}
@Override
public @Nullable RocksDbMvPartitionStorage getMvPartition(int partitionId) {
+ return inBusyLock(busyLock, () -> getMvPartitionBusy(partitionId));
+ }
+
+ private @Nullable RocksDbMvPartitionStorage getMvPartitionBusy(int partitionId) {
checkPartitionId(partitionId);
return partitions.get(partitionId);
@@ -395,69 +404,73 @@ public class RocksDbTableStorage implements MvTableStorage {
@Override
public CompletableFuture<Void> destroyPartition(int partitionId) {
- checkPartitionId(partitionId);
+ return inBusyLock(busyLock, () -> {
+ checkPartitionId(partitionId);
- CompletableFuture<Void> destroyPartitionFuture = new CompletableFuture<>();
+ CompletableFuture<Void> destroyPartitionFuture = new CompletableFuture<>();
- CompletableFuture<Void> previousDestroyPartitionFuture = partitionIdDestroyFutureMap.putIfAbsent(
- partitionId,
- destroyPartitionFuture
- );
+ CompletableFuture<Void> previousDestroyPartitionFuture = partitionIdDestroyFutureMap.putIfAbsent(
+ partitionId,
+ destroyPartitionFuture
+ );
- if (previousDestroyPartitionFuture != null) {
- return previousDestroyPartitionFuture;
- }
+ if (previousDestroyPartitionFuture != null) {
+ return previousDestroyPartitionFuture;
+ }
- RocksDbMvPartitionStorage mvPartition = partitions.getAndSet(partitionId, null);
+ RocksDbMvPartitionStorage mvPartition = partitions.getAndSet(partitionId, null);
- if (mvPartition != null) {
- try (WriteBatch writeBatch = new WriteBatch()) {
- mvPartition.close();
+ if (mvPartition != null) {
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ mvPartition.close();
- // Operation to delete partition data should be fast, since we will write only the range of keys for deletion, and the
- // RocksDB itself will then destroy the data on flush.
- mvPartition.destroyData(writeBatch);
+ // Operation to delete partition data should be fast, since we will write only the range of keys for deletion, and the
+ // RocksDB itself will then destroy the data on flush.
+ mvPartition.destroyData(writeBatch);
- for (HashIndex hashIndex : hashIndices.values()) {
- hashIndex.destroy(partitionId, writeBatch);
- }
+ for (HashIndex hashIndex : hashIndices.values()) {
+ hashIndex.destroy(partitionId, writeBatch);
+ }
- for (SortedIndex sortedIndex : sortedIndices.values()) {
- sortedIndex.destroy(partitionId, writeBatch);
- }
+ for (SortedIndex sortedIndex : sortedIndices.values()) {
+ sortedIndex.destroy(partitionId, writeBatch);
+ }
- db.write(writeOptions, writeBatch);
+ db.write(writeOptions, writeBatch);
- CompletableFuture<?> flushFuture = awaitFlush(true);
+ CompletableFuture<?> flushFuture = awaitFlush(true);
- flushFuture.whenComplete((unused, throwable) -> {
- if (throwable == null) {
- partitionIdDestroyFutureMap.remove(partitionId).complete(null);
- } else {
- partitionIdDestroyFutureMap.remove(partitionId).completeExceptionally(throwable);
- }
- });
- } catch (Throwable throwable) {
- partitionIdDestroyFutureMap.remove(partitionId).completeExceptionally(throwable);
+ flushFuture.whenComplete((unused, throwable) -> {
+ if (throwable == null) {
+ partitionIdDestroyFutureMap.remove(partitionId).complete(null);
+ } else {
+ partitionIdDestroyFutureMap.remove(partitionId).completeExceptionally(throwable);
+ }
+ });
+ } catch (Throwable throwable) {
+ partitionIdDestroyFutureMap.remove(partitionId).completeExceptionally(throwable);
+ }
+ } else {
+ partitionIdDestroyFutureMap.remove(partitionId).complete(null);
}
- } else {
- partitionIdDestroyFutureMap.remove(partitionId).complete(null);
- }
- return destroyPartitionFuture;
+ return destroyPartitionFuture;
+ });
}
@Override
public SortedIndexStorage getOrCreateSortedIndex(int partitionId, UUID indexId) {
- SortedIndex storages = sortedIndices.computeIfAbsent(indexId, this::createSortedIndex);
+ return inBusyLock(busyLock, () -> {
+ SortedIndex storages = sortedIndices.computeIfAbsent(indexId, this::createSortedIndex);
- RocksDbMvPartitionStorage partitionStorage = getMvPartition(partitionId);
+ RocksDbMvPartitionStorage partitionStorage = getMvPartitionBusy(partitionId);
- if (partitionStorage == null) {
- throw new StorageException(String.format("Partition ID %d does not exist", partitionId));
- }
+ if (partitionStorage == null) {
+ throw new StorageException(String.format("Partition ID %d does not exist", partitionId));
+ }
- return storages.getOrCreateStorage(partitionStorage);
+ return storages.getOrCreateStorage(partitionStorage);
+ });
}
private SortedIndex createSortedIndex(UUID indexId) {
@@ -479,45 +492,49 @@ public class RocksDbTableStorage implements MvTableStorage {
@Override
public HashIndexStorage getOrCreateHashIndex(int partitionId, UUID indexId) {
- HashIndex storages = hashIndices.computeIfAbsent(indexId, id -> {
- var indexDescriptor = new HashIndexDescriptor(indexId, tablesCfg.value());
+ return inBusyLock(busyLock, () -> {
+ HashIndex storages = hashIndices.computeIfAbsent(indexId, id -> {
+ var indexDescriptor = new HashIndexDescriptor(indexId, tablesCfg.value());
- return new HashIndex(hashIndexCf, indexDescriptor);
- });
+ return new HashIndex(hashIndexCf, indexDescriptor);
+ });
- RocksDbMvPartitionStorage partitionStorage = getMvPartition(partitionId);
+ RocksDbMvPartitionStorage partitionStorage = getMvPartitionBusy(partitionId);
- if (partitionStorage == null) {
- throw new StorageException(String.format("Partition ID %d does not exist", partitionId));
- }
+ if (partitionStorage == null) {
+ throw new StorageException(String.format("Partition ID %d does not exist", partitionId));
+ }
- return storages.getOrCreateStorage(partitionStorage);
+ return storages.getOrCreateStorage(partitionStorage);
+ });
}
@Override
public CompletableFuture<Void> destroyIndex(UUID indexId) {
- HashIndex hashIdx = hashIndices.remove(indexId);
+ return inBusyLock(busyLock, () -> {
+ HashIndex hashIdx = hashIndices.remove(indexId);
- if (hashIdx != null) {
- hashIdx.destroy();
- }
+ if (hashIdx != null) {
+ hashIdx.destroy();
+ }
- // Sorted Indexes have a separate Column Family per index, so we simply destroy it immediately after a flush completes
- // in order to avoid concurrent access to the CF.
- SortedIndex sortedIdx = sortedIndices.remove(indexId);
+ // Sorted Indexes have a separate Column Family per index, so we simply destroy it immediately after a flush completes
+ // in order to avoid concurrent access to the CF.
+ SortedIndex sortedIdx = sortedIndices.remove(indexId);
- if (sortedIdx != null) {
- // Remove the to-be destroyed CF from the flusher
- flusher.removeColumnFamily(sortedIdx.indexCf().handle());
+ if (sortedIdx != null) {
+ // Remove the to-be destroyed CF from the flusher
+ flusher.removeColumnFamily(sortedIdx.indexCf().handle());
- sortedIdx.destroy();
- }
+ sortedIdx.destroy();
+ }
- if (hashIdx == null) {
- return completedFuture(null);
- } else {
- return awaitFlush(false);
- }
+ if (hashIdx == null) {
+ return completedFuture(null);
+ } else {
+ return awaitFlush(false);
+ }
+ });
}
@Override
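RocksDbTableStorage standardizes on the static IgniteUtils.inBusyLock helper (note the new import above) instead of a private busy() method, and splits getMvPartition into a public locking wrapper plus a getMvPartitionBusy variant for callers that already hold the lock, so each public operation enters the busy lock exactly once. A sketch of that split, with stand-in types and the helper's signature inferred from its call sites in this file (its exact failure behavior is not visible in the diff):

    import java.util.function.Supplier;

    class PartitionLookupSketch {
        private final Object[] partitions = new Object[16]; // stand-in for AtomicReferenceArray

        // Plausible shape of IgniteUtils.inBusyLock as used above; the real helper
        // also takes the IgniteSpinBusyLock to enter and leave.
        private static <T> T inBusyLock(Supplier<T> supplier) {
            // enterBusy()/leaveBusy() elided; see the busy() sketch after the diffstat.
            return supplier.get();
        }

        // Public API: enters the busy lock.
        public Object getMvPartition(int partitionId) {
            return inBusyLock(() -> getMvPartitionBusy(partitionId));
        }

        // For callers already inside an inBusyLock(...) section, such as
        // getOrCreateMvPartition and refreshPersistedIndexes: no second lock entry.
        private Object getMvPartitionBusy(int partitionId) {
            return partitions[partitionId];
        }
    }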
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
index ce14828c2c..cb43a95e52 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
import org.apache.ignite.internal.rocksdb.BusyRocksIteratorAdapter;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
import org.apache.ignite.internal.rocksdb.RocksUtils;
@@ -117,93 +118,89 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
@Override
public Cursor<RowId> get(BinaryTuple key) {
- if (!busyLock.enterBusy()) {
- throw new StorageClosedException();
- }
-
- byte[] rangeStart = rocksPrefix(key);
-
- byte[] rangeEnd = incrementArray(rangeStart);
+ return busy(() -> {
+ byte[] rangeStart = rocksPrefix(key);
- Slice upperBound = rangeEnd == null ? null : new Slice(rangeEnd);
+ byte[] rangeEnd = incrementArray(rangeStart);
- ReadOptions options = new ReadOptions().setIterateUpperBound(upperBound);
+ Slice upperBound = rangeEnd == null ? null : new Slice(rangeEnd);
- RocksIterator it = indexCf.newIterator(options);
+ ReadOptions options = new ReadOptions().setIterateUpperBound(upperBound);
- it.seek(rangeStart);
+ RocksIterator it = indexCf.newIterator(options);
- busyLock.leaveBusy();
+ it.seek(rangeStart);
- return new BusyRocksIteratorAdapter<>(busyLock, it) {
- @Override
- protected void handleBusyFail() {
- throw new StorageClosedException();
- }
+ return new BusyRocksIteratorAdapter<RowId>(busyLock, it) {
+ @Override
+ protected void handleBusyFail() {
+ throw new StorageClosedException();
+ }
- @Override
- protected RowId decodeEntry(byte[] key, byte[] value) {
- // RowId UUID is located at the last 16 bytes of the key
- long mostSignificantBits = bytesToLong(key, key.length - Long.BYTES * 2);
- long leastSignificantBits = bytesToLong(key, key.length - Long.BYTES);
+ @Override
+ protected RowId decodeEntry(byte[] key, byte[] value) {
+ // RowId UUID is located at the last 16 bytes of the key
+ long mostSignificantBits = bytesToLong(key, key.length - Long.BYTES * 2);
+ long leastSignificantBits = bytesToLong(key, key.length - Long.BYTES);
- return new RowId(partitionStorage.partitionId(), mostSignificantBits, leastSignificantBits);
- }
+ return new RowId(partitionStorage.partitionId(), mostSignificantBits, leastSignificantBits);
+ }
- @Override
- public void close() {
- super.close();
+ @Override
+ public void close() {
+ super.close();
- RocksUtils.closeAll(options, upperBound);
- }
- };
+ RocksUtils.closeAll(options, upperBound);
+ }
+ };
+ });
}
@Override
public void put(IndexRow row) {
- if (!busyLock.enterBusy()) {
- throw new StorageClosedException();
- }
+ busy(() -> {
+ try {
+ WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
- try {
- WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
+ writeBatch.put(indexCf.handle(), rocksKey(row), BYTE_EMPTY_ARRAY);
- writeBatch.put(indexCf.handle(), rocksKey(row), BYTE_EMPTY_ARRAY);
- } catch (RocksDBException e) {
- throw new StorageException("Unable to insert data into hash index. Index ID: " + descriptor.id(), e);
- } finally {
- busyLock.leaveBusy();
- }
+ return null;
+ } catch (RocksDBException e) {
+ throw new StorageException("Unable to insert data into hash index. Index ID: " + descriptor.id(), e);
+ }
+ });
}
@Override
public void remove(IndexRow row) {
- if (!busyLock.enterBusy()) {
- throw new StorageClosedException();
- }
+ busy(() -> {
+ try {
+ WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
- try {
- WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
+ writeBatch.delete(indexCf.handle(), rocksKey(row));
- writeBatch.delete(indexCf.handle(), rocksKey(row));
- } catch (RocksDBException e) {
- throw new StorageException("Unable to remove data from hash index. Index ID: " + descriptor.id(), e);
- } finally {
- busyLock.leaveBusy();
- }
+ return null;
+ } catch (RocksDBException e) {
+ throw new StorageException("Unable to remove data from hash index. Index ID: " + descriptor.id(), e);
+ }
+ });
}
@Override
public void destroy() {
- byte[] rangeEnd = incrementArray(constantPrefix);
+ busy(() -> {
+ byte[] rangeEnd = incrementArray(constantPrefix);
- assert rangeEnd != null;
+ assert rangeEnd != null;
- try (WriteOptions writeOptions = new WriteOptions().setDisableWAL(true)) {
- indexCf.db().deleteRange(indexCf.handle(), writeOptions, constantPrefix, rangeEnd);
- } catch (RocksDBException e) {
- throw new StorageException("Unable to remove data from hash index. Index ID: " + descriptor.id(), e);
- }
+ try (WriteOptions writeOptions = new WriteOptions().setDisableWAL(true)) {
+ indexCf.db().deleteRange(indexCf.handle(), writeOptions, constantPrefix, rangeEnd);
+
+ return null;
+ } catch (RocksDBException e) {
+ throw new StorageException("Unable to remove data from hash index. Index ID: " + descriptor.id(), e);
+ }
+ });
}
private byte[] rocksPrefix(BinaryTuple prefix) {
@@ -253,4 +250,16 @@ public class RocksDbHashIndexStorage implements HashIndexStorage {
writeBatch.deleteRange(indexCf.handle(), constantPrefix, rangeEnd);
}
+
+ private <V> V busy(Supplier<V> supplier) {
+ if (!busyLock.enterBusy()) {
+ throw new StorageClosedException();
+ }
+
+ try {
+ return supplier.get();
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
}
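The change to RocksDbHashIndexStorage.get is more than a refactor: the removed version entered the busy lock, computed the range, created the iterator, and only then called leaveBusy(), with no try/finally, so an exception anywhere along the way left the lock entered for good. Routing the body through the new private busy(Supplier) helper at the bottom of the file guarantees the release in its finally block. A self-contained paraphrase of the old, leaky shape, with stand-in types:

    import java.util.Collections;
    import java.util.Iterator;
    import java.util.concurrent.atomic.AtomicInteger;

    class LeakyGetSketch {
        private final AtomicInteger busyCount = new AtomicInteger(); // stand-in busy-lock counter

        // Condensed paraphrase of the removed get(): no try/finally between
        // enterBusy() and leaveBusy(), so a throw while computing the prefix or
        // creating the iterator leaks a busy-lock entry and can stall shutdown.
        Iterator<byte[]> leakyGet(byte[] key) {
            busyCount.incrementAndGet();                       // enterBusy()

            byte[] rangeStart = rocksPrefix(key);              // a throw here skips leaveBusy()
            Iterator<byte[]> it = Collections.emptyIterator(); // stand-in for indexCf.newIterator(...)

            busyCount.decrementAndGet();                       // leaveBusy(), success path only

            return it;
        }

        private byte[] rocksPrefix(byte[] key) {
            return key.clone();
        }
    }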
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
index 99fc1b84dc..a66568f2cd 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
@@ -26,6 +26,7 @@ import java.nio.ByteOrder;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
+import java.util.function.Supplier;
import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
import org.apache.ignite.internal.rocksdb.RocksUtils;
@@ -103,67 +104,51 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
@Override
public Cursor<RowId> get(BinaryTuple key) throws StorageException {
- if (!busyLock.enterBusy()) {
- throw new StorageClosedException();
- }
-
- BinaryTuplePrefix keyPrefix = BinaryTuplePrefix.fromBinaryTuple(key);
+ return busy(() -> {
+ BinaryTuplePrefix keyPrefix = BinaryTuplePrefix.fromBinaryTuple(key);
- try {
return scan(keyPrefix, keyPrefix, true, true, this::decodeRowId);
- } finally {
- busyLock.leaveBusy();
- }
+ });
}
@Override
public void put(IndexRow row) {
- if (!busyLock.enterBusy()) {
- throw new StorageClosedException();
- }
+ busy(() -> {
+ try {
+ WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
- try {
- WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
+ writeBatch.put(indexCf.handle(), rocksKey(row), BYTE_EMPTY_ARRAY);
- writeBatch.put(indexCf.handle(), rocksKey(row), BYTE_EMPTY_ARRAY);
- } catch (RocksDBException e) {
- throw new StorageException("Unable to insert data into sorted index. Index ID: " + descriptor.id(), e);
- } finally {
- busyLock.leaveBusy();
- }
+ return null;
+ } catch (RocksDBException e) {
+ throw new StorageException("Unable to insert data into sorted index. Index ID: " + descriptor.id(), e);
+ }
+ });
}
@Override
public void remove(IndexRow row) {
- if (!busyLock.enterBusy()) {
- throw new StorageClosedException();
- }
+ busy(() -> {
+ try {
+ WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
- try {
- WriteBatchWithIndex writeBatch = partitionStorage.currentWriteBatch();
+ writeBatch.delete(indexCf.handle(), rocksKey(row));
- writeBatch.delete(indexCf.handle(), rocksKey(row));
- } catch (RocksDBException e) {
- throw new StorageException("Unable to remove data from sorted index. Index ID: " + descriptor.id(), e);
- } finally {
- busyLock.leaveBusy();
- }
+ return null;
+ } catch (RocksDBException e) {
+ throw new StorageException("Unable to remove data from sorted index. Index ID: " + descriptor.id(), e);
+ }
+ });
}
@Override
public PeekCursor<IndexRow> scan(@Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags) {
- if (!busyLock.enterBusy()) {
- throw new StorageClosedException();
- }
+ return busy(() -> {
+ boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
+ boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
- boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
- boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
-
- try {
return scan(lowerBound, upperBound, includeLower, includeUpper, this::decodeRow);
- } finally {
- busyLock.leaveBusy();
- }
+ });
}
private <T> PeekCursor<T> scan(
@@ -230,26 +215,16 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
@Override
public boolean hasNext() {
- if (!busyLock.enterBusy()) {
- throw new StorageClosedException();
- }
-
- try {
+ return busy(() -> {
advanceIfNeeded();
return hasNext;
- } finally {
- busyLock.leaveBusy();
- }
+ });
}
@Override
public T next() {
- if (!busyLock.enterBusy()) {
- throw new StorageClosedException();
- }
-
- try {
+ return busy(() -> {
advanceIfNeeded();
boolean hasNext = this.hasNext;
@@ -261,18 +236,12 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
this.hasNext = null;
return mapper.apply(ByteBuffer.wrap(key).order(ORDER));
- } finally {
- busyLock.leaveBusy();
- }
+ });
}
@Override
public @Nullable T peek() {
- if (!busyLock.enterBusy()) {
- throw new StorageClosedException();
- }
-
- try {
+ return busy(() -> {
if (hasNext != null) {
if (hasNext) {
return mapper.apply(ByteBuffer.wrap(key).order(ORDER));
@@ -290,9 +259,7 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
} else {
return mapper.apply(ByteBuffer.wrap(it.key()).order(ORDER));
}
- } finally {
- busyLock.leaveBusy();
- }
+ });
}
private void advanceIfNeeded() throws StorageException {
@@ -417,4 +384,16 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
writeBatch.deleteRange(indexCf.handle(), constantPrefix, rangeEnd);
}
+
+ private <V> V busy(Supplier<V> supplier) {
+ if (!busyLock.enterBusy()) {
+ throw new StorageClosedException();
+ }
+
+ try {
+ return supplier.get();
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
}
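RocksDbSortedIndexStorage gets the same treatment, down to an identical private busy(Supplier) helper. Note that the cursor methods (hasNext, next, peek) each wrap themselves in busy(...), because the cursor outlives the busy section that created it. An illustrative caller-side sketch, with a hypothetical cursor interface standing in for Ignite's Cursor/PeekCursor:

    import java.util.Iterator;

    class GuardedCursorSketch {
        interface GuardedCursor<T> extends Iterator<T>, AutoCloseable {
            @Override
            void close(); // narrowed so try-with-resources needs no checked exception
        }

        // Caller-side view: every cursor step re-enters the busy lock internally
        // (as hasNext()/next()/peek() above do), so the caller needs no locking,
        // and a scan racing with storage close fails fast with a
        // StorageClosedException-style error instead of touching freed RocksDB state.
        static <T> void drain(GuardedCursor<T> cursor) {
            try (cursor) {
                while (cursor.hasNext()) {
                    System.out.println(cursor.next());
                }
            }
        }
    }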