You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by lu...@apache.org on 2020/04/27 22:22:53 UTC

[asterixdb] branch master updated: [ASTERIXDB-2715][STO] Dynamic Memory Component Architecture

This is an automated email from the ASF dual-hosted git repository.

luochen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new bbf10b3  [ASTERIXDB-2715][STO] Dynamic Memory Component Architecture
bbf10b3 is described below

commit bbf10b30d6e1fe0a85df5b51b66a22de08ed66f2
Author: luochen <cl...@uci.edu>
AuthorDate: Sat Apr 25 09:08:57 2020 -0700

    [ASTERIXDB-2715][STO] Dynamic Memory Component Architecture
    
    - user model changes: yes
    - storage format changes: no
    - interface changes: yes
    
    Details:
    - Introduce a dynamic memory component architecture that uses a global
    virtual buffer cache to manage the write memory for all LSM-trees.
    - When the overall write memory is nearly full, we flush one dataset
    partition at a time in a round-robin fashion. Additionally, we allow
    users to configure the maximum size of filtered memory components
    to provide better pruning capability.
    - Clean up legacy code for statically allocating write memory to each
    dataset.
    - Remove the following parameters:
      storage.metadata.memorycomponent.numpages
      storage.max.active.writable.datasets
    - Add the following parameters:
      storage.memorycomponent.flush.threshold (default: 0.9)
      storage.filtered.memorycomponent.max.size (default: 0)
    
    Change-Id: Ia6a0f4de020acd7af89ef630322526c4be5076e0
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5824
    Reviewed-by: Luo Chen <cl...@uci.edu>
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
    Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
---
 .../apache/asterix/app/nc/NCAppRuntimeContext.java |  24 +-
 asterixdb/asterix-app/src/main/resources/cc.conf   |   1 +
 .../asterix/app/bootstrap/TestNodeController.java  |   3 +-
 .../dataflow/GlobalVirtualBufferCacheTest.java     | 289 ++++++++++++
 .../test/dataflow/LSMFlushRecoveryTest.java        |   3 -
 .../test/dataflow/MultiPartitionLSMIndexTest.java  |  61 +--
 .../runtime/ClusterStateDefaultParameterTest.java  |  16 +-
 .../asterix/test/txn/RecoveryManagerTest.java      |   2 -
 .../test/resources/cc-small-txn-log-partition.conf |   3 +-
 .../api/cluster_state_1/cluster_state_1.1.regexadm |   1 -
 .../cluster_state_1_full.1.regexadm                |   1 -
 .../cluster_state_1_less.1.regexadm                |   1 -
 asterixdb/asterix-common/pom.xml                   |   4 +
 .../asterix/common/api/IDatasetMemoryManager.java  |  66 ---
 .../asterix/common/api/INcApplicationContext.java  |   5 +-
 .../asterix/common/config/StorageProperties.java   |  40 +-
 .../apache/asterix/common/context/DatasetInfo.java |  14 +-
 .../common/context/DatasetLifecycleManager.java    | 116 +----
 .../common/context/DatasetMemoryManager.java       | 154 -------
 .../asterix/common/context/DatasetResource.java    |   8 +-
 .../common/context/DatasetVirtualBufferCaches.java |  71 ---
 .../common/context/GlobalVirtualBufferCache.java   | 485 +++++++++++++++++++++
 .../test/context/DatasetMemoryManagerTest.java     | 125 ------
 .../metadata/bootstrap/MetadataBootstrap.java      |   4 -
 .../am/common/dataflow/IndexLifecycleManager.java  |  61 ---
 .../storage/am/common/impls/AbstractTreeIndex.java |   5 -
 .../lsm/btree/impls/LSMBTreeMemoryComponent.java   |   6 +
 .../impls/LSMBTreeWithBuddyMemoryComponent.java    |   5 +-
 .../am/lsm/common/api/ILSMMemoryComponent.java     |  24 +-
 .../am/lsm/common/api/IVirtualBufferCache.java     |  35 +-
 .../am/lsm/common/impls/AbstractLSMIndex.java      |   9 -
 .../common/impls/AbstractLSMMemoryComponent.java   |  36 +-
 .../storage/am/lsm/common/impls/LSMHarness.java    |   5 +
 .../impls/MultitenantVirtualBufferCache.java       |  33 +-
 .../am/lsm/common/impls/VirtualBufferCache.java    |  62 +--
 .../impls/LSMInvertedIndexMemoryComponent.java     |   7 +
 .../inmemory/InMemoryInvertedIndex.java            |   6 -
 .../invertedindex/ondisk/OnDiskInvertedIndex.java  |   5 -
 .../lsm/rtree/impls/LSMRTreeMemoryComponent.java   |   6 +
 .../org/apache/hyracks/storage/common/IIndex.java  |   5 -
 .../storage/common/IResourceLifecycleManager.java  |   2 +-
 .../storage/common/IResourceMemoryManager.java     |  25 --
 .../buffercache/ResourceHeapBufferAllocator.java   |  55 ---
 .../btree/impl/IVirtualBufferCacheCallback.java    |   4 +-
 .../am/lsm/btree/impl/TestVirtualBufferCache.java  |  59 ++-
 45 files changed, 1088 insertions(+), 864 deletions(-)

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 13ca95d..01fa365 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -33,7 +33,6 @@ import org.apache.asterix.common.api.IConfigValidator;
 import org.apache.asterix.common.api.IConfigValidatorFactory;
 import org.apache.asterix.common.api.ICoordinationService;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
-import org.apache.asterix.common.api.IDatasetMemoryManager;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.api.IPropertiesFactory;
 import org.apache.asterix.common.api.IReceptionist;
@@ -50,7 +49,7 @@ import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.config.TransactionProperties;
 import org.apache.asterix.common.context.DatasetLifecycleManager;
-import org.apache.asterix.common.context.DatasetMemoryManager;
+import org.apache.asterix.common.context.GlobalVirtualBufferCache;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.common.replication.IReplicationChannel;
@@ -91,6 +90,7 @@ import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.ipc.impl.HyracksConnection;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.am.lsm.common.impls.AsynchronousScheduler;
 import org.apache.hyracks.storage.am.lsm.common.impls.ConcurrentMergePolicyFactory;
 import org.apache.hyracks.storage.am.lsm.common.impls.GreedyScheduler;
@@ -130,9 +130,9 @@ public class NCAppRuntimeContext implements INcApplicationContext {
     private final MessagingProperties messagingProperties;
     private final NodeProperties nodeProperties;
     private ExecutorService threadExecutor;
-    private IDatasetMemoryManager datasetMemoryManager;
     private IDatasetLifecycleManager datasetLifecycleManager;
     private IBufferCache bufferCache;
+    private IVirtualBufferCache virtualBufferCache;
     private ITransactionSubsystem txnSubsystem;
     private IMetadataNode metadataNodeStub;
     private ILSMIOOperationScheduler lsmIOScheduler;
@@ -205,10 +205,13 @@ public class NCAppRuntimeContext implements INcApplicationContext {
             }
             localResourceRepository.deleteStorageData();
         }
-        datasetMemoryManager = new DatasetMemoryManager(storageProperties);
+        virtualBufferCache = new GlobalVirtualBufferCache(allocator, storageProperties);
+        // Must start vbc now instead of by life cycle component manager (lccm) because lccm happens after
+        // the metadata bootstrap task
+        ((ILifeCycleComponent) virtualBufferCache).start();
         datasetLifecycleManager =
                 new DatasetLifecycleManager(storageProperties, localResourceRepository, txnSubsystem.getLogManager(),
-                        datasetMemoryManager, indexCheckpointManagerProvider, ioManager.getIODevices().size());
+                        virtualBufferCache, indexCheckpointManagerProvider, ioManager.getIODevices().size());
         final String nodeId = getServiceContext().getNodeId();
         final ClusterPartition[] nodePartitions = metadataProperties.getNodePartitions().get(nodeId);
         final Set<Integer> nodePartitionsIds =
@@ -246,6 +249,7 @@ public class NCAppRuntimeContext implements INcApplicationContext {
          * managers. Notes: registered components are stopped in reversed order
          */
         ILifeCycleComponentManager lccm = getServiceContext().getLifeCycleComponentManager();
+        lccm.register((ILifeCycleComponent) virtualBufferCache);
         lccm.register((ILifeCycleComponent) bufferCache);
         /*
          * LogManager must be stopped after RecoveryManager, DatasetLifeCycleManager, and ReplicationManager
@@ -297,6 +301,11 @@ public class NCAppRuntimeContext implements INcApplicationContext {
     }
 
     @Override
+    public IVirtualBufferCache getVirtualBufferCache() {
+        return virtualBufferCache;
+    }
+
+    @Override
     public ITransactionSubsystem getTransactionSubsystem() {
         return txnSubsystem;
     }
@@ -307,11 +316,6 @@ public class NCAppRuntimeContext implements INcApplicationContext {
     }
 
     @Override
-    public IDatasetMemoryManager getDatasetMemoryManager() {
-        return datasetMemoryManager;
-    }
-
-    @Override
     public ILSMIOOperationScheduler getLSMIOScheduler() {
         return lsmIOScheduler;
     }
diff --git a/asterixdb/asterix-app/src/main/resources/cc.conf b/asterixdb/asterix-app/src/main/resources/cc.conf
index 1b8e034..d85a5ef 100644
--- a/asterixdb/asterix-app/src/main/resources/cc.conf
+++ b/asterixdb/asterix-app/src/main/resources/cc.conf
@@ -39,6 +39,7 @@ storage.buffercache.pagesize=32KB
 storage.buffercache.size=128MB
 storage.memorycomponent.globalbudget=512MB
 storage.io.scheduler=greedy
+storage.filtered.memorycomponent.max.size=16MB
 
 [cc]
 address = 127.0.0.1
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 9f37c9b..ebf98df 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -691,7 +691,8 @@ public class TestNodeController {
             this.mergePolicyFactory = mergePolicyFactory;
             this.mergePolicyProperties = mergePolicyProperties;
             this.primaryKeyIndexes = primaryKeyIndexes;
-            primaryIndexNumOfTupleFields = primaryKeyTypes.length + (1 + ((metaType == null) ? 0 : 1));
+            primaryIndexNumOfTupleFields = primaryKeyTypes.length + (1 + ((metaType == null) ? 0 : 1))
+                    + (filterFields != null ? filterFields.length : 0);
             primaryIndexTypeTraits =
                     createPrimaryIndexTypeTraits(primaryIndexNumOfTupleFields, primaryKeyTypes, recordType, metaType);
             primaryIndexSerdes =
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java
new file mode 100644
index 0000000..0b07bc4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
+import org.apache.asterix.app.nc.NCAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.StorageProperties.Option;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.TransactionOptions;
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorNodePushable;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.api.comm.FixedSizeFrame;
+import org.apache.hyracks.api.config.IOption;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.common.utils.TupleUtils;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.impls.AbstractTreeIndex;
+import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class GlobalVirtualBufferCacheTest {
+    public static final Logger LOGGER = LogManager.getLogger();
+    private static TestNodeController nc;
+    private static Dataset dataset;
+    private static Dataset filteredDataset;
+    private static PrimaryIndexInfo[] primaryIndexInfos;
+    private static PrimaryIndexInfo[] filteredPrimaryIndexInfos;
+    private static TestLsmBtree[] primaryIndexes;
+    private static TestLsmBtree[] filteredPrimaryIndexes;
+    private static NCAppRuntimeContext ncAppCtx;
+    private static IDatasetLifecycleManager dsLifecycleMgr;
+
+    private static IHyracksTaskContext[] testCtxs;
+    private static IHyracksTaskContext[] filteredTestCtxs;
+    private static IIndexDataflowHelper[] primaryIndexDataflowHelpers;
+    private static IIndexDataflowHelper[] filteredPrimaryIndexDataflowHelpers;
+    private static ITransactionContext txnCtx;
+    private static ITransactionContext filteredTxnCtx;
+    private static RecordTupleGenerator tupleGenerator;
+
+    private static final int NUM_PARTITIONS = 2;
+    private static final long FILTERED_MEMORY_COMPONENT_SIZE = 16 * 1024l;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        System.out.println("SetUp: ");
+        TestHelper.deleteExistingInstanceFiles();
+        String configPath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "test"
+                + File.separator + "resources" + File.separator + "cc.conf";
+        nc = new TestNodeController(configPath, false);
+    }
+
+    @Before
+    public void initializeTest() throws Exception {
+        // initialize NC before each test
+        initializeNc();
+        initializeTestCtx();
+        createIndex();
+        readIndex();
+        tupleGenerator = StorageTestUtils.getTupleGenerator();
+    }
+
+    @After
+    public void deinitializeTest() throws Exception {
+        dropIndex();
+        // cleanup after each test case
+        nc.deInit(true);
+        nc.clearOpts();
+    }
+
+    @Test
+    public void testFlushes() throws Exception {
+        List<Thread> threads = new ArrayList<>();
+        int records = 16 * 1024;
+        int threadsPerPartition = 2;
+        AtomicReference<Exception> exceptionRef = new AtomicReference<>();
+        for (int p = 0; p < NUM_PARTITIONS; p++) {
+            for (int t = 0; t < threadsPerPartition; t++) {
+                threads.add(insertRecords(records, p, false, exceptionRef));
+                threads.add(insertRecords(records, p, true, exceptionRef));
+            }
+        }
+        for (Thread thread : threads) {
+            thread.join();
+        }
+        if (exceptionRef.get() != null) {
+            exceptionRef.get().printStackTrace();
+            Assert.fail();
+        }
+        for (int i = 0; i < NUM_PARTITIONS; i++) {
+            Assert.assertFalse(primaryIndexes[i].getDiskComponents().isEmpty());
+            Assert.assertTrue(
+                    primaryIndexes[i].getDiskComponents().stream().anyMatch(c -> ((AbstractTreeIndex) c.getIndex())
+                            .getFileReference().getFile().length() > FILTERED_MEMORY_COMPONENT_SIZE));
+
+            Assert.assertFalse(filteredPrimaryIndexes[i].getDiskComponents().isEmpty());
+            Assert.assertTrue(filteredPrimaryIndexes[i].getDiskComponents().stream()
+                    .allMatch(c -> ((AbstractTreeIndex) c.getIndex()).getFileReference().getFile()
+                            .length() <= FILTERED_MEMORY_COMPONENT_SIZE));
+        }
+
+        nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
+        nc.getTransactionManager().commitTransaction(filteredTxnCtx.getTxnId());
+    }
+
+    private void initializeNc() throws Exception {
+        List<Pair<IOption, Object>> opts = new ArrayList<>();
+        opts.add(Pair.of(Option.STORAGE_MEMORYCOMPONENT_GLOBALBUDGET, 128 * 1024L));
+        opts.add(Pair.of(Option.STORAGE_MEMORYCOMPONENT_PAGESIZE, 1 * 1024));
+        opts.add(Pair.of(Option.STORAGE_BUFFERCACHE_PAGESIZE, 1 * 1024));
+        opts.add(Pair.of(Option.STORAGE_FILTERED_MEMORYCOMPONENT_MAX_SIZE, FILTERED_MEMORY_COMPONENT_SIZE));
+        opts.add(Pair.of(Option.STORAGE_FILTERED_MEMORYCOMPONENT_MAX_SIZE, FILTERED_MEMORY_COMPONENT_SIZE));
+
+        nc.setOpts(opts);
+
+        nc.init(true);
+        ncAppCtx = nc.getAppRuntimeContext();
+        dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
+    }
+
+    private void createIndex() throws Exception {
+        dataset = new TestDataset(StorageTestUtils.DATAVERSE_NAME, "ds", StorageTestUtils.DATAVERSE_NAME,
+                StorageTestUtils.DATA_TYPE_NAME, StorageTestUtils.NODE_GROUP_NAME,
+                NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+                        StorageTestUtils.PARTITIONING_KEYS, null, null, null, false, null),
+                null, DatasetType.INTERNAL, StorageTestUtils.DATASET_ID, 0);
+
+        filteredDataset = new TestDataset(StorageTestUtils.DATAVERSE_NAME, "filtered_ds",
+                StorageTestUtils.DATAVERSE_NAME, StorageTestUtils.DATA_TYPE_NAME, StorageTestUtils.NODE_GROUP_NAME,
+                NoMergePolicyFactory.NAME, null,
+                new InternalDatasetDetails(null, PartitioningStrategy.HASH, StorageTestUtils.PARTITIONING_KEYS, null,
+                        null, null, false, Collections.singletonList("value")),
+                null, DatasetType.INTERNAL, StorageTestUtils.DATASET_ID + 1, 0);
+
+        primaryIndexInfos = new PrimaryIndexInfo[NUM_PARTITIONS];
+        filteredPrimaryIndexInfos = new PrimaryIndexInfo[NUM_PARTITIONS];
+        for (int i = 0; i < NUM_PARTITIONS; i++) {
+            primaryIndexInfos[i] = StorageTestUtils.createPrimaryIndex(nc, dataset, i);
+            filteredPrimaryIndexInfos[i] = StorageTestUtils.createPrimaryIndex(nc, filteredDataset, i);
+        }
+    }
+
+    private void initializeTestCtx() throws Exception {
+        JobId jobId = nc.newJobId();
+        JobId filteredJobId = nc.newJobId();
+
+        testCtxs = new IHyracksTaskContext[NUM_PARTITIONS];
+        filteredTestCtxs = new IHyracksTaskContext[NUM_PARTITIONS];
+
+        for (int i = 0; i < NUM_PARTITIONS; i++) {
+            testCtxs[i] = nc.createTestContext(jobId, i, false);
+            filteredTestCtxs[i] = nc.createTestContext(filteredJobId, i, false);
+        }
+        txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(jobId),
+                new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
+        filteredTxnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(filteredJobId),
+                new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
+    }
+
+    private void readIndex() throws HyracksDataException {
+        primaryIndexDataflowHelpers = new IIndexDataflowHelper[NUM_PARTITIONS];
+        primaryIndexes = new TestLsmBtree[NUM_PARTITIONS];
+
+        filteredPrimaryIndexDataflowHelpers = new IIndexDataflowHelper[NUM_PARTITIONS];
+        filteredPrimaryIndexes = new TestLsmBtree[NUM_PARTITIONS];
+
+        for (int i = 0; i < NUM_PARTITIONS; i++) {
+            IIndexDataflowHelperFactory factory =
+                    new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfos[i].getFileSplitProvider());
+            primaryIndexDataflowHelpers[i] = factory.create(testCtxs[i].getJobletContext().getServiceContext(), i);
+            primaryIndexDataflowHelpers[i].open();
+            primaryIndexes[i] = (TestLsmBtree) primaryIndexDataflowHelpers[i].getIndexInstance();
+            primaryIndexDataflowHelpers[i].close();
+
+            IIndexDataflowHelperFactory filteredFactory = new IndexDataflowHelperFactory(nc.getStorageManager(),
+                    filteredPrimaryIndexInfos[i].getFileSplitProvider());
+            filteredPrimaryIndexDataflowHelpers[i] =
+                    filteredFactory.create(filteredTestCtxs[i].getJobletContext().getServiceContext(), i);
+            filteredPrimaryIndexDataflowHelpers[i].open();
+            filteredPrimaryIndexes[i] = (TestLsmBtree) filteredPrimaryIndexDataflowHelpers[i].getIndexInstance();
+            filteredPrimaryIndexDataflowHelpers[i].close();
+        }
+    }
+
+    private void dropIndex() throws HyracksDataException {
+        for (int i = 0; i < NUM_PARTITIONS; i++) {
+            primaryIndexDataflowHelpers[i].destroy();
+            filteredPrimaryIndexDataflowHelpers[i].destroy();
+        }
+    }
+
+    private Thread insertRecords(int records, int partition, boolean filtered, AtomicReference<Exception> exceptionRef)
+            throws Exception {
+        Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    LSMPrimaryInsertOperatorNodePushable insertOp = filtered ? nc
+                            .getInsertPipeline(filteredTestCtxs[partition], filteredDataset, StorageTestUtils.KEY_TYPES,
+                                    StorageTestUtils.RECORD_TYPE, StorageTestUtils.META_TYPE,
+                                    filteredPrimaryIndexes[partition].getFilterFields(), StorageTestUtils.KEY_INDEXES,
+                                    StorageTestUtils.KEY_INDICATORS_LIST, StorageTestUtils.STORAGE_MANAGER, null, null)
+                            .getLeft()
+                            : nc.getInsertPipeline(testCtxs[partition], dataset, StorageTestUtils.KEY_TYPES,
+                                    StorageTestUtils.RECORD_TYPE, StorageTestUtils.META_TYPE, null,
+                                    StorageTestUtils.KEY_INDEXES, StorageTestUtils.KEY_INDICATORS_LIST,
+                                    StorageTestUtils.STORAGE_MANAGER, null, null).getLeft();
+                    insertOp.open();
+                    FrameTupleAppender tupleAppender =
+                            new FrameTupleAppender(new FixedSizeFrame(ByteBuffer.allocate(512)));
+
+                    ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(filtered ? 3 : 2);
+                    ArrayTupleReference tupleRef = new ArrayTupleReference();
+                    for (int i = 0; i < records; i++) {
+                        synchronized (tupleGenerator) {
+                            ITupleReference tuple = tupleGenerator.next();
+                            TupleUtils.copyTuple(tupleBuilder, tuple, 2);
+                            if (filtered) {
+                                // append the filter field
+                                tupleBuilder.getDataOutput().writeByte(ATypeTag.SERIALIZED_INT64_TYPE_TAG);
+                                tupleBuilder.getDataOutput().writeLong(0l);
+                                tupleBuilder.addFieldEndOffset();
+                            }
+                            tupleRef.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+                        }
+                        DataflowUtils.addTupleToFrame(tupleAppender, tupleRef, insertOp);
+                    }
+                    tupleAppender.write(insertOp, true);
+                    insertOp.close();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    exceptionRef.compareAndSet(null, e);
+                }
+            }
+        });
+        thread.start();
+        return thread;
+    }
+
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
index 03ca1f0..fe73baf 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
@@ -132,9 +132,6 @@ public class LSMFlushRecoveryTest {
         List<Pair<IOption, Object>> opts = new ArrayList<>();
         opts.add(Pair.of(Option.STORAGE_MEMORYCOMPONENT_GLOBALBUDGET, 20 * 1024 * 1024L));
         opts.add(Pair.of(Option.STORAGE_MEMORYCOMPONENT_PAGESIZE, 1 * 1024));
-        // each memory component only gets 4 pages (we have 2 partitions, 2 memory components/partition)
-        // and some reserved memory for metadata dataset
-        opts.add(Pair.of(Option.STORAGE_MAX_ACTIVE_WRITABLE_DATASETS, 1024));
         nc.setOpts(opts);
         initializeNc(false);
         initializeTestCtx();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
index 1e1df54..d43b6d5 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
@@ -35,6 +35,7 @@ import org.apache.asterix.app.nc.NCAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.config.StorageProperties.Option;
 import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
@@ -51,7 +52,9 @@ import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorNodePushable;
 import org.apache.asterix.test.common.TestHelper;
 import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
@@ -132,6 +135,9 @@ public class MultiPartitionLSMIndexTest {
         String configPath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "test"
                 + File.separator + "resources" + File.separator + "cc-multipart.conf";
         nc = new TestNodeController(configPath, false);
+        List<Pair<IOption, Object>> opts = new ArrayList<>();
+        opts.add(Pair.of(Option.STORAGE_MEMORYCOMPONENT_GLOBALBUDGET, 16 * 1024 * 1024L));
+        nc.setOpts(opts);
         nc.init();
         ncAppCtx = nc.getAppRuntimeContext();
         dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
@@ -262,20 +268,23 @@ public class MultiPartitionLSMIndexTest {
             MutableBoolean proceedToScheduleFlush = new MutableBoolean(false);
             primaryLsmBtrees[0].addVirtuablBufferCacheCallback(new IVirtualBufferCacheCallback() {
                 @Override
-                public void isFullChanged(boolean newValue) {
-                    synchronized (isFull) {
-                        isFull.set(newValue);
-                        isFull.notifyAll();
-                    }
-                    synchronized (proceedToScheduleFlush) {
-                        while (!proceedToScheduleFlush.booleanValue()) {
-                            try {
-                                proceedToScheduleFlush.wait();
-                            } catch (InterruptedException e) {
-                                Thread.currentThread().interrupt();
-                                return;
+                public void isFullChanged(boolean newValue, ILSMMemoryComponent memoryComponent) {
+                    if (memoryComponent != null && memoryComponent.getLsmIndex() == primaryLsmBtrees[0]) {
+                        synchronized (isFull) {
+                            isFull.set(newValue);
+                            isFull.notifyAll();
+                        }
+                        synchronized (proceedToScheduleFlush) {
+                            while (!proceedToScheduleFlush.booleanValue()) {
+                                try {
+                                    proceedToScheduleFlush.wait();
+                                } catch (InterruptedException e) {
+                                    Thread.currentThread().interrupt();
+                                    return;
+                                }
                             }
                         }
+                        System.out.println("Proceed to flush");
                     }
                 }
             });
@@ -350,7 +359,7 @@ public class MultiPartitionLSMIndexTest {
             // Now we need to know that the flush has been scheduled
             synchronized (flushStarted) {
                 while (!flushStarted.booleanValue()) {
-                    flushStarted.wait();
+                    flushStarted.wait(100);
                 }
             }
 
@@ -433,18 +442,20 @@ public class MultiPartitionLSMIndexTest {
             MutableBoolean proceedAfterIsFullChanged = new MutableBoolean(false);
             primaryLsmBtrees[1].addVirtuablBufferCacheCallback(new IVirtualBufferCacheCallback() {
                 @Override
-                public void isFullChanged(boolean newValue) {
-                    synchronized (isFull) {
-                        isFull.set(newValue);
-                        isFull.notifyAll();
-                    }
-                    synchronized (proceedAfterIsFullChanged) {
-                        while (!proceedAfterIsFullChanged.booleanValue()) {
-                            try {
-                                proceedAfterIsFullChanged.wait();
-                            } catch (InterruptedException e) {
-                                Thread.currentThread().interrupt();
-                                return;
+                public void isFullChanged(boolean newValue, ILSMMemoryComponent memoryComponent) {
+                    if (memoryComponent != null && memoryComponent.getLsmIndex() == primaryLsmBtrees[1]) {
+                        synchronized (isFull) {
+                            isFull.set(newValue);
+                            isFull.notifyAll();
+                        }
+                        synchronized (proceedAfterIsFullChanged) {
+                            while (!proceedAfterIsFullChanged.booleanValue()) {
+                                try {
+                                    proceedAfterIsFullChanged.wait();
+                                } catch (InterruptedException e) {
+                                    Thread.currentThread().interrupt();
+                                    return;
+                                }
                             }
                         }
                     }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ClusterStateDefaultParameterTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ClusterStateDefaultParameterTest.java
index e752493..3d5dad8 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ClusterStateDefaultParameterTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ClusterStateDefaultParameterTest.java
@@ -87,12 +87,16 @@ public class ClusterStateDefaultParameterTest {
                 Assert.assertTrue(getValue(row) == maxHeap / 4);
                 matchCount++;
             }
-            if (row.contains("storage.max.active.writable.datasets")) {
-                Assert.assertTrue(getValue(row) == 8);
+            if (row.contains("storage.memorycomponent.flush.threshold")) {
+                Assert.assertTrue(getDoubleValue(row) == 0.9d);
+                matchCount++;
+            }
+            if (row.contains("storage.filtered.memorycomponent.max.size")) {
+                Assert.assertTrue(getValue(row) == 0);
                 matchCount++;
             }
         }
-        Assert.assertTrue(matchCount == 3);
+        Assert.assertTrue(matchCount == 4);
     }
 
     // Parses a long value parameter.
@@ -100,4 +104,10 @@ public class ClusterStateDefaultParameterTest {
         String valueStr = row.split(":")[1].trim();
         return Long.parseLong(valueStr);
     }
+
+    // Parses a double value parameter: the trimmed second ':'-separated field of the row.
+    private double getDoubleValue(String row) {
+        String valueStr = row.split(":")[1].trim();
+        return Double.parseDouble(valueStr);
+    }
 }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java
index 65b10a0..59d7fae 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java
@@ -22,7 +22,6 @@ import java.io.File;
 
 import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
 import org.apache.asterix.common.TestDataUtil;
-import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities;
 import org.junit.After;
 import org.junit.Assert;
@@ -42,7 +41,6 @@ public class RecoveryManagerTest {
 
     @Before
     public void setUp() throws Exception {
-        integrationUtil.addOption(StorageProperties.Option.STORAGE_MAX_ACTIVE_WRITABLE_DATASETS, 20);
         integrationUtil.setGracefulShutdown(false);
         integrationUtil.init(true, TEST_CONFIG_FILE_PATH);
     }
diff --git a/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf b/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf
index b456155..562dedd 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf
@@ -55,5 +55,4 @@ messaging.frame.count=512
 txn.log.partitionsize=2MB
 txn.log.buffer.pagesize=128KB
 txn.log.checkpoint.pollfrequency=2147483647
-txn.log.checkpoint.history=0
-storage.max.active.writable.datasets=50
\ No newline at end of file
+txn.log.checkpoint.history=0
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index acfc5e0..50a901f 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -37,7 +37,6 @@
     "replication\.strategy" : "none",
     "replication\.timeout" : 30,
     "ssl\.enabled" : false,
-    "storage.max.active.writable.datasets" : 8,
     "txn\.commitprofiler\.enabled" : false,
     "txn\.commitprofiler\.reportinterval" : 5,
     "txn\.dataset\.checkpoint\.interval" : 3600,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 6d28223..318206d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -37,7 +37,6 @@
     "replication\.strategy" : "none",
     "replication\.timeout" : 30,
     "ssl\.enabled" : false,
-    "storage.max.active.writable.datasets" : 8,
     "txn\.commitprofiler\.enabled" : false,
     "txn\.commitprofiler\.reportinterval" : 5,
     "txn\.dataset\.checkpoint\.interval" : 3600,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 0724b25..69e9b0e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -37,7 +37,6 @@
     "replication\.strategy" : "none",
     "replication\.timeout" : 30,
     "ssl\.enabled" : false,
-    "storage.max.active.writable.datasets" : 8,
     "txn\.commitprofiler\.enabled" : false,
     "txn\.commitprofiler\.reportinterval" : 5,
     "txn\.dataset\.checkpoint\.interval" : 3600,
diff --git a/asterixdb/asterix-common/pom.xml b/asterixdb/asterix-common/pom.xml
index 2d354a7..ae2e77a 100644
--- a/asterixdb/asterix-common/pom.xml
+++ b/asterixdb/asterix-common/pom.xml
@@ -295,5 +295,9 @@
       <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-control-nc</artifactId>
     </dependency>
+     <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetMemoryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetMemoryManager.java
deleted file mode 100644
index fde2c80..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetMemoryManager.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.api;
-
-public interface IDatasetMemoryManager {
-
-    /**
-     * Allocates memory for dataset {@code datasetId}.
-     *
-     * @param datasetId
-     * @return true, if the allocation is successful, otherwise false.
-     */
-    boolean allocate(int datasetId);
-
-    /**
-     * Deallocates memory of dataset {@code datasetId}.
-     *
-     * @param datasetId
-     */
-    void deallocate(int datasetId);
-
-    /**
-     * Reserves memory for dataset {@code datasetId}. The reserved memory
-     * is guaranteed to be allocatable when needed for the dataset. Reserve
-     * maybe called after allocation to reserve the allocated budget
-     * on deallocation.
-     *
-     * @param datasetId
-     * @return true, if the allocation is successful, otherwise false.
-     */
-    boolean reserve(int datasetId);
-
-    /**
-     * Cancels the reserved memory for dataset {@code datasetId}.
-     *
-     * @param datasetId
-     */
-    void cancelReserved(int datasetId);
-
-    /**
-     * @return The remaining memory budget that can be used for datasets.
-     */
-    long getAvailable();
-
-    /**
-     * @param datasetId
-     * @return The number of virtual buffer cache pages that should be allocated for dataset {@code datasetId}.
-     */
-    int getNumPages(int datasetId);
-}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 79828e1..eed186c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -37,6 +37,7 @@ import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.io.IPersistedResourceRegistry;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.file.IResourceIdFactory;
@@ -60,12 +61,12 @@ public interface INcApplicationContext extends IApplicationContext {
 
     IBufferCache getBufferCache();
 
+    IVirtualBufferCache getVirtualBufferCache();
+
     ILocalResourceRepository getLocalResourceRepository();
 
     IDatasetLifecycleManager getDatasetLifecycleManager();
 
-    IDatasetMemoryManager getDatasetMemoryManager();
-
     IResourceIdFactory getResourceIdFactory();
 
     void initialize(IRecoveryManagerFactory recoveryManagerFactory, IReceptionistFactory receptionistFactory,
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
index 58bc828..958d5ae 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
@@ -47,9 +47,9 @@ public class StorageProperties extends AbstractProperties {
         STORAGE_MEMORYCOMPONENT_GLOBALBUDGET(LONG_BYTE_UNIT, Runtime.getRuntime().maxMemory() / 4),
         STORAGE_MEMORYCOMPONENT_PAGESIZE(INTEGER_BYTE_UNIT, StorageUtil.getIntSizeInBytes(128, KILOBYTE)),
         STORAGE_MEMORYCOMPONENT_NUMCOMPONENTS(POSITIVE_INTEGER, 2),
-        STORAGE_METADATA_MEMORYCOMPONENT_NUMPAGES(POSITIVE_INTEGER, 8),
+        STORAGE_MEMORYCOMPONENT_FLUSH_THRESHOLD(DOUBLE, 0.9d),
+        STORAGE_FILTERED_MEMORYCOMPONENT_MAX_SIZE(LONG_BYTE_UNIT, 0L),
         STORAGE_LSM_BLOOMFILTER_FALSEPOSITIVERATE(DOUBLE, 0.01d),
-        STORAGE_MAX_ACTIVE_WRITABLE_DATASETS(UNSIGNED_INTEGER, 8),
         STORAGE_COMPRESSION_BLOCK(STRING, "snappy"),
         STORAGE_DISK_FORCE_BYTES(LONG_BYTE_UNIT, StorageUtil.getLongSizeInBytes(16, MEGABYTE)),
         STORAGE_IO_SCHEDULER(STRING, "greedy");
@@ -64,9 +64,6 @@ public class StorageProperties extends AbstractProperties {
 
         @Override
         public Section section() {
-            if (this == STORAGE_MAX_ACTIVE_WRITABLE_DATASETS) {
-                return Section.COMMON;
-            }
             return Section.NC;
         }
 
@@ -87,12 +84,13 @@ public class StorageProperties extends AbstractProperties {
                     return "The page size in bytes for pages allocated to memory components";
                 case STORAGE_MEMORYCOMPONENT_NUMCOMPONENTS:
                     return "The number of memory components to be used per lsm index";
-                case STORAGE_METADATA_MEMORYCOMPONENT_NUMPAGES:
-                    return "The number of pages to allocate for a metadata memory component";
+                case STORAGE_MEMORYCOMPONENT_FLUSH_THRESHOLD:
+                    return "The memory usage threshold when memory components should be flushed";
+                case STORAGE_FILTERED_MEMORYCOMPONENT_MAX_SIZE:
+                    return "The maximum size of a filtered memory component. 0 means that the memory component "
+                            + "does not have a maximum size";
                 case STORAGE_LSM_BLOOMFILTER_FALSEPOSITIVERATE:
                     return "The maximum acceptable false positive rate for bloom filters associated with LSM indexes";
-                case STORAGE_MAX_ACTIVE_WRITABLE_DATASETS:
-                    return "The maximum number of datasets that can be concurrently modified";
                 case STORAGE_COMPRESSION_BLOCK:
                     return "The default compression scheme for the storage";
                 case STORAGE_DISK_FORCE_BYTES:
@@ -116,9 +114,6 @@ public class StorageProperties extends AbstractProperties {
 
         @Override
         public String usageDefaultOverride(IApplicationConfig accessor, Function<IOption, String> optionPrinter) {
-            if (this == STORAGE_METADATA_MEMORYCOMPONENT_NUMPAGES) {
-                return "8 pages";
-            }
             return null;
         }
     }
@@ -145,16 +140,13 @@ public class StorageProperties extends AbstractProperties {
         return accessor.getInt(Option.STORAGE_MEMORYCOMPONENT_PAGESIZE);
     }
 
-    public int getMemoryComponentNumPages() {
-        final long metadataReservedMem = getMetadataReservedMemory();
-        final long globalUserDatasetMem = getMemoryComponentGlobalBudget() - metadataReservedMem;
-        final long userDatasetMem =
-                globalUserDatasetMem / (getMaxActiveWritableDatasets() + geSystemReservedDatasets());
-        return (int) (userDatasetMem / getMemoryComponentPageSize());
+    public double getMemoryComponentFlushThreshold() {
+        return accessor.getDouble(Option.STORAGE_MEMORYCOMPONENT_FLUSH_THRESHOLD);
     }
 
-    public int getMetadataMemoryComponentNumPages() {
-        return accessor.getInt(Option.STORAGE_METADATA_MEMORYCOMPONENT_NUMPAGES);
+    public int getFilteredMemoryComponentMaxNumPages() {
+        return (int) (accessor.getLong(Option.STORAGE_FILTERED_MEMORYCOMPONENT_MAX_SIZE)
+                / getMemoryComponentPageSize());
     }
 
     public int getMemoryComponentsNum() {
@@ -186,10 +178,6 @@ public class StorageProperties extends AbstractProperties {
         return jobExecutionMemory;
     }
 
-    public int getMaxActiveWritableDatasets() {
-        return accessor.getInt(Option.STORAGE_MAX_ACTIVE_WRITABLE_DATASETS);
-    }
-
     public String getCompressionScheme() {
         return accessor.getString(Option.STORAGE_COMPRESSION_BLOCK);
     }
@@ -206,10 +194,6 @@ public class StorageProperties extends AbstractProperties {
         return SYSTEM_RESERVED_DATASETS;
     }
 
-    private long getMetadataReservedMemory() {
-        return (getMetadataMemoryComponentNumPages() * (long) getMemoryComponentPageSize()) * getMetadataDatasets();
-    }
-
     public int getDiskForcePages() {
         return (int) (accessor.getLong(Option.STORAGE_DISK_FORCE_BYTES) / getBufferCachePageSize());
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index 4ccb0cc..3fcc528 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -45,7 +45,6 @@ public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
     private long lastAccess;
     private boolean isExternal;
     private boolean isRegistered;
-    private boolean memoryAllocated;
     private boolean durable;
 
     public DatasetInfo(int datasetID, ILogManager logManager) {
@@ -54,7 +53,6 @@ public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
         this.setLastAccess(-1);
         this.datasetID = datasetID;
         this.setRegistered(false);
-        this.setMemoryAllocated(false);
         this.logManager = logManager;
         waitLog.setLogType(LogType.WAIT_FOR_FLUSHES);
         waitLog.computeAndSetLogSize();
@@ -144,8 +142,8 @@ public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
     @Override
     public String toString() {
         return "DatasetID: " + getDatasetID() + ", isOpen: " + isOpen() + ", refCount: " + getReferenceCount()
-                + ", lastAccess: " + getLastAccess() + ", isRegistered: " + isRegistered() + ", memoryAllocated: "
-                + isMemoryAllocated() + ", isDurable: " + isDurable();
+                + ", lastAccess: " + getLastAccess() + ", isRegistered: " + isRegistered() + ", isDurable: "
+                + isDurable();
     }
 
     public boolean isDurable() {
@@ -192,14 +190,6 @@ public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
         return datasetID;
     }
 
-    public boolean isMemoryAllocated() {
-        return memoryAllocated;
-    }
-
-    public void setMemoryAllocated(boolean memoryAllocated) {
-        this.memoryAllocated = memoryAllocated;
-    }
-
     public long getLastAccess() {
         return lastAccess;
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index d396d9b..0750749 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -18,25 +18,21 @@
  */
 package org.apache.asterix.common.context;
 
-import static org.apache.asterix.common.metadata.MetadataIndexImmutableProperties.METADATA_DATASETS_PARTITIONS;
 import static org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId.MIN_VALID_COMPONENT_ID;
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Predicate;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
-import org.apache.asterix.common.api.IDatasetMemoryManager;
 import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.dataflow.LSMIndexUtil;
 import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
-import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.storage.DatasetResourceReference;
 import org.apache.asterix.common.storage.IIndexCheckpointManager;
@@ -67,22 +63,27 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     private final Map<Integer, DatasetResource> datasets = new ConcurrentHashMap<>();
     private final StorageProperties storageProperties;
     private final ILocalResourceRepository resourceRepository;
-    private final IDatasetMemoryManager memoryManager;
+    private final IVirtualBufferCache vbc;
     private final ILogManager logManager;
     private final LogRecord waitLog;
-    private final int numPartitions;
     private volatile boolean stopped = false;
     private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
+    // all LSM-trees share the same virtual buffer cache list
+    private final List<IVirtualBufferCache> vbcs;
 
     public DatasetLifecycleManager(StorageProperties storageProperties, ILocalResourceRepository resourceRepository,
-            ILogManager logManager, IDatasetMemoryManager memoryManager,
+            ILogManager logManager, IVirtualBufferCache vbc,
             IIndexCheckpointManagerProvider indexCheckpointManagerProvider, int numPartitions) {
         this.logManager = logManager;
         this.storageProperties = storageProperties;
         this.resourceRepository = resourceRepository;
-        this.memoryManager = memoryManager;
+        this.vbc = vbc;
+        int numMemoryComponents = storageProperties.getMemoryComponentsNum();
+        this.vbcs = new ArrayList<>(numMemoryComponents);
+        for (int i = 0; i < numMemoryComponents; i++) {
+            vbcs.add(vbc);
+        }
         this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
-        this.numPartitions = numPartitions;
         waitLog = new LogRecord();
         waitLog.setLogType(LogType.WAIT_FOR_FLUSHES);
         waitLog.computeAndSetLogSize();
@@ -204,38 +205,6 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
         iInfo.touch();
     }
 
-    private boolean evictCandidateDataset() throws HyracksDataException {
-        /**
-         * We will take a dataset that has no active transactions, it is open (a dataset consuming memory),
-         * that is not being used (refcount == 0) and has been least recently used, excluding metadata datasets.
-         * The sort order defined for DatasetInfo maintains this. See DatasetInfo.compareTo().
-         */
-        List<DatasetResource> datasetsResources = new ArrayList<>(datasets.values());
-        Collections.sort(datasetsResources);
-        for (DatasetResource dsr : datasetsResources) {
-            if (isCandidateDatasetForEviction(dsr)) {
-                closeDataset(dsr);
-                LOGGER.info(() -> "Evicted Dataset" + dsr.getDatasetID());
-                return true;
-            }
-        }
-        return false;
-    }
-
-    private boolean isCandidateDatasetForEviction(DatasetResource dsr) {
-        for (PrimaryIndexOperationTracker opTracker : dsr.getOpTrackers()) {
-            if (opTracker.getNumActiveOperations() != 0) {
-                return false;
-            }
-        }
-        if (dsr.getDatasetInfo().getReferenceCount() != 0 || !dsr.getDatasetInfo().isOpen()
-                || dsr.isMetadataDataset()) {
-            return false;
-        }
-
-        return true;
-    }
-
     public DatasetResource getDatasetLifecycle(int did) {
         DatasetResource dsr = datasets.get(did);
         if (dsr != null) {
@@ -245,11 +214,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
             dsr = datasets.get(did);
             if (dsr == null) {
                 DatasetInfo dsInfo = new DatasetInfo(did, logManager);
-                int partitions = MetadataIndexImmutableProperties.isMetadataDataset(did) ? METADATA_DATASETS_PARTITIONS
-                        : numPartitions;
-                DatasetVirtualBufferCaches vbcs = new DatasetVirtualBufferCaches(did, storageProperties,
-                        memoryManager.getNumPages(did), partitions);
-                dsr = new DatasetResource(dsInfo, vbcs);
+                dsr = new DatasetResource(dsInfo);
                 datasets.put(did, dsr);
             }
             return dsr;
@@ -312,24 +277,18 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
         return openIndexesInfo;
     }
 
-    private DatasetVirtualBufferCaches getVirtualBufferCaches(int datasetID) {
-        return getDatasetLifecycle(datasetID).getVirtualBufferCaches();
-    }
-
     @Override
     public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID, int ioDeviceNum) {
-        DatasetVirtualBufferCaches dvbcs = getVirtualBufferCaches(datasetID);
-        return dvbcs.getVirtualBufferCaches(this, ioDeviceNum);
+        return vbcs;
     }
 
     private void removeDatasetFromCache(int datasetID) throws HyracksDataException {
-        deallocateDatasetMemory(datasetID);
         datasets.remove(datasetID);
     }
 
     @Override
     public synchronized PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition, String path) {
-        DatasetResource dataset = datasets.get(datasetId);
+        DatasetResource dataset = getDatasetLifecycle(datasetId);
         PrimaryIndexOperationTracker opTracker = dataset.getOpTracker(partition);
         if (opTracker == null) {
             populateOpTrackerAndIdGenerator(dataset, partition, path);
@@ -513,7 +472,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
         StringBuilder sb = new StringBuilder();
 
         sb.append(String.format("Memory budget = %d%n", storageProperties.getMemoryComponentGlobalBudget()));
-        sb.append(String.format("Memory available = %d%n", memoryManager.getAvailable()));
+        long avaialbleMemory = storageProperties.getMemoryComponentGlobalBudget()
+                - (long) vbc.getUsage() * storageProperties.getMemoryComponentPageSize();
+        sb.append(String.format("Memory available = %d%n", avaialbleMemory));
         sb.append("\n");
 
         String dsHeaderFormat = "%-10s %-6s %-16s %-12s\n";
@@ -540,51 +501,6 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
         outputStream.write(sb.toString().getBytes());
     }
 
-    private synchronized void deallocateDatasetMemory(int datasetId) throws HyracksDataException {
-        DatasetResource dsr = datasets.get(datasetId);
-        if (dsr == null) {
-            throw new HyracksDataException(
-                    "Failed to allocate memory for dataset with ID " + datasetId + " since it is not open.");
-        }
-        DatasetInfo dsInfo = dsr.getDatasetInfo();
-        if (dsInfo == null) {
-            throw new HyracksDataException(
-                    "Failed to deallocate memory for dataset with ID " + datasetId + " since it is not open.");
-        }
-        synchronized (dsInfo) {
-            if (dsInfo.isOpen() && dsInfo.isMemoryAllocated()) {
-                memoryManager.deallocate(datasetId);
-                dsInfo.setMemoryAllocated(false);
-            }
-        }
-    }
-
-    @Override
-    public synchronized void allocateMemory(String resourcePath) throws HyracksDataException {
-        //a resource name in the case of DatasetLifecycleManager is a dataset id which is passed to the ResourceHeapBufferAllocator.
-        int datasetId = Integer.parseInt(resourcePath);
-        DatasetResource dsr = datasets.get(datasetId);
-        if (dsr == null) {
-            throw new HyracksDataException(
-                    "Failed to allocate memory for dataset with ID " + datasetId + " since it is not open.");
-        }
-        DatasetInfo dsInfo = dsr.getDatasetInfo();
-        synchronized (dsInfo) {
-            // This is not needed for external datasets' indexes since they never use the virtual buffer cache.
-            if (!dsInfo.isMemoryAllocated() && !dsInfo.isExternal()) {
-                while (!memoryManager.allocate(datasetId)) {
-                    if (!evictCandidateDataset()) {
-                        LOGGER.warn("failed to allocate memory for dataset {}. Currently allocated {}",
-                                dsInfo::getDatasetID, ((DatasetMemoryManager) memoryManager)::getState);
-                        throw new HyracksDataException("Cannot allocate dataset " + dsInfo.getDatasetID()
-                                + " memory since memory budget would be exceeded.");
-                    }
-                }
-                dsInfo.setMemoryAllocated(true);
-            }
-        }
-    }
-
     @Override
     public void flushDataset(IReplicationStrategy replicationStrategy) throws HyracksDataException {
         for (DatasetResource dsr : datasets.values()) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetMemoryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetMemoryManager.java
deleted file mode 100644
index ded6fb9..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetMemoryManager.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.context;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.asterix.common.api.IDatasetMemoryManager;
-import org.apache.asterix.common.config.StorageProperties;
-import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
-import org.apache.hyracks.util.JSONUtil;
-import org.apache.hyracks.util.annotations.ThreadSafe;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-@ThreadSafe
-public class DatasetMemoryManager implements IDatasetMemoryManager {
-
-    private static final Logger LOGGER = LogManager.getLogger();
-    private final Map<Integer, Long> allocatedMap = new HashMap<>();
-    private final Map<Integer, Long> reservedMap = new HashMap<>();
-    private long available;
-    private final StorageProperties storageProperties;
-
-    public DatasetMemoryManager(StorageProperties storageProperties) {
-        this.storageProperties = storageProperties;
-        available = storageProperties.getMemoryComponentGlobalBudget();
-    }
-
-    @Override
-    public synchronized boolean allocate(int datasetId) {
-        if (allocatedMap.containsKey(datasetId)) {
-            throw new IllegalStateException("Memory is already allocated for dataset: " + datasetId);
-        }
-        if (reservedMap.containsKey(datasetId)) {
-            allocateReserved(datasetId);
-            return true;
-        }
-        final long required = getTotalSize(datasetId);
-        if (!isAllocatable(required)) {
-            return false;
-        }
-        allocatedMap.put(datasetId, required);
-        available -= required;
-        LOGGER.info(() -> "Allocated(" + required + ") for dataset(" + datasetId + ")");
-        return true;
-    }
-
-    @Override
-    public synchronized void deallocate(int datasetId) {
-        if (!allocatedMap.containsKey(datasetId) && !reservedMap.containsKey(datasetId)) {
-            throw new IllegalStateException("No allocated or reserved memory for dataset: " + datasetId);
-        }
-        final Long allocated = allocatedMap.remove(datasetId);
-        // return the allocated budget if it is not reserved
-        if (allocated != null && !reservedMap.containsKey(datasetId)) {
-            available += allocated;
-            LOGGER.info(() -> "Deallocated(" + allocated + ") from dataset(" + datasetId + ")");
-        }
-    }
-
-    @Override
-    public synchronized boolean reserve(int datasetId) {
-        if (reservedMap.containsKey(datasetId)) {
-            LOGGER.info("Memory is already reserved for dataset: {}", () -> datasetId);
-            return true;
-        }
-        final long required = getTotalSize(datasetId);
-        if (!isAllocatable(required) && !allocatedMap.containsKey(datasetId)) {
-            return false;
-        }
-        reservedMap.put(datasetId, required);
-        // if the budget is already allocated, no need to reserve it again
-        if (!allocatedMap.containsKey(datasetId)) {
-            available -= required;
-        }
-        LOGGER.info(() -> "Reserved(" + required + ") for dataset(" + datasetId + ")");
-        return true;
-    }
-
-    @Override
-    public synchronized void cancelReserved(int datasetId) {
-        final Long reserved = reservedMap.remove(datasetId);
-        if (reserved == null) {
-            throw new IllegalStateException("No reserved memory for dataset: " + datasetId);
-        }
-        available += reserved;
-        LOGGER.info(() -> "Cancelled reserved(" + reserved + ") from dataset(" + datasetId + ")");
-    }
-
-    @Override
-    public long getAvailable() {
-        return available;
-    }
-
-    @Override
-    public int getNumPages(int datasetId) {
-        return MetadataIndexImmutableProperties.isMetadataDataset(datasetId)
-                ? storageProperties.getMetadataMemoryComponentNumPages()
-                : storageProperties.getMemoryComponentNumPages();
-    }
-
-    public JsonNode getState() {
-        final ObjectNode state = JSONUtil.createObject();
-        state.put("availableBudget", available);
-        state.set("allocated", budgetMapToJsonArray(allocatedMap));
-        state.set("reserved", budgetMapToJsonArray(reservedMap));
-        return state;
-    }
-
-    private long getTotalSize(int datasetId) {
-        return storageProperties.getMemoryComponentPageSize() * (long) getNumPages(datasetId);
-    }
-
-    private boolean isAllocatable(long required) {
-        return available - required >= 0;
-    }
-
-    private void allocateReserved(int datasetId) {
-        final Long reserved = reservedMap.get(datasetId);
-        allocatedMap.put(datasetId, reserved);
-    }
-
-    private static ArrayNode budgetMapToJsonArray(Map<Integer, Long> memorytMap) {
-        final ArrayNode array = JSONUtil.createArray();
-        memorytMap.forEach((k, v) -> {
-            final ObjectNode dataset = JSONUtil.createObject();
-            dataset.put("datasetId", k);
-            dataset.put("budget", v);
-            array.add(dataset);
-        });
-        return array;
-    }
-}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index 8dcae23..8844d41 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -43,14 +43,12 @@ import org.apache.hyracks.storage.common.LocalResource;
  */
 public class DatasetResource implements Comparable<DatasetResource> {
     private final DatasetInfo datasetInfo;
-    private final DatasetVirtualBufferCaches datasetVirtualBufferCaches;
 
     private final Map<Integer, PrimaryIndexOperationTracker> datasetPrimaryOpTrackers;
     private final Map<Integer, ILSMComponentIdGenerator> datasetComponentIdGenerators;
 
-    public DatasetResource(DatasetInfo datasetInfo, DatasetVirtualBufferCaches datasetVirtualBufferCaches) {
+    public DatasetResource(DatasetInfo datasetInfo) {
         this.datasetInfo = datasetInfo;
-        this.datasetVirtualBufferCaches = datasetVirtualBufferCaches;
         this.datasetPrimaryOpTrackers = new HashMap<>();
         this.datasetComponentIdGenerators = new HashMap<>();
     }
@@ -83,10 +81,6 @@ public class DatasetResource implements Comparable<DatasetResource> {
         datasetInfo.untouch();
     }
 
-    public DatasetVirtualBufferCaches getVirtualBufferCaches() {
-        return datasetVirtualBufferCaches;
-    }
-
     public ILSMIndex getIndex(long resourceID) {
         IndexInfo iInfo = getIndexInfo(resourceID);
         return (iInfo == null) ? null : iInfo.getIndex();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetVirtualBufferCaches.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetVirtualBufferCaches.java
deleted file mode 100644
index c9b9698..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetVirtualBufferCaches.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.context;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.common.config.StorageProperties;
-import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
-import org.apache.hyracks.storage.am.lsm.common.impls.MultitenantVirtualBufferCache;
-import org.apache.hyracks.storage.am.lsm.common.impls.VirtualBufferCache;
-import org.apache.hyracks.storage.common.IResourceMemoryManager;
-import org.apache.hyracks.storage.common.buffercache.ResourceHeapBufferAllocator;
-
-public class DatasetVirtualBufferCaches {
-    private final int datasetID;
-    private final StorageProperties storageProperties;
-    private final int numPartitions;
-    private final int numPages;
-    private final Map<Integer, List<IVirtualBufferCache>> ioDeviceVirtualBufferCaches = new HashMap<>();
-
-    public DatasetVirtualBufferCaches(int datasetID, StorageProperties storageProperties, int numPages,
-            int numPartitions) {
-        this.datasetID = datasetID;
-        this.storageProperties = storageProperties;
-        this.numPartitions = numPartitions;
-        this.numPages = numPages;
-    }
-
-    public List<IVirtualBufferCache> getVirtualBufferCaches(IResourceMemoryManager memoryManager, int ioDeviceNum) {
-        synchronized (ioDeviceVirtualBufferCaches) {
-            List<IVirtualBufferCache> vbcs = ioDeviceVirtualBufferCaches.get(ioDeviceNum);
-            if (vbcs == null) {
-                vbcs = initializeVirtualBufferCaches(memoryManager, ioDeviceNum, numPages);
-            }
-            return vbcs;
-        }
-    }
-
-    private List<IVirtualBufferCache> initializeVirtualBufferCaches(IResourceMemoryManager memoryManager,
-            int ioDeviceNum, int numPages) {
-        List<IVirtualBufferCache> vbcs = new ArrayList<>();
-        for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) {
-            MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(
-                    new VirtualBufferCache(new ResourceHeapBufferAllocator(memoryManager, Integer.toString(datasetID)),
-                            storageProperties.getMemoryComponentPageSize(),
-                            numPages / storageProperties.getMemoryComponentsNum() / numPartitions));
-            vbcs.add(vbc);
-        }
-        ioDeviceVirtualBufferCaches.put(ioDeviceNum, vbcs);
-        return vbcs;
-    }
-}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
new file mode 100644
index 0000000..6e97d64
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.context;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.asterix.common.config.StorageProperties;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
+import org.apache.hyracks.api.replication.IIOReplicationManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import org.apache.hyracks.storage.am.lsm.common.impls.VirtualBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.IExtraPageBlockHelper;
+import org.apache.hyracks.storage.common.buffercache.IFIFOPageWriter;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
+import org.apache.hyracks.storage.common.buffercache.VirtualPage;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.storage.common.file.IFileMapManager;
+import org.apache.hyracks.util.ExitUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+
+public class GlobalVirtualBufferCache implements IVirtualBufferCache, ILifeCycleComponent {
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    // keep track of the memory usage of each filtered memory component
+    private final Map<ILSMMemoryComponent, AtomicInteger> memoryComponentUsageMap =
+            Collections.synchronizedMap(new HashMap<>());
+    private final Map<FileReference, AtomicInteger> fileRefUsageMap = Collections.synchronizedMap(new HashMap<>());
+    private final Int2ObjectMap<AtomicInteger> fileIdUsageMap =
+            Int2ObjectMaps.synchronize(new Int2ObjectOpenHashMap<>());
+
+    private final List<ILSMIndex> primaryIndexes = new ArrayList<>();
+    private volatile int flushPtr;
+    private volatile ILSMIndex flushingIndex;
+
+    private final int filteredMemoryComponentMaxNumPages;
+    private final int flushPageBudget;
+    private final VirtualBufferCache vbc;
+    private final AtomicBoolean isOpen = new AtomicBoolean(false);
+    private final FlushThread flushThread = new FlushThread();
+
+    /**
+     * Creates the global virtual buffer cache on top of a single wrapped {@link VirtualBufferCache}.
+     * <p>
+     * The wrapped cache's page budget is the global memory component budget divided by the memory component
+     * page size. The flush budget is that same page count multiplied by the configured flush threshold and
+     * truncated to an int; the cache usage is compared against it to decide when a flush should be scheduled.
+     *
+     * @param allocator
+     *            the allocator handed to the wrapped {@link VirtualBufferCache}
+     * @param storageProperties
+     *            source of the page sizes, global budget, flush threshold and filtered component page limit
+     */
+    public GlobalVirtualBufferCache(ICacheMemoryAllocator allocator, StorageProperties storageProperties) {
+        // NOTE(review): the page size passed here is getBufferCachePageSize(), while both page counts below are
+        // derived from getMemoryComponentPageSize() -- confirm these two settings are expected to agree.
+        this.vbc = new VirtualBufferCache(allocator, storageProperties.getBufferCachePageSize(),
+                (int) (storageProperties.getMemoryComponentGlobalBudget()
+                        / storageProperties.getMemoryComponentPageSize()));
+        this.flushPageBudget = (int) (storageProperties.getMemoryComponentGlobalBudget()
+                / storageProperties.getMemoryComponentPageSize()
+                * storageProperties.getMemoryComponentFlushThreshold());
+        this.filteredMemoryComponentMaxNumPages = storageProperties.getFilteredMemoryComponentMaxNumPages();
+    }
+
+    @Override
+    public int getPageSize() {
+        return vbc.getPageSize();
+    }
+
+    @Override
+    public int getPageSizeWithHeader() {
+        return vbc.getPageSizeWithHeader();
+    }
+
+    /**
+     * Registers a memory component with this cache.
+     * <p>
+     * Components of non-primary indexes are ignored. A primary index is added to the round-robin flush list
+     * at most once. For a primary index with filter fields, a fresh page counter is created and shared by
+     * the component and each of its non-null file references.
+     */
+    @Override
+    public synchronized void register(ILSMMemoryComponent memoryComponent) {
+        final ILSMIndex lsmIndex = memoryComponent.getLsmIndex();
+        if (!lsmIndex.isPrimaryIndex()) {
+            return;
+        }
+        // an index owns several memory components; keep a single entry per index
+        if (!primaryIndexes.contains(lsmIndex)) {
+            primaryIndexes.add(lsmIndex);
+        }
+        if (lsmIndex.getNumOfFilterFields() <= 0) {
+            return;
+        }
+        // filtered primary index: one counter shared between the component and its files
+        final AtomicInteger pageCounter = new AtomicInteger();
+        memoryComponentUsageMap.put(memoryComponent, pageCounter);
+        for (FileReference fileRef : memoryComponent.getComponentFileRefs().getFileReferences()) {
+            if (fileRef == null) {
+                continue;
+            }
+            fileRefUsageMap.put(fileRef, pageCounter);
+        }
+    }
+
+    /**
+     * Unregisters a memory component from this cache.
+     * <p>
+     * Components of non-primary indexes are ignored. The owning primary index is removed from the round-robin
+     * flush list (if still present) and {@code flushPtr} is adjusted so that it always stays a valid position
+     * in the list. For a primary index with filter fields, the usage counters of the component and of its
+     * non-null file references are dropped.
+     */
+    @Override
+    public synchronized void unregister(ILSMMemoryComponent memoryComponent) {
+        ILSMIndex index = memoryComponent.getLsmIndex();
+        if (index.isPrimaryIndex()) {
+            int pos = primaryIndexes.indexOf(index);
+            if (pos >= 0) {
+                primaryIndexes.remove(pos);
+                if (primaryIndexes.isEmpty()) {
+                    // nothing left to flush; also avoids a modulo by zero below
+                    flushPtr = 0;
+                } else if (flushPtr > pos) {
+                    // If the removed index is before flushPtr, we should decrement flushPtr by 1 so that
+                    // it still points to the same index.
+                    flushPtr = (flushPtr - 1) % primaryIndexes.size();
+                } else if (flushPtr >= primaryIndexes.size()) {
+                    // The removed index was the last entry and flushPtr pointed at it. Wrap around to the
+                    // first index, otherwise flushPtr would be out of bounds for the next flush.
+                    flushPtr = 0;
+                }
+            }
+            if (index.getNumOfFilterFields() > 0) {
+                memoryComponentUsageMap.remove(memoryComponent);
+                for (FileReference ref : memoryComponent.getComponentFileRefs().getFileReferences()) {
+                    if (ref != null) {
+                        fileRefUsageMap.remove(ref);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Handles the completion of a flush of the given memory component.
+     * <p>
+     * If the component belongs to the index currently recorded in {@code flushingIndex}, that marker is
+     * cleared, the operation tracker of every registered primary index is notified, and
+     * {@code checkAndNotifyFlushThread()} is invoked to see whether another flush is needed. Independently,
+     * for a primary index with filter fields, the tracked usage counter (if any) is reset to 0.
+     */
+    @Override
+    public void flushed(ILSMMemoryComponent memoryComponent) throws HyracksDataException {
+        if (memoryComponent.getLsmIndex() == flushingIndex) {
+            synchronized (this) {
+                // re-checked after acquiring the monitor because the first test is made without holding it
+                if (memoryComponent.getLsmIndex() == flushingIndex) {
+                    flushingIndex = null;
+                    // After the flush operation is completed, we may have 2 cases:
+                    // 1. there is no active reader on this memory component and memory is reclaimed;
+                    // 2. there are still some active readers and memory cannot be reclaimed.
+                    // But for both cases, we will notify all primary index op trackers to let their writers retry,
+                    // if they have been blocked. Moreover, we will check whether more flushes are needed.
+                    final int size = primaryIndexes.size();
+                    for (int i = 0; i < size; i++) {
+                        ILSMOperationTracker opTracker = primaryIndexes.get(i).getOperationTracker();
+                        synchronized (opTracker) {
+                            opTracker.notifyAll();
+                        }
+                    }
+                }
+            }
+            checkAndNotifyFlushThread();
+        }
+        if (memoryComponent.getLsmIndex().getNumOfFilterFields() > 0
+                && memoryComponent.getLsmIndex().isPrimaryIndex()) {
+            AtomicInteger usage = memoryComponentUsageMap.get(memoryComponent);
+            if (usage != null) {
+                // reset usage to 0 after the memory component is flushed
+                usage.set(0);
+            }
+        }
+    }
+
+    @Override
+    public int getPageBudget() {
+        return vbc.getPageBudget();
+    }
+
+    @Override
+    public boolean isFull() {
+        return vbc.isFull();
+    }
+
+    @Override
+    public boolean isFull(ILSMMemoryComponent memoryComponent) {
+        return memoryComponent.getLsmIndex() == flushingIndex || isFilteredMemoryComponentFull(memoryComponent);
+    }
+
+    /**
+     * Tells whether a filtered primary memory component has reached the configured page limit.
+     *
+     * @param memoryComponent
+     *            the memory component to check
+     * @return {@code false} when the limit is disabled ({@code <= 0}), when the component's index has no filter
+     *         fields or is not a primary index, or when no usage is tracked for the component; otherwise
+     *         whether the tracked page count has reached {@code filteredMemoryComponentMaxNumPages}
+     */
+    private boolean isFilteredMemoryComponentFull(ILSMMemoryComponent memoryComponent) {
+        if (filteredMemoryComponentMaxNumPages <= 0 || memoryComponent.getLsmIndex().getNumOfFilterFields() == 0
+                || !memoryComponent.getLsmIndex().isPrimaryIndex()) {
+            return false;
+        }
+        AtomicInteger usage = memoryComponentUsageMap.get(memoryComponent);
+        // The component may not be registered (or may already be unregistered); with no tracked usage it
+        // cannot be full. This mirrors the null check done when the usage is reset after a flush.
+        return usage != null && usage.get() >= filteredMemoryComponentMaxNumPages;
+    }
+
+    @Override
+    public int createFile(FileReference fileRef) throws HyracksDataException {
+        int fileId = vbc.createFile(fileRef);
+        updateFileIdUsageMap(fileRef, fileId);
+        return fileId;
+    }
+
+    @Override
+    public int openFile(FileReference fileRef) throws HyracksDataException {
+        int fileId = vbc.openFile(fileRef);
+        updateFileIdUsageMap(fileRef, fileId);
+        return fileId;
+    }
+
+    private void updateFileIdUsageMap(FileReference fileRef, int fileId) {
+        AtomicInteger usage = fileRefUsageMap.get(fileRef);
+        if (usage != null) {
+            fileIdUsageMap.put(fileId, usage);
+        }
+    }
+
+    @Override
+    public void openFile(int fileId) throws HyracksDataException {
+        vbc.openFile(fileId);
+    }
+
+    @Override
+    public void closeFile(int fileId) throws HyracksDataException {
+        vbc.closeFile(fileId);
+    }
+
+    @Override
+    public void deleteFile(FileReference fileRef) throws HyracksDataException {
+        vbc.deleteFile(fileRef);
+    }
+
+    @Override
+    public void deleteFile(int fileId) throws HyracksDataException {
+        vbc.deleteFile(fileId);
+    }
+
+    @Override
+    public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
+        ICachedPage page = vbc.pin(dpid, newPage);
+        if (newPage) {
+            incrementFilteredMemoryComponentUsage(dpid, 1);
+            checkAndNotifyFlushThread();
+        }
+        return page;
+    }
+
+    /**
+     * Adds {@code pages} to the usage counter of the filtered memory component that owns the file of
+     * {@code dpid}. Does nothing when the filtered component limit is disabled or when no counter is
+     * associated with that file id.
+     *
+     * @param dpid
+     *            the page id whose file id selects the counter
+     * @param pages
+     *            the number of pages to add to the counter
+     */
+    private void incrementFilteredMemoryComponentUsage(long dpid, int pages) {
+        if (filteredMemoryComponentMaxNumPages <= 0) {
+            return;
+        }
+        final AtomicInteger counter = fileIdUsageMap.get(BufferedFileHandle.getFileId(dpid));
+        if (counter == null) {
+            // the file does not belong to a tracked filtered index
+            return;
+        }
+        // No explicit flush is triggered from here: this path only runs while a writer is active on the
+        // memory component, and that writer flushes the component on exit once it sees that it is full.
+        counter.addAndGet(pages);
+    }
+
+    private void checkAndNotifyFlushThread() {
+        if (vbc.getUsage() < flushPageBudget || flushingIndex != null) {
+            // For better performance, we only flush one dataset partition at a time.
+            // After reclaiming memory from this dataset partition, its memory can be used by other indexes.
+            // Thus, given N dataset partitions, each dataset partition will approximately receive 2/N of
+            // the total memory instead of 1/N, which doubles the memory utilization.
+            return;
+        }
+        // Notify the flush thread to schedule flushes. This is used to avoid deadlocks because page pins can be
+        // called while synchronizing on op trackers.
+        synchronized (flushThread.flushLock) {
+            flushThread.flushLock.notifyAll();
+        }
+    }
+
+    /**
+     * Resizes a page through the wrapped cache and accounts for the change in the number of frames.
+     * <p>
+     * The difference between the requested multiplier and the page's previous multiplier is added to the
+     * usage of the owning filtered memory component (if tracked); when the page grew, the flush thread is
+     * notified if the flush budget is exceeded.
+     *
+     * @param cPage
+     *            the page to resize
+     * @param multiplier
+     *            the new frame size multiplier
+     * @param extraPageBlockHelper
+     *            passed through to the wrapped cache
+     * @throws HyracksDataException
+     *             if the wrapped cache fails to resize the page
+     */
+    @Override
+    public void resizePage(ICachedPage cPage, int multiplier, IExtraPageBlockHelper extraPageBlockHelper)
+            throws HyracksDataException {
+        // Read the current multiplier before delegating: if the wrapped cache updates the page's multiplier
+        // as part of the resize, reading it afterwards would always yield a delta of 0.
+        final int oldMultiplier = cPage.getFrameSizeMultiplier();
+        vbc.resizePage(cPage, multiplier, extraPageBlockHelper);
+        int delta = multiplier - oldMultiplier;
+        incrementFilteredMemoryComponentUsage(((VirtualPage) cPage).dpid(), delta);
+        if (delta > 0) {
+            checkAndNotifyFlushThread();
+        }
+    }
+
+    @Override
+    public void unpin(ICachedPage page) throws HyracksDataException {
+        vbc.unpin(page);
+    }
+
+    @Override
+    public void flush(ICachedPage page) throws HyracksDataException {
+        vbc.flush(page);
+    }
+
+    @Override
+    public void force(int fileId, boolean metadata) throws HyracksDataException {
+        vbc.force(fileId, metadata);
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        // no op
+    }
+
+    @Override
+    public void start() {
+        if (isOpen.compareAndSet(false, true)) {
+            try {
+                vbc.open();
+            } catch (HyracksDataException e) {
+                throw new IllegalStateException("Fail to open virtual buffer cache ", e);
+            }
+            flushThread.start();
+        }
+    }
+
+    /**
+     * Stops this component: optionally dumps its state, closes the wrapped cache and terminates the flush
+     * thread. The call is a no-op unless the cache is currently open.
+     *
+     * @param dumpState
+     *            whether to write the state to {@code ouputStream} before closing
+     * @param ouputStream
+     *            the stream the state is written to when {@code dumpState} is set
+     * @throws IOException
+     *             if dumping the state or closing the wrapped cache fails; an interruption while waiting for
+     *             the flush thread is reported as a {@link HyracksDataException}
+     */
+    @Override
+    public void stop(boolean dumpState, OutputStream ouputStream) throws IOException {
+        if (isOpen.compareAndSet(true, false)) {
+            try {
+                if (dumpState) {
+                    dumpState(ouputStream);
+                }
+                vbc.close();
+            } finally {
+                // isOpen is already false at this point, so the flush thread must always be woken up and
+                // joined, even when dumping or closing failed; otherwise it would stay blocked forever.
+                synchronized (flushThread.flushLock) {
+                    flushThread.flushLock.notifyAll();
+                }
+                try {
+                    flushThread.join();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw HyracksDataException.create(e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void dumpState(OutputStream os) throws IOException {
+        os.write(vbc.toString().getBytes());
+    }
+
+    @Override
+    public IFileMapManager getFileMapProvider() {
+        return vbc.getFileMapProvider();
+    }
+
+    @Override
+    public int getNumPagesOfFile(int fileId) throws HyracksDataException {
+        return vbc.getNumPagesOfFile(fileId);
+    }
+
+    @Override
+    public void returnPage(ICachedPage page) {
+        vbc.returnPage(page);
+    }
+
+    @Override
+    public IFIFOPageWriter createFIFOWriter(IPageWriteCallback callback, IPageWriteFailureCallback failureCallback) {
+        return vbc.createFIFOWriter(callback, failureCallback);
+    }
+
+    @Override
+    public ICachedPage confiscatePage(long dpid) throws HyracksDataException {
+        return vbc.confiscatePage(dpid);
+    }
+
+    @Override
+    public ICachedPage confiscateLargePage(long dpid, int multiplier, int extraBlockPageId)
+            throws HyracksDataException {
+        return vbc.confiscateLargePage(dpid, multiplier, extraBlockPageId);
+    }
+
+    @Override
+    public void returnPage(ICachedPage page, boolean reinsert) {
+        vbc.returnPage(page, reinsert);
+    }
+
+    @Override
+    public int getFileReferenceCount(int fileId) {
+        return vbc.getFileReferenceCount(fileId);
+    }
+
+    @Override
+    public boolean isReplicationEnabled() {
+        return vbc.isReplicationEnabled();
+    }
+
+    @Override
+    public IIOReplicationManager getIOReplicationManager() {
+        return vbc.getIOReplicationManager();
+    }
+
+    @Override
+    public void purgeHandle(int fileId) throws HyracksDataException {
+        vbc.purgeHandle(fileId);
+    }
+
+    @Override
+    public String toString() {
+        return vbc.toString();
+    }
+
+    @Override
+    public void closeFileIfOpen(FileReference fileRef) {
+        vbc.closeFileIfOpen(fileRef);
+    }
+
+    @Override
+    public int getUsage() {
+        return vbc.getUsage();
+    }
+
+    /**
+     * We use a dedicated thread to schedule flushes to avoid deadlock. We cannot schedule flushes directly during
+     * page pins because page pins can be called while synchronized on op trackers (e.g., when resetting a
+     * memory component).
+     * <p>
+     * The thread sleeps on {@code flushLock} until it is notified, then tries to schedule one flush, and
+     * repeats until the enclosing cache is no longer open.
+     */
+    private class FlushThread extends Thread {
+        private final Object flushLock = new Object();
+
+        @Override
+        public void run() {
+            while (isOpen.get()) {
+                synchronized (flushLock) {
+                    // Re-check while holding flushLock: stop() clears isOpen before it acquires flushLock to
+                    // notify. Without this check, a shutdown signalled between the loop test above and the
+                    // wait below would be missed and stop() would block forever joining this thread.
+                    if (!isOpen.get()) {
+                        break;
+                    }
+                    try {
+                        flushLock.wait();
+                    } catch (InterruptedException e) {
+                        // Deliberately keep running; the interrupt flag is not restored here because the
+                        // next wait() would then fail immediately and the loop would spin.
+                        LOGGER.error("Flushing thread is interrupted unexpectedly.", e);
+                    }
+                }
+                if (isOpen.get()) {
+                    try {
+                        scheduleFlush();
+                    } catch (Throwable e) {
+                        LOGGER.error("Unexpected exception when trying to schedule flushes.", e);
+                        ExitUtil.halt(ExitUtil.EC_FLUSH_FAILED);
+                    }
+                }
+            }
+        }
+
+        /**
+         * Schedules a flush of the next modified primary index in round-robin order, provided that the
+         * memory usage has reached the flush budget and no other scheduled flush is still in progress.
+         *
+         * @throws HyracksDataException
+         *             if the operation tracker fails to schedule the flush
+         * @throws IllegalStateException
+         *             if primary indexes are registered but none has a modified current memory component
+         */
+        private void scheduleFlush() throws HyracksDataException {
+            synchronized (GlobalVirtualBufferCache.this) {
+                if (vbc.getUsage() < flushPageBudget || flushingIndex != null) {
+                    return;
+                }
+                final int numIndexes = primaryIndexes.size();
+                if (numIndexes == 0) {
+                    // nothing is registered, hence nothing can be flushed; avoids get() on an empty list
+                    // and a modulo by zero, both of which would otherwise halt the process
+                    return;
+                }
+                if (flushPtr >= numIndexes) {
+                    // defensive: never index past the end of the list with a stale pointer
+                    flushPtr = 0;
+                }
+                int cycles = 0;
+                // find the first modified memory component while avoiding infinite loops
+                while (cycles <= numIndexes
+                        && !primaryIndexes.get(flushPtr).getCurrentMemoryComponent().isModified()) {
+                    flushPtr = (flushPtr + 1) % numIndexes;
+                    cycles++;
+                }
+                if (primaryIndexes.get(flushPtr).getCurrentMemoryComponent().isModified()) {
+                    // flush the current memory component
+                    flushingIndex = primaryIndexes.get(flushPtr);
+                    flushPtr = (flushPtr + 1) % numIndexes;
+                    // we need to manually flush this memory component because it may be idle at this point
+                    // note that this is different from flushing a filtered memory component
+                    PrimaryIndexOperationTracker opTracker =
+                            (PrimaryIndexOperationTracker) flushingIndex.getOperationTracker();
+                    synchronized (opTracker) {
+                        opTracker.setFlushOnExit(true);
+                        opTracker.flushIfNeeded();
+                        // If the flush cannot be scheduled at this time, then there must be active writers.
+                        // The flush will be eventually scheduled when writers exit
+                    }
+                } else {
+                    throw new IllegalStateException(
+                            "Cannot find modified memory component after checking all primary indexes");
+                }
+            }
+        }
+    }
+
+}
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/DatasetMemoryManagerTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/DatasetMemoryManagerTest.java
deleted file mode 100644
index 2a26f08..0000000
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/DatasetMemoryManagerTest.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.test.context;
-
-import org.apache.asterix.common.config.StorageProperties;
-import org.apache.asterix.common.context.DatasetMemoryManager;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-public class DatasetMemoryManagerTest {
-
-    private static final StorageProperties storageProperties;
-    private static final long GLOBAL_BUDGET = 1000L;
-    private static final long METADATA_DATASET_BUDGET = 200L;
-    private static final long DATASET_BUDGET = 400L;
-
-    static {
-        storageProperties = Mockito.mock(StorageProperties.class);
-        Mockito.when(storageProperties.getMemoryComponentGlobalBudget()).thenReturn(GLOBAL_BUDGET);
-        Mockito.when(storageProperties.getMemoryComponentNumPages()).thenReturn(8);
-        Mockito.when(storageProperties.getMetadataMemoryComponentNumPages()).thenReturn(4);
-        Mockito.when(storageProperties.getMemoryComponentPageSize()).thenReturn(50);
-        Mockito.when(storageProperties.getMemoryComponentsNum()).thenReturn(2);
-    }
-
-    @Test
-    public void allocate() {
-        DatasetMemoryManager memoryManager = new DatasetMemoryManager(storageProperties);
-        // double allocate
-        Assert.assertTrue(memoryManager.allocate(1));
-        boolean thrown = false;
-        try {
-            memoryManager.allocate(1);
-        } catch (IllegalStateException e) {
-            Assert.assertTrue(e.getMessage().contains("already allocated"));
-            thrown = true;
-        }
-        Assert.assertTrue(thrown);
-
-        // allocate metadata and non-metadata datasets
-        Assert.assertTrue(memoryManager.allocate(400));
-
-        long expectedBudget = GLOBAL_BUDGET - METADATA_DATASET_BUDGET - DATASET_BUDGET;
-        Assert.assertEquals(memoryManager.getAvailable(), expectedBudget);
-
-        // reserve after allocate shouldn't allocate the budget again
-        Assert.assertTrue(memoryManager.allocate(401));
-        Assert.assertTrue(memoryManager.reserve(401));
-
-        // deallocate should still keep the reserved memory
-        memoryManager.deallocate(401);
-        expectedBudget = GLOBAL_BUDGET - METADATA_DATASET_BUDGET - (DATASET_BUDGET * 2);
-        Assert.assertEquals(memoryManager.getAvailable(), expectedBudget);
-
-        // exceed budget should return false
-        Assert.assertFalse(memoryManager.allocate(402));
-    }
-
-    @Test
-    public void reserve() {
-        DatasetMemoryManager memoryManager = new DatasetMemoryManager(storageProperties);
-        // reserve then allocate budget
-        Assert.assertTrue(memoryManager.reserve(1));
-        Assert.assertTrue(memoryManager.allocate(1));
-        long expectedBudget = GLOBAL_BUDGET - METADATA_DATASET_BUDGET;
-        Assert.assertEquals(memoryManager.getAvailable(), expectedBudget);
-
-        // double reserve
-        Assert.assertTrue(memoryManager.reserve(2));
-
-        // cancel reserved
-        memoryManager.cancelReserved(2);
-        Assert.assertEquals(memoryManager.getAvailable(), expectedBudget);
-    }
-
-    @Test
-    public void deallocate() {
-        DatasetMemoryManager memoryManager = new DatasetMemoryManager(storageProperties);
-        // deallocate reserved
-        Assert.assertTrue(memoryManager.reserve(200));
-        Assert.assertTrue(memoryManager.allocate(200));
-        memoryManager.deallocate(200);
-        long expectedBudget = GLOBAL_BUDGET - DATASET_BUDGET;
-        Assert.assertEquals(memoryManager.getAvailable(), expectedBudget);
-
-        // deallocate not allocated
-        boolean thrown = false;
-        try {
-            memoryManager.deallocate(1);
-        } catch (IllegalStateException e) {
-            Assert.assertTrue(e.getMessage().contains("No allocated"));
-            thrown = true;
-        }
-        Assert.assertTrue(thrown);
-
-        // double deallocate
-        memoryManager.allocate(2);
-        memoryManager.deallocate(2);
-        thrown = false;
-        try {
-            memoryManager.deallocate(2);
-        } catch (IllegalStateException e) {
-            Assert.assertTrue(e.getMessage().contains("No allocated"));
-            thrown = true;
-        }
-        Assert.assertTrue(thrown);
-    }
-}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 69fc396..c9de359 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -325,10 +325,6 @@ public class MetadataBootstrap {
     public static void enlistMetadataDataset(INCServiceContext ncServiceCtx, IMetadataIndex index)
             throws HyracksDataException {
         final int datasetId = index.getDatasetId().getId();
-        // reserve memory for metadata dataset to ensure it can be opened when needed
-        if (!appContext.getDatasetMemoryManager().reserve(index.getDatasetId().getId())) {
-            throw new IllegalStateException("Failed to reserve memory for metadata dataset (" + datasetId + ")");
-        }
         String metadataPartitionPath =
                 StoragePathUtil.prepareStoragePartitionPath(MetadataNode.INSTANCE.getMetadataStoragePartition());
         String resourceName = metadataPartitionPath + File.separator + index.getFileNameRelativePath();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
index fe64e9f..c9505a6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
@@ -21,11 +21,9 @@ package org.apache.hyracks.storage.am.common.dataflow;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -50,42 +48,17 @@ public class IndexLifecycleManager implements IResourceLifecycleManager<IIndex>,
         this.memoryUsed = 0;
     }
 
-    private boolean evictCandidateIndex() throws HyracksDataException {
-        // Why min()? As a heuristic for eviction, we will take an open index
-        // (an index consuming memory)
-        // that is not being used (refcount == 0) and has been least recently
-        // used. The sort order defined
-        // for IndexInfo maintains this. See IndexInfo.compareTo().
-        IndexInfo info = Collections.min(indexInfos.values());
-        if (info.referenceCount != 0 || !info.isOpen) {
-            return false;
-        }
-
-        info.index.deactivate();
-        //find resource name and deallocate its memory
-        for (Entry<String, IndexInfo> entry : indexInfos.entrySet()) {
-            if (entry.getValue() == info) {
-                deallocateMemory(entry.getKey());
-                break;
-            }
-        }
-        info.isOpen = false;
-        return true;
-    }
-
     private class IndexInfo implements Comparable<IndexInfo> {
         private final IIndex index;
         private int referenceCount;
         private long lastAccess;
         private boolean isOpen;
-        private boolean memoryAllocated;
 
         public IndexInfo(IIndex index) {
             this.index = index;
             this.lastAccess = -1;
             this.referenceCount = 0;
             this.isOpen = false;
-            this.memoryAllocated = false;
         }
 
         public void touch() {
@@ -202,7 +175,6 @@ public class IndexLifecycleManager implements IResourceLifecycleManager<IIndex>,
         }
 
         if (!info.isOpen) {
-            allocateMemory(resourcePath);
             info.index.activate();
             info.isOpen = true;
         }
@@ -234,40 +206,7 @@ public class IndexLifecycleManager implements IResourceLifecycleManager<IIndex>,
 
         if (info.isOpen) {
             info.index.deactivate();
-            deallocateMemory(resourcePath);
         }
         indexInfos.remove(resourcePath);
     }
-
-    @Override
-    public void allocateMemory(String resourcePath) throws HyracksDataException {
-        IndexInfo info = indexInfos.get(resourcePath);
-        if (info == null) {
-            throw new HyracksDataException("Failed to allocate memory for index with resource ID " + resourcePath
-                    + " since it does not exist.");
-        }
-        if (!info.memoryAllocated) {
-            long inMemorySize = info.index.getMemoryAllocationSize();
-            while (memoryUsed + inMemorySize > memoryBudget) {
-                if (!evictCandidateIndex()) {
-                    throw new HyracksDataException(
-                            "Cannot allocate memory for index since memory budget would be exceeded.");
-                }
-            }
-            memoryUsed += inMemorySize;
-            info.memoryAllocated = true;
-        }
-    }
-
-    private void deallocateMemory(String resourcePath) throws HyracksDataException {
-        IndexInfo info = indexInfos.get(resourcePath);
-        if (info == null) {
-            throw new HyracksDataException("Failed to deallocate memory for index with resource name " + resourcePath
-                    + " since it does not exist.");
-        }
-        if (info.isOpen && info.memoryAllocated) {
-            memoryUsed -= info.index.getMemoryAllocationSize();
-            info.memoryAllocated = false;
-        }
-    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
index 3158b79..5e922b1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
@@ -337,11 +337,6 @@ public abstract class AbstractTreeIndex implements ITreeIndex {
 
     }
 
-    @Override
-    public long getMemoryAllocationSize() {
-        return 0;
-    }
-
     public IBinaryComparatorFactory[] getCmpFactories() {
         return cmpFactories;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMemoryComponent.java
index 831562c..a66a890 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMemoryComponent.java
@@ -23,6 +23,7 @@ import org.apache.hyracks.storage.am.btree.impls.BTree;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMMemoryComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
 
 public class LSMBTreeMemoryComponent extends AbstractLSMMemoryComponent {
 
@@ -38,4 +39,9 @@ public class LSMBTreeMemoryComponent extends AbstractLSMMemoryComponent {
     public BTree getIndex() {
         return btree;
     }
+
+    @Override
+    public LSMComponentFileReferences getComponentFileRefs() {
+        return new LSMComponentFileReferences(btree.getFileReference(), null, null);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMemoryComponent.java
index ed2ee70..feed07e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMemoryComponent.java
@@ -24,6 +24,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMWithBuddyMemoryComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
 
 /*
  * This class is also not needed at the moment but is implemented anyway
@@ -56,7 +57,7 @@ public class LSMBTreeWithBuddyMemoryComponent extends AbstractLSMWithBuddyMemory
     }
 
     @Override
-    public long getSize() {
-        return 0L;
+    public LSMComponentFileReferences getComponentFileRefs() {
+        return new LSMComponentFileReferences(btree.getFileReference(), buddyBtree.getFileReference(), null);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
index c86f7b9..064ab64 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.storage.am.lsm.common.api;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
 import org.apache.hyracks.storage.am.lsm.common.impls.MemoryComponentMetadata;
 
 public interface ILSMMemoryComponent extends ILSMComponent {
@@ -48,11 +49,6 @@ public interface ILSMMemoryComponent extends ILSMComponent {
     void reset() throws HyracksDataException;
 
     /**
-     * @return true if the memory budget has been exceeded
-     */
-    boolean isFull();
-
-    /**
      * @return true if there are data in the memory component, false otherwise
      */
     boolean isModified();
@@ -86,11 +82,6 @@ public interface ILSMMemoryComponent extends ILSMComponent {
     void validate() throws HyracksDataException;
 
     /**
-     * @return the size of the memory component
-     */
-    long getSize();
-
-    /**
      * Reset the component Id of the memory component after it's recycled
      *
      * @param newId
@@ -105,4 +96,17 @@ public interface ILSMMemoryComponent extends ILSMComponent {
      * entry to the component
      */
     void setUnwritable();
+
+    /**
+     *
+     * @return the file references of the component
+     */
+    LSMComponentFileReferences getComponentFileRefs();
+
+    /**
+     * Called when the memory component is flushed to disk
+     * @throws HyracksDataException
+     */
+    void flushed() throws HyracksDataException;
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IVirtualBufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IVirtualBufferCache.java
index ff7bfb5..bbe6051 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IVirtualBufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IVirtualBufferCache.java
@@ -25,9 +25,42 @@ import org.apache.hyracks.storage.common.file.IFileMapManager;
 public interface IVirtualBufferCache extends IBufferCache {
     void open() throws HyracksDataException;
 
+    /**
+     *
+     * @return true if the overall memory usage exceeds the budget
+     */
     boolean isFull();
 
-    void reset();
+    /**
+     * @param memoryComponent
+     * @return true if the memory component's memory usage exceeds its budget
+     */
+    boolean isFull(ILSMMemoryComponent memoryComponent);
 
     IFileMapManager getFileMapProvider();
+
+    /**
+     *
+     * @return the number of in-use pages
+     */
+    int getUsage();
+
+    /**
+     * Register the memory component when it is allocated
+     * @param memoryComponent
+    */
+    void register(ILSMMemoryComponent memoryComponent);
+
+    /**
+     * Unregister the memory component when it is deallocated
+     * @param memoryComponent
+     */
+    void unregister(ILSMMemoryComponent memoryComponent);
+
+    /**
+     * Notify the virtual buffer cache that the memory component has been flushed to disk
+     * @param memoryComponent
+     * @throws HyracksDataException
+     */
+    void flushed(ILSMMemoryComponent memoryComponent) throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 01a140f..6c7b1f8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -809,15 +809,6 @@ public abstract class AbstractLSMIndex implements ILSMIndex {
     }
 
     @Override
-    public long getMemoryAllocationSize() {
-        long size = 0;
-        for (ILSMMemoryComponent c : memoryComponents) {
-            size += c.getSize();
-        }
-        return size;
-    }
-
-    @Override
     public void resetCurrentComponentIndex() {
         synchronized (lsmHarness.getOperationTracker()) {
             // validate no reader in any of the memory components and that all of them are INVALID
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
index 9440648..a6c82bc 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
@@ -29,7 +29,6 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
-import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -60,7 +59,7 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im
      */
     @Override
     public void schedule(LSMIOOperationType ioOperationType) throws HyracksDataException {
-        activeate();
+        activate();
         if (ioOperationType == LSMIOOperationType.FLUSH) {
             if (state == ComponentState.READABLE_WRITABLE || state == ComponentState.READABLE_UNWRITABLE) {
                 if (writerCount != 0) {
@@ -79,7 +78,7 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im
         }
     }
 
-    private void activeate() throws HyracksDataException {
+    private void activate() throws HyracksDataException {
         if (state == ComponentState.INACTIVE) {
             state = ComponentState.READABLE_WRITABLE;
             lsmIndex.getIOOperationCallback().recycled(this);
@@ -88,7 +87,7 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im
 
     @Override
     public boolean threadEnter(LSMOperationType opType, boolean isMutableComponent) throws HyracksDataException {
-        activeate();
+        activate();
         switch (opType) {
             case FORCE_MODIFICATION:
                 if (isMutableComponent) {
@@ -108,7 +107,9 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im
                 break;
             case MODIFICATION:
                 if (isMutableComponent) {
-                    if (state == ComponentState.READABLE_WRITABLE) {
+                    if (state == ComponentState.READABLE_WRITABLE && !vbc.isFull(this) && !vbc.isFull()) {
+                        // Even when the memory component has the writable state, vbc may be temporarily full
+                        // or this memory component may be full.
                         writerCount++;
                     } else {
                         return false;
@@ -159,7 +160,9 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im
                     writerCount--;
                     // A failed operation should not change the component state since it's better for
                     // the failed operation's effect to be no-op.
-                    if (state == ComponentState.READABLE_WRITABLE && !failedOperation && isFull()) {
+                    if (state == ComponentState.READABLE_WRITABLE && !failedOperation && vbc.isFull(this)) {
+                        // only mark the component state as unwritable when this memory component
+                        // is full
                         state = ComponentState.READABLE_UNWRITABLE;
                     }
                 } else {
@@ -186,6 +189,7 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im
                 }
                 // operation succeeded
                 if (readerCount == 0) {
+                    // TODO: move reset() outside of the synchronized block (on op tracker)
                     reset();
                 } else {
                     state = ComponentState.UNREADABLE_UNWRITABLE;
@@ -224,11 +228,6 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im
     }
 
     @Override
-    public boolean isFull() {
-        return vbc.isFull();
-    }
-
-    @Override
     public final void reset() throws HyracksDataException {
         state = ComponentState.INACTIVE;
         isModified.set(false);
@@ -269,7 +268,8 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im
     @Override
     public final void allocate() throws HyracksDataException {
         boolean allocated = false;
-        ((IVirtualBufferCache) getIndex().getBufferCache()).open();
+        vbc.open();
+        vbc.register(this);
         try {
             doAllocate();
             allocated = true;
@@ -300,6 +300,7 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im
         try {
             state = ComponentState.INACTIVE;
             doDeallocate();
+            vbc.unregister(this);
         } finally {
             getIndex().getBufferCache().close();
         }
@@ -317,12 +318,6 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im
     }
 
     @Override
-    public long getSize() {
-        IBufferCache virtualBufferCache = getIndex().getBufferCache();
-        return virtualBufferCache.getPageBudget() * (long) virtualBufferCache.getPageSize();
-    }
-
-    @Override
     public ILSMComponentId getId() {
         return componentId;
     }
@@ -344,6 +339,11 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im
     }
 
     @Override
+    public void flushed() throws HyracksDataException {
+        vbc.flushed(this);
+    }
+
+    @Override
     public String toString() {
         return "{\"class\":\"" + getClass().getSimpleName() + "\", \"state\":\"" + state + "\", \"writers\":"
                 + writerCount + ", \"readers\":" + readerCount + ", \"pendingFlushes\":" + pendingFlushes
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 65d2fcf..20a2555 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -256,6 +256,11 @@ public class LSMHarness implements ILSMHarness {
                     throw e; // NOSONAR: The last call in the finally clause
                 }
             }
+            if (opType == LSMOperationType.FLUSH) {
+                ILSMMemoryComponent flushingComponent = (ILSMMemoryComponent) ctx.getComponentHolder().get(0);
+                // We must call flushed without synchronizing on opTracker to avoid deadlocks
+                flushingComponent.flushed();
+            }
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
index 5edcec8..6f3d2eb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.replication.IIOReplicationManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.storage.common.buffercache.IExtraPageBlockHelper;
@@ -128,11 +129,6 @@ public class MultitenantVirtualBufferCache implements IVirtualBufferCache {
     }
 
     @Override
-    public void reset() {
-        vbc.reset();
-    }
-
-    @Override
     public IFileMapManager getFileMapProvider() {
         return vbc.getFileMapProvider();
     }
@@ -221,4 +217,29 @@ public class MultitenantVirtualBufferCache implements IVirtualBufferCache {
     public void closeFileIfOpen(FileReference fileRef) {
         throw new UnsupportedOperationException();
     }
-}
+
+    @Override
+    public boolean isFull(ILSMMemoryComponent memoryComponent) {
+        return vbc.isFull(memoryComponent);
+    }
+
+    @Override
+    public int getUsage() {
+        return vbc.getUsage();
+    }
+
+    @Override
+    public void register(ILSMMemoryComponent memoryComponent) {
+        vbc.register(memoryComponent);
+    }
+
+    @Override
+    public void unregister(ILSMMemoryComponent memoryComponent) {
+        vbc.unregister(memoryComponent);
+    }
+
+    @Override
+    public void flushed(ILSMMemoryComponent memoryComponent) throws HyracksDataException {
+        vbc.flushed(memoryComponent);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
index f24289f..5871b31 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
@@ -30,6 +30,7 @@ import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.replication.IIOReplicationManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
@@ -46,8 +47,15 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 public class VirtualBufferCache implements IVirtualBufferCache {
+    /**
+     * Load factor used to size the bucket array, based on the default load factor of {@code HashMap}.
+     * Note that when a memory component is flushed, it scans vbc to clean up its pages while synchronized on
+     * the op tracker. Thus, it may not be a good idea to set the map factor too large because otherwise it
+     * would increase the blocking time.
+     */
+    private static final float MAP_FACTOR = 0.75f;
     private static final Logger LOGGER = LogManager.getLogger();
-    private static final boolean DEBUG = true;
+    private static final boolean DEBUG = false;
     private final ICacheMemoryAllocator allocator;
     private final IFileMapManager fileMapManager;
     private final int pageSize;
@@ -66,7 +74,7 @@ public class VirtualBufferCache implements IVirtualBufferCache {
             throw new IllegalArgumentException("Page Budget Cannot be 0");
         }
         this.pageBudget = pageBudget;
-        buckets = new CacheBucket[this.pageBudget];
+        buckets = new CacheBucket[(int) (this.pageBudget / MAP_FACTOR)];
         freePages = new ArrayBlockingQueue<>(this.pageBudget);
         largePages = new AtomicInteger(0);
         used = new AtomicInteger(0);
@@ -87,6 +95,7 @@ public class VirtualBufferCache implements IVirtualBufferCache {
         return largePages.get();
     }
 
+    @Override
     public int getUsage() {
         return used.get();
     }
@@ -106,6 +115,12 @@ public class VirtualBufferCache implements IVirtualBufferCache {
     }
 
     @Override
+    public boolean isFull(ILSMMemoryComponent memoryComponent) {
+        // the memory component needs to be flushed when the vbc is full
+        return isFull();
+    }
+
+    @Override
     public int createFile(FileReference fileRef) throws HyracksDataException {
         synchronized (fileMapManager) {
             return fileMapManager.registerFile(fileRef);
@@ -320,7 +335,7 @@ public class VirtualBufferCache implements IVirtualBufferCache {
             throw HyracksDataException.create(ErrorCode.VBC_ALREADY_OPEN);
         }
         allocator.reserveAllocation(pageSize, pageBudget);
-        for (int i = 0; i < pageBudget; i++) {
+        for (int i = 0; i < buckets.length; i++) {
             buckets[i] = new CacheBucket();
         }
         largePages.set(0);
@@ -329,36 +344,12 @@ public class VirtualBufferCache implements IVirtualBufferCache {
     }
 
     @Override
-    public void reset() {
-        recycleAllPages();
-        used.set(0);
-        largePages.set(0);
-    }
-
-    private void recycleAllPages() {
-        for (int i = 0; i < buckets.length; i++) {
-            final CacheBucket bucket = buckets[i];
-            bucket.bucketLock.lock();
-            try {
-                VirtualPage curr = bucket.cachedPage;
-                while (curr != null) {
-                    bucket.cachedPage = curr.next();
-                    recycle(curr);
-                    curr = bucket.cachedPage;
-                }
-            } finally {
-                bucket.bucketLock.unlock();
-            }
-        }
-    }
-
-    @Override
     public void close() throws HyracksDataException {
         if (!open) {
             throw HyracksDataException.create(ErrorCode.VBC_ALREADY_CLOSED);
         }
         freePages.clear();
-        for (int i = 0; i < pageBudget; i++) {
+        for (int i = 0; i < buckets.length; i++) {
             buckets[i].cachedPage = null;
         }
         open = false;
@@ -464,4 +455,19 @@ public class VirtualBufferCache implements IVirtualBufferCache {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public void register(ILSMMemoryComponent memoryComponent) {
+        // no op
+    }
+
+    @Override
+    public void unregister(ILSMMemoryComponent memoryComponent) {
+        // no op
+    }
+
+    @Override
+    public void flushed(ILSMMemoryComponent memoryComponent) throws HyracksDataException {
+        // no op
+    }
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMemoryComponent.java
index a8005bc..3b6c1b2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMemoryComponent.java
@@ -23,6 +23,7 @@ import org.apache.hyracks.storage.am.btree.impls.BTree;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMWithBuddyMemoryComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
 import org.apache.hyracks.storage.am.lsm.invertedindex.inmemory.InMemoryInvertedIndex;
 
 public class LSMInvertedIndexMemoryComponent extends AbstractLSMWithBuddyMemoryComponent {
@@ -46,4 +47,10 @@ public class LSMInvertedIndexMemoryComponent extends AbstractLSMWithBuddyMemoryC
     public BTree getBuddyIndex() {
         return deletedKeysBTree;
     }
+
+    @Override
+    public LSMComponentFileReferences getComponentFileRefs() {
+        return new LSMComponentFileReferences(invIndex.getBTree().getFileReference(),
+                deletedKeysBTree.getFileReference(), null);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
index a7ce35c..bfbb141 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
@@ -148,12 +148,6 @@ public class InMemoryInvertedIndex implements IInPlaceInvertedIndex {
     }
 
     @Override
-    public long getMemoryAllocationSize() {
-        IBufferCache virtualBufferCache = btree.getBufferCache();
-        return (long) virtualBufferCache.getPageBudget() * virtualBufferCache.getPageSize();
-    }
-
-    @Override
     public InvertedListCursor createInvertedListCursor(IHyracksTaskContext ctx) {
         return new InMemoryInvertedListCursor(invListTypeTraits.length, tokenTypeTraits.length);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index 42171d1..ec1f143 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -661,11 +661,6 @@ public class OnDiskInvertedIndex implements IInPlaceInvertedIndex {
         }
     }
 
-    @Override
-    public long getMemoryAllocationSize() {
-        return 0;
-    }
-
     protected static ITypeTraits[] getBTreeTypeTraits(ITypeTraits[] tokenTypeTraits) {
         ITypeTraits[] btreeTypeTraits = new ITypeTraits[tokenTypeTraits.length + btreeValueTypeTraits.length];
         // Set key type traits.
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMemoryComponent.java
index ef7b815..7f72d71 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMemoryComponent.java
@@ -24,6 +24,7 @@ import org.apache.hyracks.storage.am.btree.impls.BTree;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMWithBuddyMemoryComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
 import org.apache.hyracks.storage.am.rtree.impls.RTree;
 
 public class LSMRTreeMemoryComponent extends AbstractLSMWithBuddyMemoryComponent {
@@ -53,4 +54,9 @@ public class LSMRTreeMemoryComponent extends AbstractLSMWithBuddyMemoryComponent
         throw new UnsupportedOperationException("Validation not implemented for LSM R-Trees.");
     }
 
+    @Override
+    public LSMComponentFileReferences getComponentFileRefs() {
+        return new LSMComponentFileReferences(rtree.getFileReference(), btree.getFileReference(), null);
+    }
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndex.java
index 861fd2d..b715609 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndex.java
@@ -128,11 +128,6 @@ public interface IIndex {
     IBufferCache getBufferCache();
 
     /**
-     * @return the size, in bytes, of pre-allocated memory space that this index was allotted.
-     */
-    public long getMemoryAllocationSize();
-
-    /**
      * @param fillFactor
      * @param verifyInput
      * @throws HyracksDataException
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceLifecycleManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceLifecycleManager.java
index b653842..6200680 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceLifecycleManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceLifecycleManager.java
@@ -28,7 +28,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
  * @param <R>
  *            resource class
  */
-public interface IResourceLifecycleManager<R> extends IResourceMemoryManager {
+public interface IResourceLifecycleManager<R> {
     /**
      * get a list of all resources which are opened
      *
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceMemoryManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceMemoryManager.java
deleted file mode 100644
index 036520a..0000000
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceMemoryManager.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.storage.common;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IResourceMemoryManager {
-    void allocateMemory(String resourcePath) throws HyracksDataException;
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ResourceHeapBufferAllocator.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ResourceHeapBufferAllocator.java
deleted file mode 100644
index 3b2e3cb..0000000
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ResourceHeapBufferAllocator.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.storage.common.buffercache;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.common.IResourceMemoryManager;
-
-public class ResourceHeapBufferAllocator implements ICacheMemoryAllocator {
-
-    final IResourceMemoryManager memoryManager;
-    final String resourceName;
-
-    public ResourceHeapBufferAllocator(IResourceMemoryManager memoryManager, String resourceName) {
-        this.memoryManager = memoryManager;
-        this.resourceName = resourceName;
-    }
-
-    @Override
-    public ByteBuffer[] allocate(int pageSize, int numPages) {
-        ByteBuffer[] buffers = new ByteBuffer[numPages];
-        for (int i = 0; i < numPages; ++i) {
-            buffers[i] = ByteBuffer.allocate(pageSize);
-        }
-        return buffers;
-    }
-
-    @Override
-    public ByteBuffer[] ensureAvailabilityThenAllocate(int pageSize, int numPages) throws HyracksDataException {
-        reserveAllocation(pageSize, numPages);
-        return allocate(pageSize, numPages);
-    }
-
-    @Override
-    public void reserveAllocation(int pageSize, int numPages) throws HyracksDataException {
-        memoryManager.allocateMemory(resourceName);
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/IVirtualBufferCacheCallback.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/IVirtualBufferCacheCallback.java
index d5f3fb2..811987c 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/IVirtualBufferCacheCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/IVirtualBufferCacheCallback.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.storage.am.lsm.btree.impl;
 
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+
 public interface IVirtualBufferCacheCallback {
-    void isFullChanged(boolean newValue);
+    void isFullChanged(boolean newValue, ILSMMemoryComponent memoryComponent);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestVirtualBufferCache.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestVirtualBufferCache.java
index 63a51b9..8cad497 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestVirtualBufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestVirtualBufferCache.java
@@ -20,11 +20,13 @@ package org.apache.hyracks.storage.am.lsm.btree.impl;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.replication.IIOReplicationManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.storage.common.buffercache.IExtraPageBlockHelper;
@@ -35,6 +37,7 @@ import org.apache.hyracks.storage.common.file.IFileMapManager;
 
 public class TestVirtualBufferCache implements IVirtualBufferCache {
     private final IVirtualBufferCache vbc;
+    private final ConcurrentHashMap<ILSMMemoryComponent, AtomicBoolean> isFullMap = new ConcurrentHashMap<>();
     private final AtomicBoolean isFull = new AtomicBoolean(false);
     private final List<IVirtualBufferCacheCallback> callbacks;
 
@@ -87,7 +90,14 @@ public class TestVirtualBufferCache implements IVirtualBufferCache {
 
     @Override
     public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
-        return vbc.pin(dpid, newPage);
+        ICachedPage page = vbc.pin(dpid, newPage);
+        // the memory component can become full after each pin, but isFull may not be called by the memory component
+        // for correctness, we call isFull here after each pin
+        for (ILSMMemoryComponent component : isFullMap.keySet()) {
+            isFull(component);
+        }
+
+        return page;
     }
 
     @Override
@@ -190,19 +200,27 @@ public class TestVirtualBufferCache implements IVirtualBufferCache {
     @Override
     public boolean isFull() {
         boolean newValue = vbc.isFull();
+        updateFullValue(newValue, null);
+        return newValue;
+    }
+
+    @Override
+    public boolean isFull(ILSMMemoryComponent memoryComponent) {
+        boolean newValue = vbc.isFull(memoryComponent);
+        updateFullValue(newValue, memoryComponent);
+        return newValue;
+    }
+
+    private void updateFullValue(boolean newValue, ILSMMemoryComponent memoryComponent) {
+        AtomicBoolean isFull = memoryComponent != null
+                ? isFullMap.computeIfAbsent(memoryComponent, m -> new AtomicBoolean()) : this.isFull;
         if (isFull.compareAndSet(!newValue, newValue)) {
             synchronized (callbacks) {
                 for (int i = 0; i < callbacks.size(); i++) {
-                    callbacks.get(i).isFullChanged(newValue);
+                    callbacks.get(i).isFullChanged(newValue, memoryComponent);
                 }
             }
         }
-        return newValue;
-    }
-
-    @Override
-    public void reset() {
-        vbc.reset();
     }
 
     @Override
@@ -215,4 +233,29 @@ public class TestVirtualBufferCache implements IVirtualBufferCache {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public int getUsage() {
+        return vbc.getUsage();
+    }
+
+    @Override
+    public void register(ILSMMemoryComponent memoryComponent) {
+        vbc.register(memoryComponent);
+    }
+
+    @Override
+    public void unregister(ILSMMemoryComponent memoryComponent) {
+        vbc.unregister(memoryComponent);
+    }
+
+    @Override
+    public void flushed(ILSMMemoryComponent memoryComponent) throws HyracksDataException {
+        vbc.flushed(memoryComponent);
+    }
+
+    public void reset() {
+        isFull.set(false);
+        isFullMap.clear();
+    }
+
 }