You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2020/05/13 16:54:03 UTC

[asterixdb] branch master updated: [ASTERIXDB-2730][STO] Avoid Double Flushes in GVBC

This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 199398a  [ASTERIXDB-2730][STO] Avoid Double Flushes in GVBC
199398a is described below

commit 199398a13f631a8fe848c833711ed09e49509c70
Author: Murtadha Hubail <mh...@apache.org>
AuthorDate: Wed May 13 04:13:27 2020 +0300

    [ASTERIXDB-2730][STO] Avoid Double Flushes in GVBC
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    - Ensure the GlobalVirtualBufferCache (GVBC) only flushes a primary index
    if it has a modified memory component and no flush request is already
    pending for it.
    
    Change-Id: Ib4c3c632c43d83c5e60960c2cdcce54f1216b851
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/6305
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
---
 .../dataflow/GlobalVirtualBufferCacheTest.java     | 112 ++++++++++++---------
 .../common/context/GlobalVirtualBufferCache.java   |  43 ++++----
 2 files changed, 89 insertions(+), 66 deletions(-)

diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java
index 0b07bc4..0e51f28 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java
@@ -91,65 +91,85 @@ public class GlobalVirtualBufferCacheTest {
     private static final long FILTERED_MEMORY_COMPONENT_SIZE = 16 * 1024l;
 
     @BeforeClass
-    public static void setUp() throws Exception {
-        System.out.println("SetUp: ");
-        TestHelper.deleteExistingInstanceFiles();
-        String configPath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "test"
-                + File.separator + "resources" + File.separator + "cc.conf";
-        nc = new TestNodeController(configPath, false);
+    public static void setUp() {
+        try {
+            System.out.println("SetUp: ");
+            TestHelper.deleteExistingInstanceFiles();
+            String configPath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "test"
+                    + File.separator + "resources" + File.separator + "cc.conf";
+            nc = new TestNodeController(configPath, false);
+        } catch (Throwable e) {
+            LOGGER.error(e);
+            Assert.fail(e.getMessage());
+        }
     }
 
     @Before
-    public void initializeTest() throws Exception {
+    public void initializeTest() {
         // initialize NC before each test
-        initializeNc();
-        initializeTestCtx();
-        createIndex();
-        readIndex();
-        tupleGenerator = StorageTestUtils.getTupleGenerator();
+        try {
+            initializeNc();
+            initializeTestCtx();
+            createIndex();
+            readIndex();
+            tupleGenerator = StorageTestUtils.getTupleGenerator();
+        } catch (Throwable e) {
+            LOGGER.error(e);
+            Assert.fail(e.getMessage());
+        }
     }
 
     @After
-    public void deinitializeTest() throws Exception {
-        dropIndex();
-        // cleanup after each test case
-        nc.deInit(true);
-        nc.clearOpts();
+    public void deinitializeTest() {
+        try {
+            dropIndex();
+            // cleanup after each test case
+            nc.deInit(true);
+            nc.clearOpts();
+        } catch (Throwable e) {
+            LOGGER.error(e);
+            Assert.fail(e.getMessage());
+        }
     }
 
     @Test
-    public void testFlushes() throws Exception {
-        List<Thread> threads = new ArrayList<>();
-        int records = 16 * 1024;
-        int threadsPerPartition = 2;
-        AtomicReference<Exception> exceptionRef = new AtomicReference<>();
-        for (int p = 0; p < NUM_PARTITIONS; p++) {
-            for (int t = 0; t < threadsPerPartition; t++) {
-                threads.add(insertRecords(records, p, false, exceptionRef));
-                threads.add(insertRecords(records, p, true, exceptionRef));
+    public void testFlushes() {
+        try {
+            List<Thread> threads = new ArrayList<>();
+            int records = 16 * 1024;
+            int threadsPerPartition = 2;
+            AtomicReference<Exception> exceptionRef = new AtomicReference<>();
+            for (int p = 0; p < NUM_PARTITIONS; p++) {
+                for (int t = 0; t < threadsPerPartition; t++) {
+                    threads.add(insertRecords(records, p, false, exceptionRef));
+                    threads.add(insertRecords(records, p, true, exceptionRef));
+                }
             }
-        }
-        for (Thread thread : threads) {
-            thread.join();
-        }
-        if (exceptionRef.get() != null) {
-            exceptionRef.get().printStackTrace();
-            Assert.fail();
-        }
-        for (int i = 0; i < NUM_PARTITIONS; i++) {
-            Assert.assertFalse(primaryIndexes[i].getDiskComponents().isEmpty());
-            Assert.assertTrue(
-                    primaryIndexes[i].getDiskComponents().stream().anyMatch(c -> ((AbstractTreeIndex) c.getIndex())
-                            .getFileReference().getFile().length() > FILTERED_MEMORY_COMPONENT_SIZE));
+            for (Thread thread : threads) {
+                thread.join();
+            }
+            if (exceptionRef.get() != null) {
+                exceptionRef.get().printStackTrace();
+                Assert.fail();
+            }
+            for (int i = 0; i < NUM_PARTITIONS; i++) {
+                Assert.assertFalse(primaryIndexes[i].getDiskComponents().isEmpty());
+                Assert.assertTrue(
+                        primaryIndexes[i].getDiskComponents().stream().anyMatch(c -> ((AbstractTreeIndex) c.getIndex())
+                                .getFileReference().getFile().length() > FILTERED_MEMORY_COMPONENT_SIZE));
 
-            Assert.assertFalse(filteredPrimaryIndexes[i].getDiskComponents().isEmpty());
-            Assert.assertTrue(filteredPrimaryIndexes[i].getDiskComponents().stream()
-                    .allMatch(c -> ((AbstractTreeIndex) c.getIndex()).getFileReference().getFile()
-                            .length() <= FILTERED_MEMORY_COMPONENT_SIZE));
-        }
+                Assert.assertFalse(filteredPrimaryIndexes[i].getDiskComponents().isEmpty());
+                Assert.assertTrue(filteredPrimaryIndexes[i].getDiskComponents().stream()
+                        .allMatch(c -> ((AbstractTreeIndex) c.getIndex()).getFileReference().getFile()
+                                .length() <= FILTERED_MEMORY_COMPONENT_SIZE));
+            }
 
-        nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
-        nc.getTransactionManager().commitTransaction(filteredTxnCtx.getTxnId());
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
+            nc.getTransactionManager().commitTransaction(filteredTxnCtx.getTxnId());
+        } catch (Throwable e) {
+            LOGGER.error(e);
+            Assert.fail(e.getMessage());
+        }
     }
 
     private void initializeNc() throws Exception {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
index 5177b25..c0197b0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
@@ -452,33 +452,36 @@ public class GlobalVirtualBufferCache implements IVirtualBufferCache, ILifeCycle
 
         private void scheduleFlush() throws HyracksDataException {
             synchronized (GlobalVirtualBufferCache.this) {
-                if (vbc.getUsage() < flushPageBudget || flushingIndex != null) {
-                    return;
-                }
                 int cycles = 0;
-                // find the first modified memory component while avoiding infinite loops
-                while (cycles <= primaryIndexes.size()
-                        && !primaryIndexes.get(flushPtr).getCurrentMemoryComponent().isModified()) {
-                    flushPtr = (flushPtr + 1) % primaryIndexes.size();
-                    cycles++;
-                }
-                if (primaryIndexes.get(flushPtr).getCurrentMemoryComponent().isModified()) {
-                    // flush the current memory component
-                    flushingIndex = primaryIndexes.get(flushPtr);
+                while (vbc.getUsage() >= flushPageBudget && flushingIndex == null && cycles <= primaryIndexes.size()) {
+                    // find the first modified memory component while avoiding infinite loops
+                    while (cycles <= primaryIndexes.size()
+                            && primaryIndexes.get(flushPtr).isCurrentMutableComponentEmpty()) {
+                        flushPtr = (flushPtr + 1) % primaryIndexes.size();
+                        cycles++;
+                    }
+
+                    ILSMIndex primaryIndex = primaryIndexes.get(flushPtr);
                     flushPtr = (flushPtr + 1) % primaryIndexes.size();
                     // we need to manually flush this memory component because it may be idle at this point
                     // note that this is different from flushing a filtered memory component
                     PrimaryIndexOperationTracker opTracker =
-                            (PrimaryIndexOperationTracker) flushingIndex.getOperationTracker();
+                            (PrimaryIndexOperationTracker) primaryIndex.getOperationTracker();
                     synchronized (opTracker) {
-                        opTracker.setFlushOnExit(true);
-                        opTracker.flushIfNeeded();
-                        // If the flush cannot be scheduled at this time, then there must be active writers.
-                        // The flush will be eventually scheduled when writers exit
+                        boolean flushable = !primaryIndex.isCurrentMutableComponentEmpty();
+                        if (flushable && !opTracker.isFlushLogCreated()) {
+                            // if the flush log has already been created, then we can simply wait for
+                            // that flush to complete
+                            opTracker.setFlushOnExit(true);
+                            opTracker.flushIfNeeded();
+                            // If the flush cannot be scheduled at this time, then there must be active writers.
+                            // The flush will be eventually scheduled when writers exit
+                        }
+                        if (flushable || opTracker.isFlushLogCreated()) {
+                            flushingIndex = primaryIndex;
+                            break;
+                        }
                     }
-                } else {
-                    throw new IllegalStateException(
-                            "Cannot find modified memory component after checking all primary indexes");
                 }
             }
         }