You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by lu...@apache.org on 2020/06/13 18:02:16 UTC

[asterixdb] branch master updated: [ASTERIXDB-2730][STO] Optimize flush in GVBC

This is an automated email from the ASF dual-hosted git repository.

luochen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 64ae834  [ASTERIXDB-2730][STO] Optimize flush in GVBC
64ae834 is described below

commit 64ae834349ba31413179344699b9d5a7596f4f9a
Author: luochen <cl...@uci.edu>
AuthorDate: Fri Jun 12 10:04:31 2020 -0700

    [ASTERIXDB-2730][STO] Optimize flush in GVBC
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes. Introduce a new storage property:
    storage.memorycomponent.max.concurrent.flushes (default 0)
    
    Details:
    - Introduce a new storage property to allow concurrent flushes
    by GVBC. The default value is 0, which means that the flush concurrency
    will be the same as the number of NC partitions.
    - Move cleaning up of a memory component out of the synchronization block
    on op tracker because this may take a relatively long time (a full scan over
    all GVBC pages).
    - Introduce a minor fix to make sure the memory component is marked unwritable
    before GVBC requests that it be flushed
    
    Change-Id: Id8867fa3ac65da319723b804cc1e39dc8eb6bde5
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/6624
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
---
 .../apache/asterix/app/nc/NCAppRuntimeContext.java |  10 +-
 .../asterix/common/config/StorageProperties.java   |   9 +
 .../common/context/GlobalVirtualBufferCache.java   | 206 +++++++++++----------
 .../storage/am/lsm/common/api/ILSMComponent.java   |   4 +-
 .../storage/am/lsm/common/api/ILSMIndex.java       |   4 +
 .../am/lsm/common/api/ILSMMemoryComponent.java     |  10 +-
 .../lsm/common/impls/AbstractLSMDiskComponent.java |   3 +-
 .../am/lsm/common/impls/AbstractLSMIndex.java      |  21 ++-
 .../common/impls/AbstractLSMMemoryComponent.java   |  26 ++-
 .../impls/AbstractLSMWithBuddyMemoryComponent.java |   4 +-
 .../am/lsm/common/impls/EmptyComponent.java        |   2 +-
 .../storage/am/lsm/common/impls/LSMHarness.java    |  57 +++---
 12 files changed, 212 insertions(+), 144 deletions(-)

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 3cf57c5..e9da651 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -206,7 +206,15 @@ public class NCAppRuntimeContext implements INcApplicationContext {
             }
             localResourceRepository.deleteStorageData();
         }
-        virtualBufferCache = new GlobalVirtualBufferCache(allocator, storageProperties);
+        int maxConcurrentFlushes = storageProperties.getMaxConcurrentFlushes();
+        if (maxConcurrentFlushes <= 0) {
+            maxConcurrentFlushes = ioManager.getIODevices().size();
+            if (LOGGER.isInfoEnabled()) {
+                LOGGER.info("The value of maxConcurrentFlushes is not provided. Setting maxConcurrentFlushes = {}.",
+                        maxConcurrentFlushes);
+            }
+        }
+        virtualBufferCache = new GlobalVirtualBufferCache(allocator, storageProperties, maxConcurrentFlushes);
         // Must start vbc now instead of by life cycle component manager (lccm) because lccm happens after
         // the metadata bootstrap task
         ((ILifeCycleComponent) virtualBufferCache).start();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
index 958d5ae..ed0bf9a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.common.config;
 
 import static org.apache.hyracks.control.common.config.OptionTypes.DOUBLE;
+import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
 import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER_BYTE_UNIT;
 import static org.apache.hyracks.control.common.config.OptionTypes.LONG_BYTE_UNIT;
 import static org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER;
@@ -48,6 +49,7 @@ public class StorageProperties extends AbstractProperties {
         STORAGE_MEMORYCOMPONENT_PAGESIZE(INTEGER_BYTE_UNIT, StorageUtil.getIntSizeInBytes(128, KILOBYTE)),
         STORAGE_MEMORYCOMPONENT_NUMCOMPONENTS(POSITIVE_INTEGER, 2),
         STORAGE_MEMORYCOMPONENT_FLUSH_THRESHOLD(DOUBLE, 0.9d),
+        STORAGE_MEMORYCOMPONENT_MAX_CONCURRENT_FLUSHES(INTEGER, 0),
         STORAGE_FILTERED_MEMORYCOMPONENT_MAX_SIZE(LONG_BYTE_UNIT, 0L),
         STORAGE_LSM_BLOOMFILTER_FALSEPOSITIVERATE(DOUBLE, 0.01d),
         STORAGE_COMPRESSION_BLOCK(STRING, "snappy"),
@@ -84,6 +86,9 @@ public class StorageProperties extends AbstractProperties {
                     return "The page size in bytes for pages allocated to memory components";
                 case STORAGE_MEMORYCOMPONENT_NUMCOMPONENTS:
                     return "The number of memory components to be used per lsm index";
+                case STORAGE_MEMORYCOMPONENT_MAX_CONCURRENT_FLUSHES:
+                    return "The maximum number of concurrent flush operations. 0 means that the value will be "
+                            + "calculated as the number of partitions";
                 case STORAGE_MEMORYCOMPONENT_FLUSH_THRESHOLD:
                     return "The memory usage threshold when memory components should be flushed";
                 case STORAGE_FILTERED_MEMORYCOMPONENT_MAX_SIZE:
@@ -165,6 +170,10 @@ public class StorageProperties extends AbstractProperties {
         return (int) (getBufferCacheSize() / (getBufferCachePageSize() + IBufferCache.RESERVED_HEADER_BYTES));
     }
 
+    public int getMaxConcurrentFlushes() {
+        return accessor.getInt(Option.STORAGE_MEMORYCOMPONENT_MAX_CONCURRENT_FLUSHES);
+    }
+
     public long getJobExecutionMemoryBudget() {
         final long jobExecutionMemory =
                 Runtime.getRuntime().maxMemory() - getBufferCacheSize() - getMemoryComponentGlobalBudget();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
index 45594eb..d8c76ab 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
@@ -23,8 +23,10 @@ import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -34,6 +36,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.api.replication.IIOReplicationManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
@@ -66,9 +69,11 @@ public class GlobalVirtualBufferCache implements IVirtualBufferCache, ILifeCycle
     private final Int2ObjectMap<AtomicInteger> fileIdUsageMap =
             Int2ObjectMaps.synchronize(new Int2ObjectOpenHashMap<>());
 
+    private final int maxConcurrentFlushes;
     private final List<ILSMIndex> primaryIndexes = new ArrayList<>();
+
+    private final Set<ILSMIndex> flushingIndexes = Collections.synchronizedSet(new HashSet<>());
     private volatile int flushPtr;
-    private volatile ILSMIndex flushingIndex;
 
     private final int filteredMemoryComponentMaxNumPages;
     private final int flushPageBudget;
@@ -76,7 +81,8 @@ public class GlobalVirtualBufferCache implements IVirtualBufferCache, ILifeCycle
     private final AtomicBoolean isOpen = new AtomicBoolean(false);
     private final FlushThread flushThread = new FlushThread();
 
-    public GlobalVirtualBufferCache(ICacheMemoryAllocator allocator, StorageProperties storageProperties) {
+    public GlobalVirtualBufferCache(ICacheMemoryAllocator allocator, StorageProperties storageProperties,
+            int maxConcurrentFlushes) {
         this.vbc = new VirtualBufferCache(allocator, storageProperties.getBufferCachePageSize(),
                 (int) (storageProperties.getMemoryComponentGlobalBudget()
                         / storageProperties.getMemoryComponentPageSize()));
@@ -84,6 +90,7 @@ public class GlobalVirtualBufferCache implements IVirtualBufferCache, ILifeCycle
                 / storageProperties.getMemoryComponentPageSize()
                 * storageProperties.getMemoryComponentFlushThreshold());
         this.filteredMemoryComponentMaxNumPages = storageProperties.getFilteredMemoryComponentMaxNumPages();
+        this.maxConcurrentFlushes = maxConcurrentFlushes;
     }
 
     @Override
@@ -97,24 +104,26 @@ public class GlobalVirtualBufferCache implements IVirtualBufferCache, ILifeCycle
     }
 
     @Override
-    public synchronized void register(ILSMMemoryComponent memoryComponent) {
+    public void register(ILSMMemoryComponent memoryComponent) {
         ILSMIndex index = memoryComponent.getLsmIndex();
         if (index.isPrimaryIndex()) {
-            if (!primaryIndexes.contains(index)) {
-                // make sure only add index once
-                primaryIndexes.add(index);
-                if (LOGGER.isInfoEnabled()) {
-                    LOGGER.info("Registered {} index {} to the global VBC",
-                            isMetadataIndex(index) ? "metadata" : "primary", index.toString());
+            synchronized (primaryIndexes) {
+                if (!primaryIndexes.contains(index)) {
+                    // make sure only add index once
+                    primaryIndexes.add(index);
+                    if (LOGGER.isInfoEnabled()) {
+                        LOGGER.info("Registered {} index {} to the global VBC",
+                                isMetadataIndex(index) ? "metadata" : "primary", index.toString());
+                    }
                 }
-            }
-            if (index.getNumOfFilterFields() > 0) {
-                // handle filtered primary index
-                AtomicInteger usage = new AtomicInteger();
-                memoryComponentUsageMap.put(memoryComponent, usage);
-                for (FileReference ref : memoryComponent.getComponentFileRefs().getFileReferences()) {
-                    if (ref != null) {
-                        fileRefUsageMap.put(ref, usage);
+                if (index.getNumOfFilterFields() > 0) {
+                    // handle filtered primary index
+                    AtomicInteger usage = new AtomicInteger();
+                    memoryComponentUsageMap.put(memoryComponent, usage);
+                    for (FileReference ref : memoryComponent.getComponentFileRefs().getFileReferences()) {
+                        if (ref != null) {
+                            fileRefUsageMap.put(ref, usage);
+                        }
                     }
                 }
             }
@@ -122,29 +131,31 @@ public class GlobalVirtualBufferCache implements IVirtualBufferCache, ILifeCycle
     }
 
     @Override
-    public synchronized void unregister(ILSMMemoryComponent memoryComponent) {
+    public void unregister(ILSMMemoryComponent memoryComponent) {
         ILSMIndex index = memoryComponent.getLsmIndex();
         if (index.isPrimaryIndex()) {
-            int pos = primaryIndexes.indexOf(index);
-            if (pos >= 0) {
-                primaryIndexes.remove(index);
-                if (LOGGER.isInfoEnabled()) {
-                    LOGGER.info("Unregistered {} index {} to the global VBC",
-                            isMetadataIndex(index) ? "metadata" : "primary", index.toString());
-                }
-                if (primaryIndexes.isEmpty()) {
-                    flushPtr = 0;
-                } else if (flushPtr > pos) {
-                    // If the removed index is before flushPtr, we should decrement flushPtr by 1 so that
-                    // it still points to the same index.
-                    flushPtr = (flushPtr - 1) % primaryIndexes.size();
+            synchronized (primaryIndexes) {
+                int pos = primaryIndexes.indexOf(index);
+                if (pos >= 0) {
+                    primaryIndexes.remove(index);
+                    if (LOGGER.isInfoEnabled()) {
+                        LOGGER.info("Unregistered {} index {} to the global VBC",
+                                isMetadataIndex(index) ? "metadata" : "primary", index.toString());
+                    }
+                    if (primaryIndexes.isEmpty()) {
+                        flushPtr = 0;
+                    } else if (flushPtr > pos) {
+                        // If the removed index is before flushPtr, we should decrement flushPtr by 1 so that
+                        // it still points to the same index.
+                        flushPtr = (flushPtr - 1) % primaryIndexes.size();
+                    }
                 }
-            }
-            if (index.getNumOfFilterFields() > 0) {
-                memoryComponentUsageMap.remove(memoryComponent);
-                for (FileReference ref : memoryComponent.getComponentFileRefs().getFileReferences()) {
-                    if (ref != null) {
-                        fileRefUsageMap.remove(ref);
+                if (index.getNumOfFilterFields() > 0) {
+                    memoryComponentUsageMap.remove(memoryComponent);
+                    for (FileReference ref : memoryComponent.getComponentFileRefs().getFileReferences()) {
+                        if (ref != null) {
+                            fileRefUsageMap.remove(ref);
+                        }
                     }
                 }
             }
@@ -153,26 +164,19 @@ public class GlobalVirtualBufferCache implements IVirtualBufferCache, ILifeCycle
 
     @Override
     public void flushed(ILSMMemoryComponent memoryComponent) throws HyracksDataException {
-        if (memoryComponent.getLsmIndex() == flushingIndex) {
+        if (flushingIndexes.remove(memoryComponent.getLsmIndex())) {
+            LOGGER.info("Completed flushing {}.", memoryComponent.getIndex());
+            // After the flush operation is completed, we may have 2 cases:
+            // 1. there is no active reader on this memory component and memory is reclaimed;
+            // 2. there are still some active readers and memory cannot be reclaimed.
+            // But for both cases, we will notify all primary index op trackers to let their writers retry,
+            // if they have been blocked. Moreover, we will check whether more flushes are needed.
             synchronized (this) {
-                if (memoryComponent.getLsmIndex() == flushingIndex) {
-                    flushingIndex = null;
-                    // After the flush operation is completed, we may have 2 cases:
-                    // 1. there is no active reader on this memory component and memory is reclaimed;
-                    // 2. there are still some active readers and memory cannot be reclaimed.
-                    // But for both cases, we will notify all primary index op trackers to let their writers retry,
-                    // if they have been blocked. Moreover, we will check whether more flushes are needed.
-                    final int size = primaryIndexes.size();
-                    for (int i = 0; i < size; i++) {
-                        ILSMOperationTracker opTracker = primaryIndexes.get(i).getOperationTracker();
-                        synchronized (opTracker) {
-                            opTracker.notifyAll();
-                        }
-                    }
-
-                    if (LOGGER.isInfoEnabled()) {
-                        LOGGER.info("Completed flushing {}. Resetting flushIndex back to null.",
-                                memoryComponent.getIndex().toString());
+                final int size = primaryIndexes.size();
+                for (int i = 0; i < size; i++) {
+                    ILSMOperationTracker opTracker = primaryIndexes.get(i).getOperationTracker();
+                    synchronized (opTracker) {
+                        opTracker.notifyAll();
                     }
                 }
             }
@@ -200,7 +204,8 @@ public class GlobalVirtualBufferCache implements IVirtualBufferCache, ILifeCycle
 
     @Override
     public boolean isFull(ILSMMemoryComponent memoryComponent) {
-        return memoryComponent.getLsmIndex() == flushingIndex || isFilteredMemoryComponentFull(memoryComponent);
+        return flushingIndexes.contains(memoryComponent.getLsmIndex())
+                || isFilteredMemoryComponentFull(memoryComponent);
     }
 
     private boolean isFilteredMemoryComponentFull(ILSMMemoryComponent memoryComponent) {
@@ -278,11 +283,7 @@ public class GlobalVirtualBufferCache implements IVirtualBufferCache, ILifeCycle
     }
 
     private void checkAndNotifyFlushThread() {
-        if (vbc.getUsage() < flushPageBudget || flushingIndex != null) {
-            // For better performance, we only flush one dataset partition at a time.
-            // After reclaiming memory from this dataset partition, its memory can be used by other indexes.
-            // Thus, given N dataset partitions, each dataset partition will approximately receive 2/N of
-            // the total memory instead of 1/N, which doubles the memory utilization.
+        if (vbc.getUsage() < flushPageBudget) {
             return;
         }
         // Notify the flush thread to schedule flushes. This is used to avoid deadlocks because page pins can be
@@ -470,48 +471,63 @@ public class GlobalVirtualBufferCache implements IVirtualBufferCache, ILifeCycle
         }
 
         private void scheduleFlush() throws HyracksDataException {
+            ILSMIndex selectedIndex = null;
             synchronized (GlobalVirtualBufferCache.this) {
-                int cycles = 0;
-                while (vbc.getUsage() >= flushPageBudget && flushingIndex == null && cycles <= primaryIndexes.size()) {
-                    // find the first modified memory component while avoiding infinite loops
-                    while (cycles <= primaryIndexes.size()
-                            && primaryIndexes.get(flushPtr).isCurrentMutableComponentEmpty()) {
-                        flushPtr = (flushPtr + 1) % primaryIndexes.size();
-                        cycles++;
-                    }
+                while (flushingIndexes.size() < maxConcurrentFlushes
+                        && ((selectedIndex = selectFlushIndex()) != null)) {
+                    LOGGER.debug("Waiting for flushing primary index {} to complete...", selectedIndex);
+                    flushingIndexes.add(selectedIndex);
+                }
+            }
+        }
 
-                    ILSMIndex primaryIndex = primaryIndexes.get(flushPtr);
+        private ILSMIndex selectFlushIndex() throws HyracksDataException {
+            int cycles = 0;
+            while (vbc.getUsage() >= flushPageBudget && cycles <= primaryIndexes.size()) {
+                // find the first modified memory component while avoiding infinite loops
+                while (cycles <= primaryIndexes.size()
+                        && primaryIndexes.get(flushPtr).isCurrentMutableComponentEmpty()) {
                     flushPtr = (flushPtr + 1) % primaryIndexes.size();
-                    // we need to manually flush this memory component because it may be idle at this point
-                    // note that this is different from flushing a filtered memory component
-                    PrimaryIndexOperationTracker opTracker =
-                            (PrimaryIndexOperationTracker) primaryIndex.getOperationTracker();
-                    synchronized (opTracker) {
-                        boolean flushable = !primaryIndex.isCurrentMutableComponentEmpty();
-                        if (flushable && !opTracker.isFlushLogCreated()) {
-                            // if the flush log has already been created, then we can simply wait for
-                            // that flush to complete
-                            opTracker.setFlushOnExit(true);
-                            opTracker.flushIfNeeded();
-                            // If the flush cannot be scheduled at this time, then there must be active writers.
-                            // The flush will be eventually scheduled when writers exit
-                            if (LOGGER.isInfoEnabled()) {
-                                LOGGER.info("Requested {} flushing primary index {}",
-                                        isMetadataIndex(primaryIndex) ? "metadata" : "primary",
-                                        primaryIndex.toString());
-                            }
+                    cycles++;
+                }
+
+                ILSMIndex primaryIndex = primaryIndexes.get(flushPtr);
+                flushPtr = (flushPtr + 1) % primaryIndexes.size();
+                // we need to manually flush this memory component because it may be idle at this point
+                // note that this is different from flushing a filtered memory component
+                PrimaryIndexOperationTracker opTracker =
+                        (PrimaryIndexOperationTracker) primaryIndex.getOperationTracker();
+                synchronized (opTracker) {
+                    boolean flushable = !primaryIndex.isCurrentMutableComponentEmpty();
+                    if (flushable && !opTracker.isFlushLogCreated()) {
+                        // if the flush log has already been created, then we can simply wait for
+                        // that flush to complete
+                        ILSMMemoryComponent memoryComponent = primaryIndex.getCurrentMemoryComponent();
+                        if (memoryComponent.getState() == ComponentState.READABLE_WRITABLE) {
+                            // before we schedule the flush, mark the memory component as unwritable to prevent
+                            // future writers
+                            memoryComponent.setUnwritable();
                         }
-                        if ((flushable || opTracker.isFlushLogCreated()) && !isMetadataIndex(primaryIndex)) {
-                            // global vbc cannot wait on metadata indexes because metadata indexes support full
-                            // ACID transactions. Waiting on metadata indexes can introduce deadlocks.
-                            flushingIndex = primaryIndex;
-                            LOGGER.debug("Waiting for flushing primary index {} to complete...", primaryIndex);
-                            break;
+
+                        opTracker.setFlushOnExit(true);
+                        opTracker.flushIfNeeded();
+                        // If the flush cannot be scheduled at this time, then there must be active writers.
+                        // The flush will be eventually scheduled when writers exit
+                        if (LOGGER.isInfoEnabled()) {
+                            LOGGER.info("Requested flushing {} index {}",
+                                    isMetadataIndex(primaryIndex) ? "metadata" : "primary", primaryIndex.toString());
                         }
                     }
+                    if ((flushable || opTracker.isFlushLogCreated()) && !isMetadataIndex(primaryIndex)) {
+                        // global vbc cannot wait on metadata indexes because metadata indexes support full
+                        // ACID transactions. Waiting on metadata indexes can introduce deadlocks.
+                        return primaryIndex;
+                    }
                 }
             }
+            return null;
         }
+
     }
 
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java
index fef59e7..186cabb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java
@@ -95,9 +95,11 @@ public interface ILSMComponent {
      *            whether the operation failed
      * @param isMutableComponent
      *            true if the thread intended to modify the component
+     * @return
+     *        true if extra cleanup is needed for the component
      * @throws HyracksDataException
      */
-    void threadExit(LSMOperationType opType, boolean failedOperation, boolean isMutableComponent)
+    boolean threadExit(LSMOperationType opType, boolean failedOperation, boolean isMutableComponent)
             throws HyracksDataException;
 
     /**
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
index 23b3634..69b9547 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
@@ -128,6 +128,10 @@ public interface ILSMIndex extends IIndex {
 
     void addInactiveDiskComponent(ILSMDiskComponent diskComponent);
 
+    List<ILSMMemoryComponent> getInactiveMemoryComponents();
+
+    void addInactiveMemoryComponent(ILSMMemoryComponent memoryComponent);
+
     boolean isCurrentMutableComponentEmpty() throws HyracksDataException;
 
     void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMDiskComponent> diskComponents,
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
index 064ab64..70b1bd3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
@@ -42,13 +42,20 @@ public interface ILSMMemoryComponent extends ILSMComponent {
     int getWriterCount();
 
     /**
-     * Clear the component and its metadata page completely
+     * Reset the memory component's state after the flush completes
      *
      * @throws HyracksDataException
      */
     void reset() throws HyracksDataException;
 
     /**
+     * Cleanup the memory component after flush (can be time consuming)
+     *
+     * @throws HyracksDataException
+     */
+    void cleanup() throws HyracksDataException;
+
+    /**
      * @return true if there are data in the memory component, false otherwise
      */
     boolean isModified();
@@ -105,6 +112,7 @@ public interface ILSMMemoryComponent extends ILSMComponent {
 
     /**
      * Called when the memory component is flushed to disk
+     *
      * @throws HyracksDataException
      */
     void flushed() throws HyracksDataException;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
index 0ad3029..66fafec 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
@@ -92,7 +92,7 @@ public abstract class AbstractLSMDiskComponent extends AbstractLSMComponent impl
     }
 
     @Override
-    public void threadExit(LSMOperationType opType, boolean failedOperation, boolean isMutableComponent)
+    public boolean threadExit(LSMOperationType opType, boolean failedOperation, boolean isMutableComponent)
             throws HyracksDataException {
         switch (opType) {
             case MERGE:
@@ -122,6 +122,7 @@ public abstract class AbstractLSMDiskComponent extends AbstractLSMComponent impl
         if (readerCount <= -1) {
             throw new IllegalStateException("Invalid LSM disk component readerCount: " + readerCount);
         }
+        return false;
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 6c7b1f8..a5ff2e6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -25,7 +25,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -94,6 +93,7 @@ public abstract class AbstractLSMIndex implements ILSMIndex {
     // components with lower indexes are newer than components with higher index
     protected final List<ILSMDiskComponent> diskComponents;
     protected final List<ILSMDiskComponent> inactiveDiskComponents;
+    protected final List<ILSMMemoryComponent> inactiveMemoryComponents;
     protected final double bloomFilterFalsePositiveRate;
     protected final IComponentFilterHelper filterHelper;
     protected final ILSMComponentFilterFrameFactory filterFrameFactory;
@@ -135,7 +135,8 @@ public abstract class AbstractLSMIndex implements ILSMIndex {
         this.filterManager = filterManager;
         this.treeFields = treeFields;
         this.filterFields = filterFields;
-        this.inactiveDiskComponents = new LinkedList<>();
+        this.inactiveDiskComponents = new ArrayList<>();
+        this.inactiveMemoryComponents = new ArrayList<>();
         this.durable = durable;
         this.tracer = tracer;
         lsmHarness = new LSMHarness(this, ioScheduler, mergePolicy, opTracker, diskBufferCache.isReplicationEnabled(),
@@ -170,8 +171,9 @@ public abstract class AbstractLSMIndex implements ILSMIndex {
         lsmHarness = new ExternalIndexHarness(this, ioScheduler, mergePolicy, opTracker,
                 diskBufferCache.isReplicationEnabled());
         isActive = false;
-        diskComponents = new LinkedList<>();
-        this.inactiveDiskComponents = new LinkedList<>();
+        diskComponents = new ArrayList<>();
+        this.inactiveDiskComponents = new ArrayList<>();
+        this.inactiveMemoryComponents = new ArrayList<>();
         // Memory related objects are nulled
         virtualBufferCaches = null;
         memoryComponents = null;
@@ -300,6 +302,7 @@ public abstract class AbstractLSMIndex implements ILSMIndex {
     private void resetMemoryComponents() throws HyracksDataException {
         if (memoryComponentsAllocated && memoryComponents != null) {
             for (ILSMMemoryComponent c : memoryComponents) {
+                c.cleanup();
                 c.reset();
             }
         }
@@ -700,6 +703,16 @@ public abstract class AbstractLSMIndex implements ILSMIndex {
     }
 
     @Override
+    public List<ILSMMemoryComponent> getInactiveMemoryComponents() {
+        return inactiveMemoryComponents;
+    }
+
+    @Override
+    public void addInactiveMemoryComponent(ILSMMemoryComponent memoryComponent) {
+        inactiveMemoryComponents.add(memoryComponent);
+    }
+
+    @Override
     public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMDiskComponent> lsmComponents,
             ReplicationOperation operation, LSMOperationType opType) throws HyracksDataException {
         //get set of files to be replicated for this component
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
index a6c82bc..3b6667e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
@@ -151,8 +151,9 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im
     }
 
     @Override
-    public void threadExit(LSMOperationType opType, boolean failedOperation, boolean isMutableComponent)
+    public boolean threadExit(LSMOperationType opType, boolean failedOperation, boolean isMutableComponent)
             throws HyracksDataException {
+        boolean cleanup = false;
         switch (opType) {
             case FORCE_MODIFICATION:
             case MODIFICATION:
@@ -168,14 +169,14 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im
                 } else {
                     readerCount--;
                     if (state == ComponentState.UNREADABLE_UNWRITABLE && readerCount == 0) {
-                        reset();
+                        cleanup = true;
                     }
                 }
                 break;
             case SEARCH:
                 readerCount--;
                 if (state == ComponentState.UNREADABLE_UNWRITABLE && readerCount == 0) {
-                    reset();
+                    cleanup = true;
                 }
                 break;
             case FLUSH:
@@ -183,25 +184,22 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im
                     throw new IllegalStateException("Flush sees an illegal LSM memory compoenent state: " + state);
                 }
                 readerCount--;
-                if (failedOperation) {
+                if (!failedOperation) {
                     // If flush failed, keep the component state to READABLE_UNWRITABLE_FLUSHING
-                    return;
-                }
-                // operation succeeded
-                if (readerCount == 0) {
-                    // TODO: move reset() outside of the synchronized block (on op tracker)
-                    reset();
-                } else {
+                    // operation succeeded
                     state = ComponentState.UNREADABLE_UNWRITABLE;
+                    if (readerCount == 0) {
+                        cleanup = true;
+                    }
                 }
                 break;
             default:
                 throw new UnsupportedOperationException("Unsupported operation " + opType);
         }
-
         if (readerCount <= -1 || writerCount <= -1) {
             throw new IllegalStateException("Invalid reader or writer count " + readerCount + " - " + writerCount);
         }
+        return cleanup;
     }
 
     @Override
@@ -235,7 +233,6 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im
         if (filter != null) {
             filter.reset();
         }
-        doReset();
         lsmIndex.memoryComponentsReset();
         // a flush can be pending on a component that just completed its flush... here is when this can happen:
         // primary index has 2 components, secondary index has 2 components.
@@ -248,7 +245,8 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im
         }
     }
 
-    protected void doReset() throws HyracksDataException {
+    @Override
+    public void cleanup() throws HyracksDataException {
         getIndex().deactivate();
         getIndex().destroy();
         getIndex().create();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMWithBuddyMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMWithBuddyMemoryComponent.java
index 373d7e7..882b4bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMWithBuddyMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMWithBuddyMemoryComponent.java
@@ -33,8 +33,8 @@ public abstract class AbstractLSMWithBuddyMemoryComponent extends AbstractLSMMem
     public abstract AbstractTreeIndex getBuddyIndex();
 
     @Override
-    public void doReset() throws HyracksDataException {
-        super.doReset();
+    public void cleanup() throws HyracksDataException {
+        super.cleanup();
         getBuddyIndex().deactivate();
         getBuddyIndex().destroy();
         getBuddyIndex().create();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
index 9a6fb43..3ea0f49 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
@@ -46,7 +46,7 @@ public class EmptyComponent implements ILSMDiskComponent {
     }
 
     @Override
-    public void threadExit(LSMOperationType opType, boolean failedOperation, boolean isMutableComponent)
+    public boolean threadExit(LSMOperationType opType, boolean failedOperation, boolean isMutableComponent)
             throws HyracksDataException {
         throw HyracksDataException.create(ErrorCode.ILLEGAL_ATTEMPT_TO_EXIT_EMPTY_COMPONENT);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 20a2555..92fa135 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -20,7 +20,6 @@
 package org.apache.hyracks.storage.am.lsm.common.impls;
 
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Predicate;
@@ -109,7 +108,7 @@ public class LSMHarness implements ILSMHarness {
                         return false;
                     }
                     try {
-                        opTracker.wait();
+                        opTracker.wait(100);
                     } catch (InterruptedException e) {
                         Thread.currentThread().interrupt();
                         throw HyracksDataException.create(e);
@@ -176,8 +175,8 @@ public class LSMHarness implements ILSMHarness {
         if (!ctx.isAccessingComponents() && opType != LSMOperationType.FLUSH && opType != LSMOperationType.MERGE) {
             return;
         }
-        List<ILSMDiskComponent> inactiveDiskComponents;
         List<ILSMDiskComponent> inactiveDiskComponentsToBeDeleted = null;
+        List<ILSMMemoryComponent> inactiveMemoryComponentsToBeCleanedUp = null;
         try {
             synchronized (opTracker) {
                 try {
@@ -220,12 +219,12 @@ public class LSMHarness implements ILSMHarness {
                      * and not anymore accessed.
                      * This cleanup is done outside of optracker synchronized block.
                      */
-                    inactiveDiskComponents = lsmIndex.getInactiveDiskComponents();
+                    List<ILSMDiskComponent> inactiveDiskComponents = lsmIndex.getInactiveDiskComponents();
                     if (!inactiveDiskComponents.isEmpty()) {
                         for (ILSMDiskComponent inactiveComp : inactiveDiskComponents) {
                             if (inactiveComp.getFileReferenceCount() == 1) {
                                 inactiveDiskComponentsToBeDeleted = inactiveDiskComponentsToBeDeleted == null
-                                        ? new LinkedList<>() : inactiveDiskComponentsToBeDeleted;
+                                        ? new ArrayList<>() : inactiveDiskComponentsToBeDeleted;
                                 inactiveDiskComponentsToBeDeleted.add(inactiveComp);
                             }
                         }
@@ -233,6 +232,11 @@ public class LSMHarness implements ILSMHarness {
                             inactiveDiskComponents.removeAll(inactiveDiskComponentsToBeDeleted);
                         }
                     }
+                    List<ILSMMemoryComponent> inactiveMemoryComponents = lsmIndex.getInactiveMemoryComponents();
+                    if (!inactiveMemoryComponents.isEmpty()) {
+                        inactiveMemoryComponentsToBeCleanedUp = new ArrayList<>(inactiveMemoryComponents);
+                        inactiveMemoryComponents.clear();
+                    }
                 }
             }
         } finally {
@@ -256,6 +260,21 @@ public class LSMHarness implements ILSMHarness {
                     throw e; // NOSONAR: The last call in the finally clause
                 }
             }
+            if (inactiveMemoryComponentsToBeCleanedUp != null) {
+                for (ILSMMemoryComponent c : inactiveMemoryComponentsToBeCleanedUp) {
+                    tracer.instant(c.toString(), traceCategory, Scope.p, lsmIndex.toString());
+                    c.cleanup();
+                    synchronized (opTracker) {
+                        c.reset();
+                        // Notify all waiting threads whenever the mutable component's state
+                        // has changed to inactive. This is important because even though we switched
+                        // the mutable components, it is possible that the component that we just
+                        // switched to is still busy flushing its data to disk. Thus, the notification
+                        // that was issued upon scheduling the flush is not enough.
+                        opTracker.notifyAll(); // NOSONAR: Always called inside synchronized block
+                    }
+                }
+            }
             if (opType == LSMOperationType.FLUSH) {
                 ILSMMemoryComponent flushingComponent = (ILSMMemoryComponent) ctx.getComponentHolder().get(0);
                 // We must call flushed without synchronizing on opTracker to avoid deadlocks
@@ -309,26 +328,16 @@ public class LSMHarness implements ILSMHarness {
         for (int i = 0; i < componentsCount; i++) {
             final ILSMComponent c = componentHolder.get(i);
             boolean isMutableComponent = i == 0 && c.getType() == LSMComponentType.MEMORY;
-            c.threadExit(opType, failedOperation, isMutableComponent);
+            boolean needsCleanup = c.threadExit(opType, failedOperation, isMutableComponent);
             if (c.getType() == LSMComponentType.MEMORY) {
-                switch (c.getState()) {
-                    case READABLE_UNWRITABLE:
-                        if (isMutableComponent && (opType == LSMOperationType.MODIFICATION
-                                || opType == LSMOperationType.FORCE_MODIFICATION)) {
-                            lsmIndex.changeFlushStatusForCurrentMutableCompoent(true);
-                        }
-                        break;
-                    case INACTIVE:
-                        tracer.instant(c.toString(), traceCategory, Scope.p, lsmIndex.toString());
-                        // Notify all waiting threads whenever the mutable component's state
-                        // has changed to inactive. This is important because even though we switched
-                        // the mutable components, it is possible that the component that we just
-                        // switched to is still busy flushing its data to disk. Thus, the notification
-                        // that was issued upon scheduling the flush is not enough.
-                        opTracker.notifyAll(); // NOSONAR: Always called inside synchronized block
-                        break;
-                    default:
-                        break;
+                if (c.getState() == ComponentState.READABLE_UNWRITABLE) {
+                    if (isMutableComponent && (opType == LSMOperationType.MODIFICATION
+                            || opType == LSMOperationType.FORCE_MODIFICATION)) {
+                        lsmIndex.changeFlushStatusForCurrentMutableCompoent(true);
+                    }
+                }
+                if (needsCleanup) {
+                    lsmIndex.addInactiveMemoryComponent((ILSMMemoryComponent) c);
                 }
             } else if (c.getState() == ComponentState.INACTIVE) {
                 lsmIndex.addInactiveDiskComponent((AbstractLSMDiskComponent) c);