You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/08/16 12:18:54 UTC
[ignite-3] branch main updated: IGNITE-17532 Removed explicit flush after partition deletion in RocksDB-based partition storage. (#1011)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 32b1bae2c0 IGNITE-17532 Removed explicit flush after partition deletion in RocksDB-based partition storage. (#1011)
32b1bae2c0 is described below
commit 32b1bae2c0ea3bc41ce8cf791397197e43ca41ff
Author: ibessonov <be...@gmail.com>
AuthorDate: Tue Aug 16 15:18:49 2022 +0300
IGNITE-17532 Removed explicit flush after partition deletion in RocksDB-based partition storage. (#1011)
---
.../storage/rocksdb/RocksDbFlushListener.java | 8 +-
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 39 +---------
.../storage/rocksdb/RocksDbStorageEngine.java | 9 +--
.../storage/rocksdb/RocksDbTableStorage.java | 90 +++++++++++++++++++---
.../storage/rocksdb/RocksDbTableStorageTest.java | 28 +++----
5 files changed, 106 insertions(+), 68 deletions(-)
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbFlushListener.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbFlushListener.java
index 92b1b41bdb..69d13cad81 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbFlushListener.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbFlushListener.java
@@ -21,6 +21,7 @@ import static org.rocksdb.AbstractEventListener.EnabledEventCallback.ON_FLUSH_BE
import static org.rocksdb.AbstractEventListener.EnabledEventCallback.ON_FLUSH_COMPLETED;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.configuration.schemas.table.TableView;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -70,9 +71,14 @@ class RocksDbFlushListener extends AbstractEventListener {
/** {@inheritDoc} */
@Override
public void onFlushCompleted(RocksDB db, FlushJobInfo flushJobInfo) {
+ ExecutorService threadPool = tableStorage.engine().threadPool();
+
if (lastEventType.compareAndSet(ON_FLUSH_BEGIN, ON_FLUSH_COMPLETED)) {
- lastFlushProcessed = CompletableFuture.runAsync(this::refreshPersistedIndexes, tableStorage.engine().threadPool());
+ lastFlushProcessed = CompletableFuture.runAsync(this::refreshPersistedIndexes, threadPool);
}
+
+ // Do it for every column family, there's no way to tell in advance which one has the latest sequence number.
+ lastFlushProcessed.whenCompleteAsync((o, throwable) -> tableStorage.completeFutures(flushJobInfo.getLargestSeqno()), threadPool);
}
private void refreshPersistedIndexes() {
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index f9edc5cc28..33160679ce 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -27,14 +27,9 @@ import static org.rocksdb.ReadTier.PERSISTED_TIER;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
-import java.util.Iterator;
-import java.util.Map.Entry;
import java.util.NoSuchElementException;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import org.apache.ignite.configuration.schemas.table.TableConfiguration;
@@ -146,9 +141,6 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
/** The value of {@link #lastAppliedIndex} persisted to the device at this moment. */
private volatile long persistedIndex;
- /** Map with flush futures by applied index at the time of the {@link #flush()} call. */
- private final ConcurrentMap<Long, CompletableFuture<Void>> flushFuturesByAppliedIndex = new ConcurrentHashMap<>();
-
/**
* Constructor.
*
@@ -202,13 +194,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
/** {@inheritDoc} */
@Override
public CompletableFuture<Void> flush() {
- CompletableFuture<Void> flushFuture = flushFuturesByAppliedIndex.computeIfAbsent(
- lastAppliedIndex, index -> new CompletableFuture<>()
- );
-
- tableStorage.scheduleFlush();
-
- return flushFuture;
+ return tableStorage.awaitFlush(true);
}
/** {@inheritDoc} */
@@ -241,27 +227,10 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
* Reads a value of {@link #lastAppliedIndex()} from the storage, avoiding memtable, and sets it as a new value of
* {@link #persistedIndex()}.
*
- * <p/>All futures returned by {@link #flush()} are completed here if they correspond to the value of {@link #persistedIndex()}
- * (if flush was called before data started being flushed to the storage).
- *
* @throws StorageException If failed to read index from the storage.
*/
public void refreshPersistedIndex() throws StorageException {
- long persistedIndex = readLastAppliedIndex(persistedTierReadOpts);
-
- this.persistedIndex = persistedIndex;
-
- Set<Entry<Long, CompletableFuture<Void>>> entries = flushFuturesByAppliedIndex.entrySet();
-
- for (Iterator<Entry<Long, CompletableFuture<Void>>> iterator = entries.iterator(); iterator.hasNext(); ) {
- Entry<Long, CompletableFuture<Void>> entry = iterator.next();
-
- if (persistedIndex >= entry.getKey()) {
- entry.getValue().complete(null);
-
- iterator.remove();
- }
- }
+ persistedIndex = readLastAppliedIndex(persistedTierReadOpts);
}
/**
@@ -792,10 +761,6 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
/** {@inheritDoc} */
@Override
public void close() throws Exception {
- for (CompletableFuture<Void> future : flushFuturesByAppliedIndex.values()) {
- future.cancel(false);
- }
-
IgniteUtils.closeAll(persistedTierReadOpts, readOpts, writeOpts, upperBound);
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
index 05192a7f24..de9ecdb2e0 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
@@ -34,9 +34,7 @@ import org.apache.ignite.configuration.schemas.table.TableView;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.storage.StorageException;
-import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.engine.StorageEngine;
-import org.apache.ignite.internal.storage.engine.TableStorage;
import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataRegionConfiguration;
import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataRegionView;
import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageView;
@@ -148,7 +146,7 @@ public class RocksDbStorageEngine implements StorageEngine {
/** {@inheritDoc} */
@Override
- public TableStorage createTable(TableConfiguration tableCfg) throws StorageException {
+ public RocksDbTableStorage createTable(TableConfiguration tableCfg) throws StorageException {
TableView tableView = tableCfg.value();
assert tableView.dataStorage().name().equals(ENGINE_NAME) : tableView.dataStorage().name();
@@ -168,8 +166,9 @@ public class RocksDbStorageEngine implements StorageEngine {
return new RocksDbTableStorage(this, tablePath, tableCfg, dataRegion);
}
+ /** {@inheritDoc} */
@Override
- public MvTableStorage createMvTable(TableConfiguration tableCfg) throws StorageException {
- return (MvTableStorage) createTable(tableCfg);
+ public RocksDbTableStorage createMvTable(TableConfiguration tableCfg) throws StorageException {
+ return createTable(tableCfg);
}
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 5e2d16df15..d7eb52d5d0 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -27,8 +27,13 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceArray;
@@ -97,6 +102,15 @@ class RocksDbTableStorage implements TableStorage, MvTableStorage {
/** Partition storages. */
private volatile AtomicReferenceArray<RocksDbMvPartitionStorage> partitions;
+ /** Map with flush futures by sequence number at the time of the {@link #awaitFlush(boolean)} call. */
+ private final ConcurrentMap<Long, CompletableFuture<Void>> flushFuturesBySequenceNumber = new ConcurrentHashMap<>();
+
+ /** Latest known sequence number for persisted data. Not volatile, protected by explicit synchronization. */
+ private long latestPersistedSequenceNumber;
+
+ /** Mutex for {@link #latestPersistedSequenceNumber} modifications. */
+ private final Object latestPersistedSequenceNumberMux = new Object();
+
/**
* Instance of the latest scheduled flush closure.
*
@@ -104,10 +118,6 @@ class RocksDbTableStorage implements TableStorage, MvTableStorage {
*/
private volatile Runnable latestFlushClosure;
- /** Flag indicating if the storage has been stopped. */
- @Deprecated
- private volatile boolean stopped = false;
-
//TODO Use it instead of the "stopped" flag.
/** Busy lock to stop synchronously. */
final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -212,6 +222,11 @@ class RocksDbTableStorage implements TableStorage, MvTableStorage {
throw new StorageException("Unidentified column family [name=" + cf.name() + ", table=" + tableCfg.name() + ']');
}
}
+
+ // Pointless synchronization, but without it there would be a warning in the code.
+ synchronized (latestPersistedSequenceNumberMux) {
+ latestPersistedSequenceNumber = db.getLatestSequenceNumber();
+ }
} catch (RocksDBException e) {
throw new StorageException("Failed to initialize RocksDB instance", e);
}
@@ -223,10 +238,63 @@ class RocksDbTableStorage implements TableStorage, MvTableStorage {
}
}
+ /**
+ * Returns a future that completes once a flush has persisted every write made before this call, i.e. all data up to the current
+ * {@link RocksDB#getLatestSequenceNumber()}. If that sequence number is already persisted, a completed future is returned.
+ *
+ * @param schedule {@code true} if {@link RocksDB#flush(FlushOptions)} should be explicitly triggered in the near future.
+ * @return Future completed after a flush reports a sequence number at least as large as the one observed here; cancelled on stop.
+ * @see #scheduleFlush()
+ */
+ public CompletableFuture<Void> awaitFlush(boolean schedule) {
+ CompletableFuture<Void> future;
+
+ long dbSequenceNumber = db.getLatestSequenceNumber();
+
+ synchronized (latestPersistedSequenceNumberMux) {
+ if (dbSequenceNumber <= latestPersistedSequenceNumber) {
+ return CompletableFuture.completedFuture(null);
+ }
+ // Callers observing the same sequence number share a single future.
+ future = flushFuturesBySequenceNumber.computeIfAbsent(dbSequenceNumber, l -> new CompletableFuture<>());
+ }
+
+ if (schedule) {
+ scheduleFlush();
+ }
+
+ return future;
+ }
+
+ /**
+ * Completes and removes all futures in {@link #flushFuturesBySequenceNumber} registered for sequence numbers up to (inclusive) the given one.
+ */
+ void completeFutures(long sequenceNumber) {
+ synchronized (latestPersistedSequenceNumberMux) {
+ if (sequenceNumber <= latestPersistedSequenceNumber) {
+ return;
+ }
+
+ latestPersistedSequenceNumber = sequenceNumber;
+ }
+ // Iterated outside the mutex: awaitFlush registers new futures (under the mutex) only for keys above latestPersistedSequenceNumber.
+ Set<Entry<Long, CompletableFuture<Void>>> entries = flushFuturesBySequenceNumber.entrySet();
+
+ for (Iterator<Entry<Long, CompletableFuture<Void>>> iterator = entries.iterator(); iterator.hasNext(); ) {
+ Entry<Long, CompletableFuture<Void>> entry = iterator.next();
+
+ if (sequenceNumber >= entry.getKey()) {
+ entry.getValue().complete(null);
+
+ iterator.remove();
+ }
+ }
+ }
+
/**
* Schedules a flush of the table. If run several times within a small amount of time, only the last scheduled flush will be executed.
*/
- public void scheduleFlush() {
+ void scheduleFlush() {
Runnable newClosure = new Runnable() {
@Override
public void run() {
@@ -235,7 +303,9 @@ class RocksDbTableStorage implements TableStorage, MvTableStorage {
}
try {
- db.flush(flushOptions);
+ // Explicit list of CF handles is mandatory!
+ // Default flush is buggy and only invokes listener methods for a single random CF.
+ db.flush(flushOptions, List.of(metaCfHandle(), partitionCfHandle()));
} catch (RocksDBException e) {
LOG.error("Error occurred during the explicit flush for table '{}'", e, tableCfg.name());
}
@@ -253,14 +323,16 @@ class RocksDbTableStorage implements TableStorage, MvTableStorage {
/** {@inheritDoc} */
@Override
public void stop() throws StorageException {
- stopped = true;
-
if (!stopGuard.compareAndSet(false, true)) {
return;
}
busyLock.block();
+ for (CompletableFuture<Void> future : flushFuturesBySequenceNumber.values()) {
+ future.cancel(false);
+ }
+
List<AutoCloseable> resources = new ArrayList<>();
resources.add(db);
@@ -351,7 +423,7 @@ class RocksDbTableStorage implements TableStorage, MvTableStorage {
mvPartition.destroy();
// Wait for the data to actually be removed from the disk and close the storage.
- return mvPartition.flush()
+ return awaitFlush(false)
.whenComplete((v, e) -> {
partitions.set(partitionId, null);
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java
index ddf3b6c18b..c21860f8b2 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java
@@ -47,8 +47,6 @@ import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.BaseMvStoragesTest;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.internal.storage.engine.MvTableStorage;
-import org.apache.ignite.internal.storage.engine.StorageEngine;
import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageChange;
import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageConfigurationSchema;
import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageView;
@@ -70,17 +68,19 @@ import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith(WorkDirectoryExtension.class)
@ExtendWith(ConfigurationExtension.class)
public class RocksDbTableStorageTest extends BaseMvStoragesTest {
- private StorageEngine engine;
+ private RocksDbStorageEngine engine;
- private MvTableStorage storage;
+ private RocksDbTableStorage storage;
@BeforeEach
public void setUp(
@WorkDirectory Path workDir,
- @InjectConfiguration RocksDbStorageEngineConfiguration rocksDbEngineConfig,
+ @InjectConfiguration(
+ value = "mock {flushDelayMillis = 0, defaultRegion {size = 16536, writeBufferSize = 16536}}"
+ ) RocksDbStorageEngineConfiguration rocksDbEngineConfig,
@InjectConfiguration(
name = "table",
- value = "mock.partitions = 1024",
+ value = "mock.partitions = 512",
polymorphicExtensions = {
HashIndexConfigurationSchema.class,
UnknownDataStorageConfigurationSchema.class,
@@ -97,15 +97,6 @@ public class RocksDbTableStorageTest extends BaseMvStoragesTest {
assertThat(((RocksDbDataStorageView) tableCfg.dataStorage().value()).dataRegion(), equalTo(DEFAULT_DATA_REGION_NAME));
- CompletableFuture<Void> changeEngineFuture = rocksDbEngineConfig.defaultRegion()
- .change(c -> c.changeSize(16 * 1024).changeWriteBufferSize(16 * 1024));
-
- assertThat(changeEngineFuture, willBe(nullValue(Void.class)));
-
- changeEngineFuture = tableCfg.change(cfg -> cfg.changePartitions(512));
-
- assertThat(changeEngineFuture, willBe(nullValue(Void.class)));
-
engine = new RocksDbStorageEngine(rocksDbEngineConfig, workDir);
engine.start();
@@ -205,7 +196,12 @@ public class RocksDbTableStorageTest extends BaseMvStoragesTest {
RowId rowId1 = partitionStorage1.runConsistently(() -> partitionStorage1.insert(testData, txId));
- assertThat(storage.destroyPartition(42), willCompleteSuccessfully());
+ CompletableFuture<Void> destroyFuture = storage.destroyPartition(42);
+
+ // Partition desctuction doesn't enforce flush.
+ storage.scheduleFlush();
+
+ assertThat(destroyFuture, willCompleteSuccessfully());
assertThat(storage.getMvPartition(42), is(nullValue()));
assertThat(storage.getOrCreateMvPartition(42).read(rowId0, txId), is(nullValue()));