You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by dl...@apache.org on 2020/10/21 15:33:30 UTC

[asterixdb] branch master updated: [ASTERIXDB-2786] Fix synchronization of GVBC

This is an automated email from the ASF dual-hosted git repository.

dlych pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0767d21  [ASTERIXDB-2786] Fix synchronization of GVBC
0767d21 is described below

commit 0767d212ee2325c678687974c5bf32ac0039ffde
Author: luochen <cl...@uci.edu>
AuthorDate: Mon Oct 12 18:55:47 2020 -0700

    [ASTERIXDB-2786] Fix synchronization of GVBC
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    - fix synchronization of GVBC's register and unregister
    - fix the access of disk components in GVBCTest
    - fix the error message printing to show the full stack
    trace
    - refactor GVBC to avoid checking primary indexes that are being
    flushed
    
    Change-Id: I97aba2139f013610649f7d6ec48fed5eca86a4de
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/8384
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
---
 .../dataflow/GlobalVirtualBufferCacheTest.java     | 20 ++++---
 .../common/context/GlobalVirtualBufferCache.java   | 68 +++++++++++-----------
 2 files changed, 44 insertions(+), 44 deletions(-)

diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java
index 2034aa8..abfe2ec 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java
@@ -58,6 +58,7 @@ import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory
 import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.impls.AbstractTreeIndex;
 import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -153,21 +154,22 @@ public class GlobalVirtualBufferCacheTest {
                 Assert.fail();
             }
             for (int i = 0; i < NUM_PARTITIONS; i++) {
-                Assert.assertFalse(primaryIndexes[i].getDiskComponents().isEmpty());
-                Assert.assertTrue(
-                        primaryIndexes[i].getDiskComponents().stream().anyMatch(c -> ((AbstractTreeIndex) c.getIndex())
-                                .getFileReference().getFile().length() > FILTERED_MEMORY_COMPONENT_SIZE));
+                List<ILSMDiskComponent> diskComponents = new ArrayList<>(primaryIndexes[i].getDiskComponents());
+                Assert.assertFalse(diskComponents.isEmpty());
+                Assert.assertTrue(diskComponents.stream().anyMatch(c -> ((AbstractTreeIndex) c.getIndex())
+                        .getFileReference().getFile().length() > FILTERED_MEMORY_COMPONENT_SIZE));
 
-                Assert.assertFalse(filteredPrimaryIndexes[i].getDiskComponents().isEmpty());
-                Assert.assertTrue(filteredPrimaryIndexes[i].getDiskComponents().stream()
-                        .allMatch(c -> ((AbstractTreeIndex) c.getIndex()).getFileReference().getFile()
-                                .length() <= FILTERED_MEMORY_COMPONENT_SIZE));
+                List<ILSMDiskComponent> filteredDiskComponents =
+                        new ArrayList<>(filteredPrimaryIndexes[i].getDiskComponents());
+                Assert.assertFalse(filteredDiskComponents.isEmpty());
+                Assert.assertTrue(filteredDiskComponents.stream().allMatch(c -> ((AbstractTreeIndex) c.getIndex())
+                        .getFileReference().getFile().length() <= FILTERED_MEMORY_COMPONENT_SIZE));
             }
 
             nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
             nc.getTransactionManager().commitTransaction(filteredTxnCtx.getTxnId());
         } catch (Throwable e) {
-            LOGGER.error(e);
+            LOGGER.error("testFlushes failed", e);
             Assert.fail(e.getMessage());
         }
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
index d8c76ab..f772038 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
@@ -107,7 +107,7 @@ public class GlobalVirtualBufferCache implements IVirtualBufferCache, ILifeCycle
     public void register(ILSMMemoryComponent memoryComponent) {
         ILSMIndex index = memoryComponent.getLsmIndex();
         if (index.isPrimaryIndex()) {
-            synchronized (primaryIndexes) {
+            synchronized (this) {
                 if (!primaryIndexes.contains(index)) {
                     // make sure only add index once
                     primaryIndexes.add(index);
@@ -134,7 +134,7 @@ public class GlobalVirtualBufferCache implements IVirtualBufferCache, ILifeCycle
     public void unregister(ILSMMemoryComponent memoryComponent) {
         ILSMIndex index = memoryComponent.getLsmIndex();
         if (index.isPrimaryIndex()) {
-            synchronized (primaryIndexes) {
+            synchronized (this) {
                 int pos = primaryIndexes.indexOf(index);
                 if (pos >= 0) {
                     primaryIndexes.remove(index);
@@ -485,44 +485,42 @@ public class GlobalVirtualBufferCache implements IVirtualBufferCache, ILifeCycle
             int cycles = 0;
             while (vbc.getUsage() >= flushPageBudget && cycles <= primaryIndexes.size()) {
                 // find the first modified memory component while avoiding infinite loops
-                while (cycles <= primaryIndexes.size()
-                        && primaryIndexes.get(flushPtr).isCurrentMutableComponentEmpty()) {
-                    flushPtr = (flushPtr + 1) % primaryIndexes.size();
-                    cycles++;
-                }
-
                 ILSMIndex primaryIndex = primaryIndexes.get(flushPtr);
                 flushPtr = (flushPtr + 1) % primaryIndexes.size();
-                // we need to manually flush this memory component because it may be idle at this point
-                // note that this is different from flushing a filtered memory component
-                PrimaryIndexOperationTracker opTracker =
-                        (PrimaryIndexOperationTracker) primaryIndex.getOperationTracker();
-                synchronized (opTracker) {
-                    boolean flushable = !primaryIndex.isCurrentMutableComponentEmpty();
-                    if (flushable && !opTracker.isFlushLogCreated()) {
-                        // if the flush log has already been created, then we can simply wait for
-                        // that flush to complete
-                        ILSMMemoryComponent memoryComponent = primaryIndex.getCurrentMemoryComponent();
-                        if (memoryComponent.getState() == ComponentState.READABLE_WRITABLE) {
-                            // before we schedule the flush, mark the memory component as unwritable to prevent
-                            // future writers
-                            memoryComponent.setUnwritable();
+                cycles++;
+                if (!primaryIndex.isCurrentMutableComponentEmpty() && !flushingIndexes.contains(primaryIndex)) {
+                    // we need to manually flush this memory component because it may be idle at this point
+                    // note that this is different from flushing a filtered memory component
+                    PrimaryIndexOperationTracker opTracker =
+                            (PrimaryIndexOperationTracker) primaryIndex.getOperationTracker();
+                    synchronized (opTracker) {
+                        boolean flushable = !primaryIndex.isCurrentMutableComponentEmpty();
+                        if (flushable && !opTracker.isFlushLogCreated()) {
+                            // if the flush log has already been created, then we can simply wait for
+                            // that flush to complete
+                            ILSMMemoryComponent memoryComponent = primaryIndex.getCurrentMemoryComponent();
+                            if (memoryComponent.getState() == ComponentState.READABLE_WRITABLE) {
+                                // before we schedule the flush, mark the memory component as unwritable to prevent
+                                // future writers
+                                memoryComponent.setUnwritable();
+                            }
+
+                            opTracker.setFlushOnExit(true);
+                            opTracker.flushIfNeeded();
+                            // If the flush cannot be scheduled at this time, then there must be active writers.
+                            // The flush will be eventually scheduled when writers exit
+                            if (LOGGER.isInfoEnabled()) {
+                                LOGGER.info("Requested flushing {} index {}",
+                                        isMetadataIndex(primaryIndex) ? "metadata" : "primary",
+                                        primaryIndex.toString());
+                            }
                         }
-
-                        opTracker.setFlushOnExit(true);
-                        opTracker.flushIfNeeded();
-                        // If the flush cannot be scheduled at this time, then there must be active writers.
-                        // The flush will be eventually scheduled when writers exit
-                        if (LOGGER.isInfoEnabled()) {
-                            LOGGER.info("Requested flushing {} index {}",
-                                    isMetadataIndex(primaryIndex) ? "metadata" : "primary", primaryIndex.toString());
+                        if ((flushable || opTracker.isFlushLogCreated()) && !isMetadataIndex(primaryIndex)) {
+                            // global vbc cannot wait on metadata indexes because metadata indexes support full
+                            // ACID transactions. Waiting on metadata indexes can introduce deadlocks.
+                            return primaryIndex;
                         }
                     }
-                    if ((flushable || opTracker.isFlushLogCreated()) && !isMetadataIndex(primaryIndex)) {
-                        // global vbc cannot wait on metadata indexes because metadata indexes support full
-                        // ACID transactions. Waiting on metadata indexes can introduce deadlocks.
-                        return primaryIndex;
-                    }
                 }
             }
             return null;