You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/08/16 12:18:54 UTC

[ignite-3] branch main updated: IGNITE-17532 Removed explicit flush after partition deletion in RocksDB-based partition storage. (#1011)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 32b1bae2c0 IGNITE-17532 Removed explicit flush after partition deletion in RocksDB-based partition storage. (#1011)
32b1bae2c0 is described below

commit 32b1bae2c0ea3bc41ce8cf791397197e43ca41ff
Author: ibessonov <be...@gmail.com>
AuthorDate: Tue Aug 16 15:18:49 2022 +0300

    IGNITE-17532 Removed explicit flush after partition deletion in RocksDB-based partition storage. (#1011)
---
 .../storage/rocksdb/RocksDbFlushListener.java      |  8 +-
 .../storage/rocksdb/RocksDbMvPartitionStorage.java | 39 +---------
 .../storage/rocksdb/RocksDbStorageEngine.java      |  9 +--
 .../storage/rocksdb/RocksDbTableStorage.java       | 90 +++++++++++++++++++---
 .../storage/rocksdb/RocksDbTableStorageTest.java   | 28 +++----
 5 files changed, 106 insertions(+), 68 deletions(-)

diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbFlushListener.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbFlushListener.java
index 92b1b41bdb..69d13cad81 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbFlushListener.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbFlushListener.java
@@ -21,6 +21,7 @@ import static org.rocksdb.AbstractEventListener.EnabledEventCallback.ON_FLUSH_BE
 import static org.rocksdb.AbstractEventListener.EnabledEventCallback.ON_FLUSH_COMPLETED;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.configuration.schemas.table.TableView;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -70,9 +71,14 @@ class RocksDbFlushListener extends AbstractEventListener {
     /** {@inheritDoc} */
     @Override
     public void onFlushCompleted(RocksDB db, FlushJobInfo flushJobInfo) {
+        ExecutorService threadPool = tableStorage.engine().threadPool();
+
         if (lastEventType.compareAndSet(ON_FLUSH_BEGIN, ON_FLUSH_COMPLETED)) {
-            lastFlushProcessed = CompletableFuture.runAsync(this::refreshPersistedIndexes, tableStorage.engine().threadPool());
+            lastFlushProcessed = CompletableFuture.runAsync(this::refreshPersistedIndexes, threadPool);
         }
+
+        // Do it for every column family, there's no way to tell in advance which one has the latest sequence number.
+        lastFlushProcessed.whenCompleteAsync((o, throwable) -> tableStorage.completeFutures(flushJobInfo.getLargestSeqno()), threadPool);
     }
 
     private void refreshPersistedIndexes() {
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index f9edc5cc28..33160679ce 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -27,14 +27,9 @@ import static org.rocksdb.ReadTier.PERSISTED_TIER;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
-import java.util.Iterator;
-import java.util.Map.Entry;
 import java.util.NoSuchElementException;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.function.BiConsumer;
 import java.util.function.Predicate;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
@@ -146,9 +141,6 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
     /** The value of {@link #lastAppliedIndex} persisted to the device at this moment. */
     private volatile long persistedIndex;
 
-    /** Map with flush futures by applied index at the time of the {@link #flush()} call. */
-    private final ConcurrentMap<Long, CompletableFuture<Void>> flushFuturesByAppliedIndex = new ConcurrentHashMap<>();
-
     /**
      * Constructor.
      *
@@ -202,13 +194,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Void> flush() {
-        CompletableFuture<Void> flushFuture = flushFuturesByAppliedIndex.computeIfAbsent(
-                lastAppliedIndex, index -> new CompletableFuture<>()
-        );
-
-        tableStorage.scheduleFlush();
-
-        return flushFuture;
+        return tableStorage.awaitFlush(true);
     }
 
     /** {@inheritDoc} */
@@ -241,27 +227,10 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
      * Reads a value of {@link #lastAppliedIndex()} from the storage, avoiding memtable, and sets it as a new value of
      * {@link #persistedIndex()}.
      *
-     * <p/>All futures returned by {@link #flush()} are completed here if they correspond to the value of {@link #persistedIndex()}
-     * (if flush was called before data started being flushed to the storage).
-     *
      * @throws StorageException If failed to read index from the storage.
      */
     public void refreshPersistedIndex() throws StorageException {
-        long persistedIndex = readLastAppliedIndex(persistedTierReadOpts);
-
-        this.persistedIndex = persistedIndex;
-
-        Set<Entry<Long, CompletableFuture<Void>>> entries = flushFuturesByAppliedIndex.entrySet();
-
-        for (Iterator<Entry<Long, CompletableFuture<Void>>> iterator = entries.iterator(); iterator.hasNext(); ) {
-            Entry<Long, CompletableFuture<Void>> entry = iterator.next();
-
-            if (persistedIndex >= entry.getKey()) {
-                entry.getValue().complete(null);
-
-                iterator.remove();
-            }
-        }
+        persistedIndex = readLastAppliedIndex(persistedTierReadOpts);
     }
 
     /**
@@ -792,10 +761,6 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
     /** {@inheritDoc} */
     @Override
     public void close() throws Exception {
-        for (CompletableFuture<Void> future : flushFuturesByAppliedIndex.values()) {
-            future.cancel(false);
-        }
-
         IgniteUtils.closeAll(persistedTierReadOpts, readOpts, writeOpts, upperBound);
     }
 
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
index 05192a7f24..de9ecdb2e0 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
@@ -34,9 +34,7 @@ import org.apache.ignite.configuration.schemas.table.TableView;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.storage.StorageException;
-import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.engine.StorageEngine;
-import org.apache.ignite.internal.storage.engine.TableStorage;
 import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataRegionConfiguration;
 import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataRegionView;
 import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageView;
@@ -148,7 +146,7 @@ public class RocksDbStorageEngine implements StorageEngine {
 
     /** {@inheritDoc} */
     @Override
-    public TableStorage createTable(TableConfiguration tableCfg) throws StorageException {
+    public RocksDbTableStorage createTable(TableConfiguration tableCfg) throws StorageException {
         TableView tableView = tableCfg.value();
 
         assert tableView.dataStorage().name().equals(ENGINE_NAME) : tableView.dataStorage().name();
@@ -168,8 +166,9 @@ public class RocksDbStorageEngine implements StorageEngine {
         return new RocksDbTableStorage(this, tablePath, tableCfg, dataRegion);
     }
 
+    /** {@inheritDoc} */
     @Override
-    public MvTableStorage createMvTable(TableConfiguration tableCfg) throws StorageException {
-        return (MvTableStorage) createTable(tableCfg);
+    public RocksDbTableStorage createMvTable(TableConfiguration tableCfg) throws StorageException {
+        return createTable(tableCfg);
     }
 }
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 5e2d16df15..d7eb52d5d0 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -27,8 +27,13 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReferenceArray;
@@ -97,6 +102,15 @@ class RocksDbTableStorage implements TableStorage, MvTableStorage {
     /** Partition storages. */
     private volatile AtomicReferenceArray<RocksDbMvPartitionStorage> partitions;
 
+    /** Map with flush futures by sequence number at the time of the {@link #awaitFlush(boolean)} call. */
+    private final ConcurrentMap<Long, CompletableFuture<Void>> flushFuturesBySequenceNumber = new ConcurrentHashMap<>();
+
+    /** Latest known sequence number for persisted data. Not volatile, protected by explicit synchronization. */
+    private long latestPersistedSequenceNumber;
+
+    /** Mutex for {@link #latestPersistedSequenceNumber} modifications. */
+    private final Object latestPersistedSequenceNumberMux = new Object();
+
     /**
      * Instance of the latest scheduled flush closure.
      *
@@ -104,10 +118,6 @@ class RocksDbTableStorage implements TableStorage, MvTableStorage {
      */
     private volatile Runnable latestFlushClosure;
 
-    /** Flag indicating if the storage has been stopped. */
-    @Deprecated
-    private volatile boolean stopped = false;
-
     //TODO Use it instead of the "stopped" flag.
     /** Busy lock to stop synchronously. */
     final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -212,6 +222,11 @@ class RocksDbTableStorage implements TableStorage, MvTableStorage {
                         throw new StorageException("Unidentified column family [name=" + cf.name() + ", table=" + tableCfg.name() + ']');
                 }
             }
+
+            // Pointless synchronization, but without it there would be a warning in the code.
+            synchronized (latestPersistedSequenceNumberMux) {
+                latestPersistedSequenceNumber = db.getLatestSequenceNumber();
+            }
         } catch (RocksDBException e) {
             throw new StorageException("Failed to initialize RocksDB instance", e);
         }
@@ -223,10 +238,63 @@ class RocksDbTableStorage implements TableStorage, MvTableStorage {
         }
     }
 
+    /**
+     * Returns a future to wait next flush operation from the current point in time. Uses {@link RocksDB#getLatestSequenceNumber()} to
+     * achieve this.
+     *
+     * @param schedule {@code true} if {@link RocksDB#flush(FlushOptions)} should be explicitly triggerred in the near future.
+     *
+     * @see #scheduleFlush()
+     */
+    public CompletableFuture<Void> awaitFlush(boolean schedule) {
+        CompletableFuture<Void> future;
+
+        long dbSequenceNumber = db.getLatestSequenceNumber();
+
+        synchronized (latestPersistedSequenceNumberMux) {
+            if (dbSequenceNumber <= latestPersistedSequenceNumber) {
+                return CompletableFuture.completedFuture(null);
+            }
+
+            future = flushFuturesBySequenceNumber.computeIfAbsent(dbSequenceNumber, l -> new CompletableFuture<>());
+        }
+
+        if (schedule) {
+            scheduleFlush();
+        }
+
+        return future;
+    }
+
+    /**
+     * Completes all futures in {@link #flushFuturesBySequenceNumber} up to a given sequence number.
+     */
+    void completeFutures(long sequenceNumber) {
+        synchronized (latestPersistedSequenceNumberMux) {
+            if (sequenceNumber <= latestPersistedSequenceNumber) {
+                return;
+            }
+
+            latestPersistedSequenceNumber = sequenceNumber;
+        }
+
+        Set<Entry<Long, CompletableFuture<Void>>> entries = flushFuturesBySequenceNumber.entrySet();
+
+        for (Iterator<Entry<Long, CompletableFuture<Void>>> iterator = entries.iterator(); iterator.hasNext(); ) {
+            Entry<Long, CompletableFuture<Void>> entry = iterator.next();
+
+            if (sequenceNumber >= entry.getKey()) {
+                entry.getValue().complete(null);
+
+                iterator.remove();
+            }
+        }
+    }
+
     /**
      * Schedules a flush of the table. If run several times within a small amount of time, only the last scheduled flush will be executed.
      */
-    public void scheduleFlush() {
+    void scheduleFlush() {
         Runnable newClosure = new Runnable() {
             @Override
             public void run() {
@@ -235,7 +303,9 @@ class RocksDbTableStorage implements TableStorage, MvTableStorage {
                 }
 
                 try {
-                    db.flush(flushOptions);
+                    // Explicit list of CF handles is mandatory!
+                    // Default flush is buggy and only invokes listener methods for a single random CF.
+                    db.flush(flushOptions, List.of(metaCfHandle(), partitionCfHandle()));
                 } catch (RocksDBException e) {
                     LOG.error("Error occurred during the explicit flush for table '{}'", e, tableCfg.name());
                 }
@@ -253,14 +323,16 @@ class RocksDbTableStorage implements TableStorage, MvTableStorage {
     /** {@inheritDoc} */
     @Override
     public void stop() throws StorageException {
-        stopped = true;
-
         if (!stopGuard.compareAndSet(false, true)) {
             return;
         }
 
         busyLock.block();
 
+        for (CompletableFuture<Void> future : flushFuturesBySequenceNumber.values()) {
+            future.cancel(false);
+        }
+
         List<AutoCloseable> resources = new ArrayList<>();
 
         resources.add(db);
@@ -351,7 +423,7 @@ class RocksDbTableStorage implements TableStorage, MvTableStorage {
         mvPartition.destroy();
 
         // Wait for the data to actually be removed from the disk and close the storage.
-        return mvPartition.flush()
+        return awaitFlush(false)
                 .whenComplete((v, e) -> {
                     partitions.set(partitionId, null);
 
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java
index ddf3b6c18b..c21860f8b2 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java
@@ -47,8 +47,6 @@ import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.BaseMvStoragesTest;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.internal.storage.engine.MvTableStorage;
-import org.apache.ignite.internal.storage.engine.StorageEngine;
 import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageChange;
 import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageConfigurationSchema;
 import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageView;
@@ -70,17 +68,19 @@ import org.junit.jupiter.api.extension.ExtendWith;
 @ExtendWith(WorkDirectoryExtension.class)
 @ExtendWith(ConfigurationExtension.class)
 public class RocksDbTableStorageTest extends BaseMvStoragesTest {
-    private StorageEngine engine;
+    private RocksDbStorageEngine engine;
 
-    private MvTableStorage storage;
+    private RocksDbTableStorage storage;
 
     @BeforeEach
     public void setUp(
             @WorkDirectory Path workDir,
-            @InjectConfiguration RocksDbStorageEngineConfiguration rocksDbEngineConfig,
+            @InjectConfiguration(
+                    value = "mock {flushDelayMillis = 0, defaultRegion {size = 16536, writeBufferSize = 16536}}"
+            ) RocksDbStorageEngineConfiguration rocksDbEngineConfig,
             @InjectConfiguration(
                     name = "table",
-                    value = "mock.partitions = 1024",
+                    value = "mock.partitions = 512",
                     polymorphicExtensions = {
                             HashIndexConfigurationSchema.class,
                             UnknownDataStorageConfigurationSchema.class,
@@ -97,15 +97,6 @@ public class RocksDbTableStorageTest extends BaseMvStoragesTest {
 
         assertThat(((RocksDbDataStorageView) tableCfg.dataStorage().value()).dataRegion(), equalTo(DEFAULT_DATA_REGION_NAME));
 
-        CompletableFuture<Void> changeEngineFuture = rocksDbEngineConfig.defaultRegion()
-                .change(c -> c.changeSize(16 * 1024).changeWriteBufferSize(16 * 1024));
-
-        assertThat(changeEngineFuture, willBe(nullValue(Void.class)));
-
-        changeEngineFuture = tableCfg.change(cfg -> cfg.changePartitions(512));
-
-        assertThat(changeEngineFuture, willBe(nullValue(Void.class)));
-
         engine = new RocksDbStorageEngine(rocksDbEngineConfig, workDir);
 
         engine.start();
@@ -205,7 +196,12 @@ public class RocksDbTableStorageTest extends BaseMvStoragesTest {
 
         RowId rowId1 = partitionStorage1.runConsistently(() -> partitionStorage1.insert(testData, txId));
 
-        assertThat(storage.destroyPartition(42), willCompleteSuccessfully());
+        CompletableFuture<Void> destroyFuture = storage.destroyPartition(42);
+
+        // Partition desctuction doesn't enforce flush.
+        storage.scheduleFlush();
+
+        assertThat(destroyFuture, willCompleteSuccessfully());
 
         assertThat(storage.getMvPartition(42), is(nullValue()));
         assertThat(storage.getOrCreateMvPartition(42).read(rowId0, txId), is(nullValue()));