You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2017/11/27 21:11:21 UTC

[1/2] asterixdb git commit: [ASTERIXDB-2161][TEST] Add indexes to MultiPartitionTest

Repository: asterixdb
Updated Branches:
  refs/heads/master f9e6bae98 -> c5a0a1974


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
new file mode 100644
index 0000000..e376ff9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.context.DatasetInfo;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.transactions.ILogManager;
+import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
+
+public class TestPrimaryIndexOperationTracker extends PrimaryIndexOperationTracker {
+
+    private final List<ITestOpCallback<Void>> callbacks = new ArrayList<>();
+
+    public TestPrimaryIndexOperationTracker(int datasetID, ILogManager logManager, DatasetInfo dsInfo,
+            ILSMComponentIdGenerator idGenerator) {
+        super(datasetID, logManager, dsInfo, idGenerator);
+    }
+
+    public void addCallback(ITestOpCallback<Void> callback) {
+        synchronized (callbacks) {
+            callbacks.add(callback);
+        }
+    }
+
+    public void clearCallbacks() {
+        synchronized (callbacks) {
+            callbacks.clear();
+        }
+    }
+
+    @Override
+    public void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException {
+        synchronized (callbacks) {
+            for (ITestOpCallback<Void> callback : callbacks) {
+                callback.before(null);
+            }
+        }
+        super.triggerScheduleFlush(logRecord);
+        synchronized (callbacks) {
+            for (ITestOpCallback<Void> callback : callbacks) {
+                callback.after();
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
new file mode 100644
index 0000000..5d7a7c6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.context.DatasetResource;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerFactory;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+
+public class TestPrimaryIndexOperationTrackerFactory extends PrimaryIndexOperationTrackerFactory {
+
+    private static final long serialVersionUID = 1L;
+    private int datasetId;
+
+    public TestPrimaryIndexOperationTrackerFactory(int datasetId) {
+        super(datasetId);
+        this.datasetId = datasetId;
+    }
+
+    @Override
+    public ILSMOperationTracker getOperationTracker(INCServiceContext ctx) {
+        try {
+            INcApplicationContext appCtx = (INcApplicationContext) ctx.getApplicationContext();
+            DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) appCtx.getDatasetLifecycleManager();
+            DatasetResource dsr = dslcManager.getDatasetLifecycle(datasetId);
+            PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
+            if (!(opTracker instanceof TestPrimaryIndexOperationTracker)) {
+                Field opTrackerField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTracker");
+                opTracker = new TestPrimaryIndexOperationTracker(datasetId,
+                        appCtx.getTransactionSubsystem().getLogManager(), dsr.getDatasetInfo(), dsr.getIdGenerator());
+                setFinal(opTrackerField, dsr, opTracker);
+            }
+            return opTracker;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    static void setFinal(Field field, Object obj, Object newValue) throws Exception {
+        field.setAccessible(true);
+        Field modifiersField = Field.class.getDeclaredField("modifiers");
+        modifiersField.setAccessible(true);
+        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+        field.set(obj, newValue);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 30cfb4f..7543bf6 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -134,7 +134,7 @@ public class CheckpointingTest {
                         new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
                 // Prepare insert operation
                 LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
-                        RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager).getLeft();
+                        RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null).getLeft();
                 insertOp.open();
                 TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR,
                         RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
index dabf9d3..8897169 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
@@ -130,7 +130,7 @@ public class DiskIsFullTest {
                 IHyracksTaskContext ctx = nc.createTestContext(jobId, 0, false);
                 // Prepare insert operation
                 LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
-                        RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager).getLeft();
+                        RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null).getLeft();
                 insertOp.open();
                 TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR,
                         RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/asterixdb/asterix-app/src/test/resources/multi-partition-test-configuration.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/multi-partition-test-configuration.xml b/asterixdb/asterix-app/src/test/resources/multi-partition-test-configuration.xml
new file mode 100644
index 0000000..aaeb244
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/multi-partition-test-configuration.xml
@@ -0,0 +1,112 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<asterixConfiguration xmlns="asterixconf">
+  <metadataNode>asterix_nc1</metadataNode>
+  <store>
+    <ncId>asterix_nc1</ncId>
+    <storeDirs>iodevice0,iodevice1</storeDirs>
+  </store>
+  <store>
+    <ncId>asterix_nc2</ncId>
+    <storeDirs>iodevice0,iodevice1</storeDirs>
+  </store>
+  <transactionLogDir>
+    <ncId>asterix_nc1</ncId>
+    <txnLogDirPath>target/txnLogDir/asterix_nc1</txnLogDirPath>
+  </transactionLogDir>
+  <transactionLogDir>
+    <ncId>asterix_nc2</ncId>
+    <txnLogDirPath>target/txnLogDir/asterix_nc2</txnLogDirPath>
+  </transactionLogDir>
+
+  <property>
+    <name>max.wait.active.cluster</name>
+    <value>60</value>
+    <description>Maximum wait (in seconds) for a cluster to be ACTIVE (all
+      nodes are available)
+      before a submitted query/statement can be
+      executed. (Default = 60 seconds)
+    </description>
+  </property>
+
+  <property>
+    <name>compiler.framesize</name>
+    <value>32KB</value>
+  </property>
+  <property>
+    <name>compiler.sortmemory</name>
+    <value>320KB</value>
+  </property>
+  <property>
+    <name>compiler.groupmemory</name>
+    <value>160KB</value>
+  </property>
+  <property>
+    <name>compiler.joinmemory</name>
+    <value>256KB</value>
+  </property>
+  <property>
+    <name>storage.buffercache.pagesize</name>
+    <value>32KB</value>
+    <description>The page size in bytes for pages in the buffer cache.
+      (Default = "128KB")
+    </description>
+  </property>
+  <property>
+    <name>storage.buffercache.size</name>
+    <value>48MB</value>
+    <description>The size of memory allocated to the disk buffer cache.
+      The value should be a multiple of the buffer cache page size.
+    </description>
+  </property>
+  <property>
+    <name>storage.memorycomponent.numpages</name>
+    <value>32</value>
+    <description>The number of pages to allocate for a memory component.
+      This budget is shared by all the memory components of the primary
+      index and all its secondary indexes across all I/O devices on a node.
+      Note: in-memory components usually has fill factor of 75% since
+      the pages are 75% full and the remaining 25% is un-utilized.
+    </description>
+  </property>
+  <property>
+    <name>storage.memorycomponent.globalbudget</name>
+    <value>512MB</value>
+    <description>The size of memory allocated to the memory components.
+      The value should be a multiple of the memory component page size.
+    </description>
+  </property>
+  <property>
+    <name>messaging.frame.size</name>
+    <value>4096</value>
+    <description>The frame size to be used for NC to NC messaging. (Default = 4kb)
+    </description>
+  </property>
+  <property>
+    <name>messaging.frame.count</name>
+    <value>512</value>
+    <description>Number of reusable frames for NC to NC messaging. (Default = 512)
+    </description>
+  </property>
+  <property>
+    <name>log.level</name>
+    <value>INFO</value>
+    <description>foo</description>
+  </property>
+</asterixConfiguration>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 8002895..ce43bca 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -207,7 +207,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
             if (opTracker != null && opTracker.getNumActiveOperations() == 0
                     && dsr.getDatasetInfo().getReferenceCount() == 0 && dsr.getDatasetInfo().isOpen()
                     && !dsr.isMetadataDataset()) {
-                closeDataset(dsr.getDatasetInfo());
+                closeDataset(dsr);
                 LOGGER.info(() -> "Evicted Dataset" + dsr.getDatasetID());
                 return true;
             }
@@ -341,7 +341,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     @Override
     public synchronized void flushAllDatasets() throws HyracksDataException {
         for (DatasetResource dsr : datasets.values()) {
-            flushDatasetOpenIndexes(dsr.getDatasetInfo(), false);
+            flushDatasetOpenIndexes(dsr, false);
         }
     }
 
@@ -349,7 +349,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     public synchronized void flushDataset(int datasetId, boolean asyncFlush) throws HyracksDataException {
         DatasetResource dsr = datasets.get(datasetId);
         if (dsr != null) {
-            flushDatasetOpenIndexes(dsr.getDatasetInfo(), asyncFlush);
+            flushDatasetOpenIndexes(dsr, asyncFlush);
         }
     }
 
@@ -385,11 +385,17 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     /*
      * This method can only be called asynchronously safely if we're sure no modify operation will take place until the flush is scheduled
      */
-    private void flushDatasetOpenIndexes(DatasetInfo dsInfo, boolean asyncFlush) throws HyracksDataException {
+    private void flushDatasetOpenIndexes(DatasetResource dsr, boolean asyncFlush) throws HyracksDataException {
+        DatasetInfo dsInfo = dsr.getDatasetInfo();
         if (dsInfo.isExternal()) {
             // no memory components for external dataset
             return;
         }
+        PrimaryIndexOperationTracker primaryOpTracker = dsr.getOpTracker();
+        if (primaryOpTracker.getNumActiveOperations() > 0) {
+            throw new IllegalStateException(
+                    "flushDatasetOpenIndexes is called on a dataset with currently active operations");
+        }
 
         ILSMComponentIdGenerator idGenerator = getComponentIdGenerator(dsInfo.getDatasetID());
         idGenerator.refresh();
@@ -435,11 +441,12 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
         }
     }
 
-    private void closeDataset(DatasetInfo dsInfo) throws HyracksDataException {
+    private void closeDataset(DatasetResource dsr) throws HyracksDataException {
         // First wait for any ongoing IO operations
+        DatasetInfo dsInfo = dsr.getDatasetInfo();
         dsInfo.waitForIO();
         try {
-            flushDatasetOpenIndexes(dsInfo, false);
+            flushDatasetOpenIndexes(dsr, false);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
@@ -460,7 +467,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     public synchronized void closeAllDatasets() throws HyracksDataException {
         ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values());
         for (DatasetResource dsr : openDatasets) {
-            closeDataset(dsr.getDatasetInfo());
+            closeDataset(dsr);
         }
     }
 
@@ -469,7 +476,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
         ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values());
         for (DatasetResource dsr : openDatasets) {
             if (!dsr.isMetadataDataset()) {
-                closeDataset(dsr.getDatasetInfo());
+                closeDataset(dsr);
             }
         }
     }
@@ -568,7 +575,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     public void flushDataset(IReplicationStrategy replicationStrategy) throws HyracksDataException {
         for (DatasetResource dsr : datasets.values()) {
             if (replicationStrategy.isMatch(dsr.getDatasetID())) {
-                flushDatasetOpenIndexes(dsr.getDatasetInfo(), false);
+                flushDatasetOpenIndexes(dsr, false);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java
index 7bb12c4..8df872b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java
@@ -61,6 +61,7 @@ public interface ILSMIOOperationCallback {
      *
      * @param component
      * @param componentSwitched
+     *            true if the component index was advanced for this recycle, false otherwise
      */
     void recycled(ILSMMemoryComponent component, boolean componentSwitched) throws HyracksDataException;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/ITestOpCallback.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/ITestOpCallback.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/ITestOpCallback.java
index 2989af9..acc3347 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/ITestOpCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/ITestOpCallback.java
@@ -18,8 +18,8 @@
  */
 package org.apache.hyracks.storage.am.lsm.btree.impl;
 
-import java.util.concurrent.Semaphore;
+public interface ITestOpCallback<T> {
+    void before(T t);
 
-public interface ITestOpCallback {
-    void callback(Semaphore smeaphore);
+    void after();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/IVirtualBufferCacheCallback.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/IVirtualBufferCacheCallback.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/IVirtualBufferCacheCallback.java
new file mode 100644
index 0000000..d5f3fb2
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/IVirtualBufferCacheCallback.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.impl;
+
+public interface IVirtualBufferCacheCallback {
+    void isFullChanged(boolean newValue);
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
index 8782565..bf3bb31 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
@@ -40,6 +40,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
@@ -56,11 +57,17 @@ public class TestLsmBtree extends LSMBTree {
     private final Semaphore searchSemaphore = new Semaphore(0);
     private final Semaphore flushSemaphore = new Semaphore(0);
     private final Semaphore mergeSemaphore = new Semaphore(0);
-    private final List<ITestOpCallback> modifyCallbacks = new ArrayList<>();
-    private final List<ITestOpCallback> searchCallbacks = new ArrayList<>();
-    private final List<ITestOpCallback> flushCallbacks = new ArrayList<>();
-    private final List<ITestOpCallback> mergeCallbacks = new ArrayList<>();
-    private final List<ITestOpCallback> allocateComponentCallbacks = new ArrayList<>();
+    private final List<ITestOpCallback<Semaphore>> modifyCallbacks = new ArrayList<>();
+    private final List<ITestOpCallback<Semaphore>> searchCallbacks = new ArrayList<>();
+    private final List<ITestOpCallback<Semaphore>> flushCallbacks = new ArrayList<>();
+    private final List<ITestOpCallback<Semaphore>> mergeCallbacks = new ArrayList<>();
+
+    private final List<ITestOpCallback<ILSMMemoryComponent>> ioAllocateCallbacks = new ArrayList<>();
+    private final List<ITestOpCallback<ILSMMemoryComponent>> ioRecycleCallbacks = new ArrayList<>();
+    private final List<ITestOpCallback<Void>> ioBeforeCallbacks = new ArrayList<>();
+    private final List<ITestOpCallback<Void>> ioAfterOpCallbacks = new ArrayList<>();
+    private final List<ITestOpCallback<Void>> ioAfterFinalizeCallbacks = new ArrayList<>();
+    private final List<ITestOpCallback<Void>> allocateComponentCallbacks = new ArrayList<>();
 
     private volatile int numScheduledFlushes;
     private volatile int numStartedFlushes;
@@ -89,17 +96,22 @@ public class TestLsmBtree extends LSMBTree {
     @Override
     public void modify(IIndexOperationContext ictx, ITupleReference tuple) throws HyracksDataException {
         synchronized (modifyCallbacks) {
-            for (ITestOpCallback callback : modifyCallbacks) {
+            for (ITestOpCallback<Semaphore> callback : modifyCallbacks) {
                 callback(callback, modifySemaphore);
             }
         }
         acquire(modifySemaphore);
         super.modify(ictx, tuple);
+        synchronized (modifyCallbacks) {
+            for (ITestOpCallback<Semaphore> callback : modifyCallbacks) {
+                callback.after();
+            }
+        }
     }
 
-    public static void callback(ITestOpCallback callback, Semaphore semaphore) {
+    public static <T> void callback(ITestOpCallback<T> callback, T t) {
         if (callback != null) {
-            callback.callback(semaphore);
+            callback.before(t);
         }
     }
 
@@ -121,13 +133,18 @@ public class TestLsmBtree extends LSMBTree {
     public ILSMDiskComponent doFlush(ILSMIOOperation operation) throws HyracksDataException {
         numStartedFlushes++;
         synchronized (flushCallbacks) {
-            for (ITestOpCallback callback : flushCallbacks) {
+            for (ITestOpCallback<Semaphore> callback : flushCallbacks) {
                 callback(callback, flushSemaphore);
             }
         }
         acquire(flushSemaphore);
         ILSMDiskComponent c = super.doFlush(operation);
         numFinishedFlushes++;
+        synchronized (flushCallbacks) {
+            for (ITestOpCallback<Semaphore> callback : flushCallbacks) {
+                callback.after();
+            }
+        }
         return c;
     }
 
@@ -135,13 +152,18 @@ public class TestLsmBtree extends LSMBTree {
     public ILSMDiskComponent doMerge(ILSMIOOperation operation) throws HyracksDataException {
         numStartedMerges++;
         synchronized (mergeCallbacks) {
-            for (ITestOpCallback callback : mergeCallbacks) {
+            for (ITestOpCallback<Semaphore> callback : mergeCallbacks) {
                 callback(callback, mergeSemaphore);
             }
         }
         acquire(mergeSemaphore);
         ILSMDiskComponent c = super.doMerge(operation);
         numFinishedMerges++;
+        synchronized (mergeCallbacks) {
+            for (ITestOpCallback<Semaphore> callback : mergeCallbacks) {
+                callback.after();
+            }
+        }
         return c;
     }
 
@@ -199,11 +221,11 @@ public class TestLsmBtree extends LSMBTree {
         return numFinishedMerges;
     }
 
-    public List<ITestOpCallback> getModifyCallbacks() {
+    public List<ITestOpCallback<Semaphore>> getModifyCallbacks() {
         return modifyCallbacks;
     }
 
-    public void addModifyCallback(ITestOpCallback modifyCallback) {
+    public void addModifyCallback(ITestOpCallback<Semaphore> modifyCallback) {
         synchronized (mergeCallbacks) {
             modifyCallbacks.add(modifyCallback);
         }
@@ -215,7 +237,31 @@ public class TestLsmBtree extends LSMBTree {
         }
     }
 
-    public List<ITestOpCallback> getSearchCallbacks() {
+    public void addIoRecycleCallback(ITestOpCallback<ILSMMemoryComponent> callback) {
+        synchronized (ioRecycleCallbacks) {
+            ioRecycleCallbacks.add(callback);
+        }
+    }
+
+    public void clearIoRecycleCallback() {
+        synchronized (ioRecycleCallbacks) {
+            ioRecycleCallbacks.clear();
+        }
+    }
+
+    public void addIoAllocateCallback(ITestOpCallback<ILSMMemoryComponent> callback) {
+        synchronized (ioAllocateCallbacks) {
+            ioAllocateCallbacks.add(callback);
+        }
+    }
+
+    public void clearIoAllocateCallback() {
+        synchronized (ioAllocateCallbacks) {
+            ioAllocateCallbacks.clear();
+        }
+    }
+
+    public List<ITestOpCallback<Semaphore>> getSearchCallbacks() {
         return searchCallbacks;
     }
 
@@ -225,13 +271,13 @@ public class TestLsmBtree extends LSMBTree {
         }
     }
 
-    public void addSearchCallback(ITestOpCallback searchCallback) {
+    public void addSearchCallback(ITestOpCallback<Semaphore> searchCallback) {
         synchronized (searchCallbacks) {
             searchCallbacks.add(searchCallback);
         }
     }
 
-    public void addFlushCallback(ITestOpCallback flushCallback) {
+    public void addFlushCallback(ITestOpCallback<Semaphore> flushCallback) {
         synchronized (flushCallbacks) {
             flushCallbacks.add(flushCallback);
         }
@@ -243,7 +289,7 @@ public class TestLsmBtree extends LSMBTree {
         }
     }
 
-    public void addMergeCallback(ITestOpCallback mergeCallback) {
+    public void addMergeCallback(ITestOpCallback<Semaphore> mergeCallback) {
         synchronized (mergeCallbacks) {
             mergeCallbacks.add(mergeCallback);
         }
@@ -259,7 +305,7 @@ public class TestLsmBtree extends LSMBTree {
         return searchSemaphore;
     }
 
-    public void addAllocateCallback(ITestOpCallback callback) {
+    public void addAllocateCallback(ITestOpCallback<Void> callback) {
         synchronized (allocateComponentCallbacks) {
             allocateComponentCallbacks.add(callback);
         }
@@ -271,13 +317,111 @@ public class TestLsmBtree extends LSMBTree {
         }
     }
 
+    public void addVirtuablBufferCacheCallback(IVirtualBufferCacheCallback callback) {
+        for (IVirtualBufferCache vbc : virtualBufferCaches) {
+            ((TestVirtualBufferCache) vbc).addCallback(callback);
+        }
+    }
+
+    public void clearVirtuablBufferCacheCallbacks() {
+        for (IVirtualBufferCache vbc : virtualBufferCaches) {
+            ((TestVirtualBufferCache) vbc).clearCallbacks();
+        }
+    }
+
     @Override
     public void allocateMemoryComponents() throws HyracksDataException {
-        super.allocateMemoryComponents();
         synchronized (allocateComponentCallbacks) {
-            for (ITestOpCallback callback : allocateComponentCallbacks) {
+            for (ITestOpCallback<Void> callback : allocateComponentCallbacks) {
                 callback(callback, null);
             }
         }
+        super.allocateMemoryComponents();
+        synchronized (allocateComponentCallbacks) {
+            for (ITestOpCallback<Void> callback : allocateComponentCallbacks) {
+                callback.after();
+            }
+        }
     }
+
+    public void beforeIoOperationCalled() {
+        synchronized (ioBeforeCallbacks) {
+            for (ITestOpCallback<Void> callback : ioBeforeCallbacks) {
+                callback.before(null);
+            }
+        }
+    }
+
+    public void beforeIoOperationReturned() {
+        synchronized (ioBeforeCallbacks) {
+            for (ITestOpCallback<Void> callback : ioBeforeCallbacks) {
+                callback.after();
+            }
+        }
+    }
+
+    public void afterIoOperationCalled() {
+        synchronized (ioAfterOpCallbacks) {
+            for (ITestOpCallback<Void> callback : ioAfterOpCallbacks) {
+                callback.before(null);
+            }
+        }
+    }
+
+    public void afterIoOperationReturned() {
+        synchronized (ioAfterOpCallbacks) {
+            for (ITestOpCallback<Void> callback : ioAfterOpCallbacks) {
+                callback.after();
+            }
+        }
+    }
+
+    public void afterIoFinalizeCalled() {
+        synchronized (ioAfterFinalizeCallbacks) {
+            for (ITestOpCallback<Void> callback : ioAfterFinalizeCallbacks) {
+                callback.before(null);
+            }
+        }
+    }
+
+    public void afterIoFinalizeReturned() {
+        synchronized (ioAfterFinalizeCallbacks) {
+            for (ITestOpCallback<Void> callback : ioAfterFinalizeCallbacks) {
+                callback.after();
+            }
+        }
+    }
+
+    public void recycledCalled(ILSMMemoryComponent component) {
+        synchronized (ioRecycleCallbacks) {
+            for (ITestOpCallback<ILSMMemoryComponent> callback : ioRecycleCallbacks) {
+                callback.before(component);
+            }
+        }
+    }
+
+    public void recycledReturned(ILSMMemoryComponent component) {
+        synchronized (ioRecycleCallbacks) {
+            for (ITestOpCallback<ILSMMemoryComponent> callback : ioRecycleCallbacks) {
+                callback.after();
+            }
+        }
+    }
+
+    public void allocatedCalled(ILSMMemoryComponent component) {
+        synchronized (ioAllocateCallbacks) {
+            for (ITestOpCallback<ILSMMemoryComponent> callback : ioAllocateCallbacks) {
+                callback.before(component);
+            }
+        }
+    }
+
+    public void allocatedReturned(ILSMMemoryComponent component) {
+        synchronized (ioAllocateCallbacks) {
+            for (ITestOpCallback<ILSMMemoryComponent> callback : ioAllocateCallbacks) {
+                callback.after();
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
index 47a8046..9b53120 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
@@ -60,6 +60,13 @@ public class TestLsmBtreeLocalResource extends LSMBTreeLocalResource {
         IIOManager ioManager = serviceCtx.getIoManager();
         FileReference file = ioManager.resolve(path);
         List<IVirtualBufferCache> vbcs = vbcProvider.getVirtualBufferCaches(serviceCtx, file);
+        for (int i = 0; i < vbcs.size(); i++) {
+            IVirtualBufferCache vbc = vbcs.get(i);
+            if (!(vbc instanceof TestVirtualBufferCache)) {
+                vbcs.remove(i);
+                vbcs.add(i, new TestVirtualBufferCache(vbc));
+            }
+        }
         ioOpCallbackFactory.initialize(serviceCtx);
         return TestLsmBtreeUtil.createLSMTree(ioManager, vbcs, file, storageManager.getBufferCache(serviceCtx),
                 typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeSearchCursor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeSearchCursor.java
index d3504ae..45e39aa 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeSearchCursor.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.storage.am.lsm.btree.impl;
 
 import java.util.List;
+import java.util.concurrent.Semaphore;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeSearchCursor;
@@ -36,9 +37,9 @@ public class TestLsmBtreeSearchCursor extends LSMBTreeSearchCursor {
     @Override
     public void next() throws HyracksDataException {
         try {
-            List<ITestOpCallback> callbacks = lsmBtree.getSearchCallbacks();
+            List<ITestOpCallback<Semaphore>> callbacks = lsmBtree.getSearchCallbacks();
             synchronized (callbacks) {
-                for (ITestOpCallback cb : callbacks) {
+                for (ITestOpCallback<Semaphore> cb : callbacks) {
                     TestLsmBtree.callback(cb, lsmBtree.getSearchSemaphore());
                 }
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestVirtualBufferCache.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestVirtualBufferCache.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestVirtualBufferCache.java
new file mode 100644
index 0000000..c7e064f
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestVirtualBufferCache.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.replication.IIOReplicationManager;
+import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.IExtraPageBlockHelper;
+import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue;
+import org.apache.hyracks.storage.common.file.IFileMapManager;
+
+public class TestVirtualBufferCache implements IVirtualBufferCache {
+    private final IVirtualBufferCache vbc;
+    private final AtomicBoolean isFull = new AtomicBoolean(false);
+    private final List<IVirtualBufferCacheCallback> callbacks;
+
+    public TestVirtualBufferCache(IVirtualBufferCache vbc) {
+        this.vbc = vbc;
+        callbacks = new ArrayList<>();
+    }
+
+    public void addCallback(IVirtualBufferCacheCallback callback) {
+        synchronized (callbacks) {
+            callbacks.add(callback);
+        }
+    }
+
+    public void clearCallbacks() {
+        synchronized (callbacks) {
+            callbacks.clear();
+        }
+    }
+
+    @Override
+    public int createFile(FileReference fileRef) throws HyracksDataException {
+        return vbc.createFile(fileRef);
+    }
+
+    @Override
+    public int openFile(FileReference fileRef) throws HyracksDataException {
+        return vbc.openFile(fileRef);
+    }
+
+    @Override
+    public void openFile(int fileId) throws HyracksDataException {
+        vbc.openFile(fileId);
+    }
+
+    @Override
+    public void closeFile(int fileId) throws HyracksDataException {
+        vbc.closeFile(fileId);
+    }
+
+    @Override
+    public void deleteFile(int fileId) throws HyracksDataException {
+        vbc.deleteFile(fileId);
+    }
+
+    @Override
+    public void deleteFile(FileReference file) throws HyracksDataException {
+        vbc.deleteFile(file);
+    }
+
+    @Override
+    public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
+        return vbc.pin(dpid, newPage);
+    }
+
+    @Override
+    public void unpin(ICachedPage page) throws HyracksDataException {
+        vbc.unpin(page);
+    }
+
+    @Override
+    public void flush(ICachedPage page) throws HyracksDataException {
+        vbc.flush(page);
+    }
+
+    @Override
+    public void force(int fileId, boolean metadata) throws HyracksDataException {
+        vbc.force(fileId, metadata);
+    }
+
+    @Override
+    public ICachedPage confiscatePage(long dpid) throws HyracksDataException {
+        return vbc.confiscatePage(dpid);
+    }
+
+    @Override
+    public ICachedPage confiscateLargePage(long dpid, int multiplier, int extraBlockPageId)
+            throws HyracksDataException {
+        return vbc.confiscateLargePage(dpid, multiplier, extraBlockPageId);
+    }
+
+    @Override
+    public void returnPage(ICachedPage page) {
+        vbc.returnPage(page);
+    }
+
+    @Override
+    public void returnPage(ICachedPage page, boolean reinsert) {
+        vbc.returnPage(page, reinsert);
+    }
+
+    @Override
+    public int getPageSize() {
+        return vbc.getPageSize();
+    }
+
+    @Override
+    public int getPageSizeWithHeader() {
+        return vbc.getPageSizeWithHeader();
+    }
+
+    @Override
+    public int getPageBudget() {
+        return vbc.getPageBudget();
+    }
+
+    @Override
+    public int getNumPagesOfFile(int fileId) throws HyracksDataException {
+        return vbc.getNumPagesOfFile(fileId);
+    }
+
+    @Override
+    public int getFileReferenceCount(int fileId) {
+        return vbc.getFileReferenceCount(fileId);
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        vbc.close();
+    }
+
+    @Override
+    public IFIFOPageQueue createFIFOQueue() {
+        return vbc.createFIFOQueue();
+    }
+
+    @Override
+    public void finishQueue() throws HyracksDataException {
+        vbc.finishQueue();
+    }
+
+    @Override
+    public boolean isReplicationEnabled() {
+        return vbc.isReplicationEnabled();
+    }
+
+    @Override
+    public IIOReplicationManager getIOReplicationManager() {
+        return vbc.getIOReplicationManager();
+    }
+
+    @Override
+    public void purgeHandle(int fileId) throws HyracksDataException {
+        vbc.purgeHandle(fileId);
+    }
+
+    @Override
+    public void resizePage(ICachedPage page, int multiplier, IExtraPageBlockHelper extraPageBlockHelper)
+            throws HyracksDataException {
+        vbc.resizePage(page, multiplier, extraPageBlockHelper);
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        vbc.open();
+    }
+
+    @Override
+    public boolean isFull() {
+        boolean newValue = vbc.isFull();
+        if (isFull.compareAndSet(!newValue, newValue)) {
+            synchronized (callbacks) {
+                for (int i = 0; i < callbacks.size(); i++) {
+                    callbacks.get(i).isFullChanged(newValue);
+                }
+            }
+        }
+        return newValue;
+    }
+
+    @Override
+    public void reset() {
+        vbc.reset();
+    }
+
+    @Override
+    public IFileMapManager getFileMapProvider() {
+        return vbc.getFileMapProvider();
+    }
+}


[2/2] asterixdb git commit: [ASTERIXDB-2161][TEST] Add indexes to MultiPartitionTest

Posted by am...@apache.org.
[ASTERIXDB-2161][TEST] Add indexes to MultiPartitionTest

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- This change adds secondary indexes to multi partition
  LSM indexes tests.
- This enables testing of specific concurrency scenarios
  and ensuring properties stored in primary and secondary
  indexes are consistent.
- In addition, the call for flushDataset in
  DatasetLifecycleManager now throws an
  IllegalStateException if the number of active
  operations is not 0. Some tests used to call this
  function when there are ongoing operations and that
  is expected to never be the case in the actual system.

Change-Id: I5aea71a87f149b01f6c7310867fc15b5a340b93c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2173
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Luo Chen <cl...@uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/c5a0a197
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/c5a0a197
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/c5a0a197

Branch: refs/heads/master
Commit: c5a0a1974d36d647a22d606d53bdaafd85f641df
Parents: f9e6bae
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Mon Nov 27 11:53:30 2017 -0800
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Mon Nov 27 13:11:06 2017 -0800

----------------------------------------------------------------------
 .../app/bootstrap/TestNodeController.java       | 213 ++++++-
 .../test/dataflow/ComponentRollbackTest.java    | 139 +++--
 .../asterix/test/dataflow/LogMarkerTest.java    |   2 +-
 .../dataflow/MultiPartitionLSMIndexTest.java    | 561 +++++++++++++++++--
 .../SearchCursorComponentSwitchTest.java        |  21 +-
 .../TestLsmBTreeResourceFactoryProvider.java    |   4 +
 .../TestLsmBtreeIoOpCallbackFactory.java        |  40 ++
 .../TestPrimaryIndexOperationTracker.java       |  68 +++
 ...TestPrimaryIndexOperationTrackerFactory.java |  68 +++
 .../asterix/test/logging/CheckpointingTest.java |   2 +-
 .../asterix/test/storage/DiskIsFullTest.java    |   2 +-
 .../multi-partition-test-configuration.xml      | 112 ++++
 .../common/context/DatasetLifecycleManager.java |  25 +-
 .../lsm/common/api/ILSMIOOperationCallback.java |   1 +
 .../am/lsm/btree/impl/ITestOpCallback.java      |   6 +-
 .../btree/impl/IVirtualBufferCacheCallback.java |  23 +
 .../storage/am/lsm/btree/impl/TestLsmBtree.java | 182 +++++-
 .../btree/impl/TestLsmBtreeLocalResource.java   |   7 +
 .../btree/impl/TestLsmBtreeSearchCursor.java    |   5 +-
 .../lsm/btree/impl/TestVirtualBufferCache.java  | 215 +++++++
 20 files changed, 1539 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 72a9b44..352a5f8 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -69,6 +69,8 @@ import org.apache.asterix.transaction.management.service.logging.LogReader;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -89,6 +91,7 @@ import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.utils.TaskUtil;
 import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorNodePushable;
 import org.apache.hyracks.storage.am.common.api.IIndexBuilder;
@@ -166,33 +169,93 @@ public class TestNodeController {
     public Pair<LSMInsertDeleteOperatorNodePushable, CommitRuntime> getInsertPipeline(IHyracksTaskContext ctx,
             Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType, int[] filterFields,
             int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
-            StorageComponentProvider storageComponentProvider)
+            StorageComponentProvider storageComponentProvider, Index secondaryIndex)
             throws AlgebricksException, HyracksDataException, RemoteException, ACIDException {
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy =
-                DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
-        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-        PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
-                mergePolicy.first, mergePolicy.second, filterFields, primaryKeyIndexes, primaryKeyIndicators);
-        IndexOperation op = IndexOperation.INSERT;
-        IModificationOperationCallbackFactory modOpCallbackFactory =
-                new PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(ctx), dataset.getDatasetId(),
-                        primaryIndexInfo.primaryKeyIndexes, TXN_SUBSYSTEM_PROVIDER, Operation.get(op),
-                        ResourceType.LSM_BTREE);
-        IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider();
-        IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
-                storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
-        LSMInsertDeleteOperatorNodePushable insertOp =
-                new LSMInsertDeleteOperatorNodePushable(ctx, ctx.getTaskAttemptId().getTaskId().getPartition(),
-                        primaryIndexInfo.primaryIndexInsertFieldsPermutations,
-                        recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0),
-                        op, true, indexHelperFactory, modOpCallbackFactory, null);
-        CommitRuntime commitOp =
-                new CommitRuntime(ctx, getTxnJobId(ctx), dataset.getDatasetId(), primaryIndexInfo.primaryKeyIndexes,
-                        true, ctx.getTaskAttemptId().getTaskId().getPartition(), true);
-        insertOp.setOutputFrameWriter(0, commitOp, primaryIndexInfo.rDesc);
-        commitOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc);
-        return Pair.of(insertOp, commitOp);
+        CcApplicationContext appCtx =
+                (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext();
+        MetadataProvider mdProvider = new MetadataProvider(appCtx, null);
+        try {
+            MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy =
+                    DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
+                    mergePolicy.first, mergePolicy.second, filterFields, primaryKeyIndexes, primaryKeyIndicators);
+            IndexOperation op = IndexOperation.INSERT;
+            IModificationOperationCallbackFactory modOpCallbackFactory =
+                    new PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(ctx), dataset.getDatasetId(),
+                            primaryIndexInfo.primaryKeyIndexes, TXN_SUBSYSTEM_PROVIDER, Operation.get(op),
+                            ResourceType.LSM_BTREE);
+            IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider();
+            RecordDescriptor recordDesc =
+                    recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0);
+            IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
+                    storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
+            LSMInsertDeleteOperatorNodePushable insertOp =
+                    new LSMInsertDeleteOperatorNodePushable(ctx, ctx.getTaskAttemptId().getTaskId().getPartition(),
+                            primaryIndexInfo.primaryIndexInsertFieldsPermutations, recordDesc, op, true,
+                            indexHelperFactory, modOpCallbackFactory, null);
+
+            // For now, this assumes a single secondary index. recordDesc is always <pk-record-meta>
+            // for the index, we will have to create an assign operator that extract the sk
+            // then the secondary LSMInsertDeleteOperatorNodePushable
+            if (secondaryIndex != null) {
+                List<List<String>> skNames = secondaryIndex.getKeyFieldNames();
+                List<Integer> indicators = secondaryIndex.getKeyFieldSourceIndicators();
+                IScalarEvaluatorFactory[] secondaryFieldAccessEvalFactories =
+                        new IScalarEvaluatorFactory[skNames.size()];
+                for (int i = 0; i < skNames.size(); i++) {
+                    ARecordType sourceType = dataset.hasMetaPart()
+                            ? indicators.get(i).intValue() == Index.RECORD_INDICATOR ? recordType : metaType
+                            : recordType;
+                    int pos = skNames.get(i).size() > 1 ? -1 : sourceType.getFieldIndex(skNames.get(i).get(0));
+                    secondaryFieldAccessEvalFactories[i] = mdProvider.getDataFormat().getFieldAccessEvaluatorFactory(
+                            mdProvider.getFunctionManager(), sourceType, secondaryIndex.getKeyFieldNames().get(i), pos);
+                }
+                // outColumns are computed inside the assign runtime
+                int[] outColumns = new int[skNames.size()];
+                // projection list include old and new (primary and secondary keys)
+                int[] projectionList = new int[skNames.size() + primaryIndexInfo.index.getKeyFieldNames().size()];
+                for (int i = 0; i < secondaryFieldAccessEvalFactories.length; i++) {
+                    outColumns[i] = primaryIndexInfo.rDesc.getFieldCount() + i;
+                }
+                int projCount = 0;
+                for (int i = 0; i < secondaryFieldAccessEvalFactories.length; i++) {
+                    projectionList[projCount++] = primaryIndexInfo.rDesc.getFieldCount() + i;
+                }
+                for (int i = 0; i < primaryIndexInfo.index.getKeyFieldNames().size(); i++) {
+                    projectionList[projCount++] = i;
+                }
+                IPushRuntime assignOp =
+                        new AssignRuntimeFactory(outColumns, secondaryFieldAccessEvalFactories, projectionList, true)
+                                .createPushRuntime(ctx);
+                insertOp.setOutputFrameWriter(0, assignOp, primaryIndexInfo.rDesc);
+                assignOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc);
+                SecondaryIndexInfo secondaryIndexInfo = new SecondaryIndexInfo(primaryIndexInfo, secondaryIndex);
+                IIndexDataflowHelperFactory secondaryIndexHelperFactory = new IndexDataflowHelperFactory(
+                        storageComponentProvider.getStorageManager(), secondaryIndexInfo.fileSplitProvider);
+                LSMInsertDeleteOperatorNodePushable secondaryInsertOp =
+                        new LSMInsertDeleteOperatorNodePushable(ctx, ctx.getTaskAttemptId().getTaskId().getPartition(),
+                                secondaryIndexInfo.insertFieldsPermutations, secondaryIndexInfo.rDesc, op, false,
+                                secondaryIndexHelperFactory, NoOpOperationCallbackFactory.INSTANCE, null);
+                assignOp.setOutputFrameWriter(0, secondaryInsertOp, secondaryIndexInfo.rDesc);
+                CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx), dataset.getDatasetId(),
+                        secondaryIndexInfo.primaryKeyIndexes, true, ctx.getTaskAttemptId().getTaskId().getPartition(),
+                        true);
+                secondaryInsertOp.setOutputFrameWriter(0, commitOp, secondaryIndexInfo.rDesc);
+                commitOp.setInputRecordDescriptor(0, secondaryIndexInfo.rDesc);
+                return Pair.of(insertOp, commitOp);
+            } else {
+                CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx), dataset.getDatasetId(),
+                        primaryIndexInfo.primaryKeyIndexes, true, ctx.getTaskAttemptId().getTaskId().getPartition(),
+                        true);
+                insertOp.setOutputFrameWriter(0, commitOp, primaryIndexInfo.rDesc);
+                commitOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc);
+                return Pair.of(insertOp, commitOp);
+            }
+        } finally {
+            mdProvider.getLocks().unlock();
+        }
     }
 
     public IPushRuntime getFullScanPipeline(IFrameWriter countOp, IHyracksTaskContext ctx, Dataset dataset,
@@ -271,6 +334,34 @@ public class TestNodeController {
         return primaryIndexInfo;
     }
 
+    public SecondaryIndexInfo createSecondaryIndex(PrimaryIndexInfo primaryIndexInfo, Index secondaryIndex,
+            IStorageComponentProvider storageComponentProvider, int partition)
+            throws AlgebricksException, HyracksDataException, RemoteException, ACIDException {
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy =
+                DatasetUtil.getMergePolicyFactory(primaryIndexInfo.dataset, mdTxnCtx);
+        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        Dataverse dataverse = new Dataverse(primaryIndexInfo.dataset.getDataverseName(),
+                NonTaggedDataFormat.class.getName(), MetadataUtil.PENDING_NO_OP);
+        MetadataProvider mdProvider = new MetadataProvider(
+                (ICcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(), dataverse);
+        SecondaryIndexInfo secondaryIndexInfo = new SecondaryIndexInfo(primaryIndexInfo, secondaryIndex);
+        try {
+
+            IResourceFactory resourceFactory = primaryIndexInfo.dataset.getResourceFactory(mdProvider, secondaryIndex,
+                    primaryIndexInfo.recordType, primaryIndexInfo.metaType, mergePolicy.first, mergePolicy.second);
+            IndexBuilderFactory indexBuilderFactory =
+                    new IndexBuilderFactory(storageComponentProvider.getStorageManager(),
+                            secondaryIndexInfo.fileSplitProvider, resourceFactory, true);
+            IHyracksTaskContext ctx = createTestContext(newJobId(), partition, false);
+            IIndexBuilder indexBuilder = indexBuilderFactory.create(ctx, partition);
+            indexBuilder.build();
+        } finally {
+            mdProvider.getLocks().unlock();
+        }
+        return secondaryIndexInfo;
+    }
+
     public static ISerializerDeserializer<?>[] createPrimaryIndexSerdes(int primaryIndexNumOfTupleFields,
             IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType) {
         int i = 0;
@@ -286,6 +377,22 @@ public class TestNodeController {
         return primaryIndexSerdes;
     }
 
+    public static ISerializerDeserializer<?>[] createSecondaryIndexSerdes(ARecordType recordType, ARecordType metaType,
+            IAType[] primaryKeyTypes, IAType[] secondaryKeyTypes) {
+        ISerializerDeserializer<?>[] secondaryIndexSerdes =
+                new ISerializerDeserializer<?>[secondaryKeyTypes.length + primaryKeyTypes.length];
+        int i = 0;
+        for (; i < secondaryKeyTypes.length; i++) {
+            secondaryIndexSerdes[i] =
+                    SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(secondaryKeyTypes[i]);
+        }
+        for (; i < primaryKeyTypes.length; i++) {
+            secondaryIndexSerdes[i] =
+                    SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(primaryKeyTypes[i]);
+        }
+        return secondaryIndexSerdes;
+    }
+
     public static ITypeTraits[] createPrimaryIndexTypeTraits(int primaryIndexNumOfTupleFields, IAType[] primaryKeyTypes,
             ARecordType recordType, ARecordType metaType) {
         ITypeTraits[] primaryIndexTypeTraits = new ITypeTraits[primaryIndexNumOfTupleFields];
@@ -300,6 +407,19 @@ public class TestNodeController {
         return primaryIndexTypeTraits;
     }
 
+    public static ITypeTraits[] createSecondaryIndexTypeTraits(ARecordType recordType, ARecordType metaType,
+            IAType[] primaryKeyTypes, IAType[] secondaryKeyTypes) {
+        ITypeTraits[] secondaryIndexTypeTraits = new ITypeTraits[secondaryKeyTypes.length + primaryKeyTypes.length];
+        int i = 0;
+        for (; i < secondaryKeyTypes.length; i++) {
+            secondaryIndexTypeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(secondaryKeyTypes[i]);
+        }
+        for (; i < primaryKeyTypes.length; i++) {
+            secondaryIndexTypeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(primaryKeyTypes[i]);
+        }
+        return secondaryIndexTypeTraits;
+    }
+
     public IHyracksTaskContext createTestContext(JobId jobId, int partition, boolean withMessaging)
             throws HyracksDataException {
         IHyracksTaskContext ctx = TestUtils.create(KB32);
@@ -337,7 +457,47 @@ public class TestNodeController {
         return (DatasetLifecycleManager) getAppRuntimeContext().getDatasetLifecycleManager();
     }
 
+    public static class SecondaryIndexInfo {
+        private int[] primaryKeyIndexes;
+        private PrimaryIndexInfo primaryIndexInfo;
+        private Index secondaryIndex;
+        private ConstantFileSplitProvider fileSplitProvider;
+        private RecordDescriptor rDesc;
+        private int[] insertFieldsPermutations;
+
+        public SecondaryIndexInfo(PrimaryIndexInfo primaryIndexInfo, Index secondaryIndex) {
+            this.primaryIndexInfo = primaryIndexInfo;
+            this.secondaryIndex = secondaryIndex;
+            List<String> nodes = Collections.singletonList(ExecutionTestUtil.integrationUtil.ncs[0].getId());
+            CcApplicationContext appCtx =
+                    (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext();
+            FileSplit[] splits = SplitsAndConstraintsUtil.getIndexSplits(appCtx.getClusterStateManager(),
+                    primaryIndexInfo.dataset, secondaryIndex.getIndexName(), nodes);
+            fileSplitProvider = new ConstantFileSplitProvider(splits);
+            ITypeTraits[] secondaryIndexTypeTraits = createSecondaryIndexTypeTraits(primaryIndexInfo.recordType,
+                    primaryIndexInfo.metaType, primaryIndexInfo.primaryKeyTypes,
+                    secondaryIndex.getKeyFieldTypes().toArray(new IAType[secondaryIndex.getKeyFieldTypes().size()]));
+            ISerializerDeserializer<?>[] secondaryIndexSerdes = createSecondaryIndexSerdes(primaryIndexInfo.recordType,
+                    primaryIndexInfo.metaType, primaryIndexInfo.primaryKeyTypes,
+                    secondaryIndex.getKeyFieldTypes().toArray(new IAType[secondaryIndex.getKeyFieldTypes().size()]));
+            rDesc = new RecordDescriptor(secondaryIndexSerdes, secondaryIndexTypeTraits);
+            insertFieldsPermutations = new int[secondaryIndexTypeTraits.length];
+            for (int i = 0; i < insertFieldsPermutations.length; i++) {
+                insertFieldsPermutations[i] = i;
+            }
+            primaryKeyIndexes = new int[primaryIndexInfo.primaryKeyIndexes.length];
+            for (int i = 0; i < primaryKeyIndexes.length; i++) {
+                primaryKeyIndexes[i] = i + secondaryIndex.getKeyFieldNames().size();
+            }
+        }
+
+        public IFileSplitProvider getFileSplitProvider() {
+            return fileSplitProvider;
+        }
+    }
+
     public static class PrimaryIndexInfo {
+        private Dataset dataset;
         private IAType[] primaryKeyTypes;
         private ARecordType recordType;
         private ARecordType metaType;
@@ -356,6 +516,7 @@ public class TestNodeController {
                 ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
                 int[] filterFields, int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators)
                 throws AlgebricksException {
+            this.dataset = dataset;
             this.primaryKeyTypes = primaryKeyTypes;
             this.recordType = recordType;
             this.metaType = metaType;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index a239210..e923d93 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.test.dataflow;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -27,6 +28,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
 import java.util.function.Predicate;
 
 import org.apache.asterix.app.bootstrap.TestNodeController;
@@ -37,6 +39,7 @@ import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
 import org.apache.asterix.app.nc.NCAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.transactions.ITransactionContext;
@@ -67,9 +70,11 @@ import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback;
 import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
@@ -112,12 +117,24 @@ public class ComponentRollbackTest {
     private static IIndexDataflowHelper indexDataflowHelper;
     private static ITransactionContext txnCtx;
     private static LSMInsertDeleteOperatorNodePushable insertOp;
+    public static final ITestOpCallback<Semaphore> ALLOW_CALLBACK = new ITestOpCallback<Semaphore>() {
+        @Override
+        public void before(Semaphore smeaphore) {
+            smeaphore.release();
+        }
+
+        @Override
+        public void after() {
+        }
+    };
 
     @BeforeClass
     public static void setUp() throws Exception {
         System.out.println("SetUp: ");
         TestHelper.deleteExistingInstanceFiles();
-        nc = new TestNodeController(null, false);
+        String configPath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "test"
+                + File.separator + "resources" + File.separator + "multi-partition-test-configuration.xml";
+        nc = new TestNodeController(configPath, false);
         nc.init();
         ncAppCtx = nc.getAppRuntimeContext();
         dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
@@ -153,7 +170,7 @@ public class ComponentRollbackTest {
         txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
                 new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
         insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
-                KEY_INDICATORS_LIST, storageManager).getLeft();
+                KEY_INDICATORS_LIST, storageManager, null).getLeft();
     }
 
     @After
@@ -162,10 +179,10 @@ public class ComponentRollbackTest {
     }
 
     static void allowAllOps(TestLsmBtree lsmBtree) {
-        lsmBtree.addModifyCallback(sem -> sem.release());
-        lsmBtree.addFlushCallback(sem -> sem.release());
-        lsmBtree.addSearchCallback(sem -> sem.release());
-        lsmBtree.addMergeCallback(sem -> sem.release());
+        lsmBtree.addModifyCallback(ALLOW_CALLBACK);
+        lsmBtree.addFlushCallback(ALLOW_CALLBACK);
+        lsmBtree.addSearchCallback(ALLOW_CALLBACK);
+        lsmBtree.addMergeCallback(ALLOW_CALLBACK);
     }
 
     @Test
@@ -184,7 +201,7 @@ public class ComponentRollbackTest {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
-                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                    flush(false);
                 }
                 ITupleReference tuple = tupleGenerator.next();
                 DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
@@ -222,6 +239,16 @@ public class ComponentRollbackTest {
         }
     }
 
+    public void flush(boolean async) throws Exception {
+        flush(dsLifecycleMgr, lsmBtree, dataset, async);
+    }
+
+    public static void flush(IDatasetLifecycleManager dsLifecycleMgr, TestLsmBtree lsmBtree, Dataset dataset,
+            boolean async) throws Exception {
+        waitForOperations(lsmBtree);
+        dsLifecycleMgr.flushDataset(dataset.getDatasetId(), async);
+    }
+
     @Test
     public void testRollbackThenInsert() {
         try {
@@ -238,7 +265,7 @@ public class ComponentRollbackTest {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
-                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                    flush(false);
                 }
                 ITupleReference tuple = tupleGenerator.next();
                 DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
@@ -267,7 +294,7 @@ public class ComponentRollbackTest {
             txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
                     new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
             insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
-                    KEY_INDICATORS_LIST, storageManager).getLeft();
+                    KEY_INDICATORS_LIST, storageManager, null).getLeft();
             insertOp.open();
             for (int j = 0; j < RECORDS_PER_COMPONENT; j++) {
                 ITupleReference tuple = tupleGenerator.next();
@@ -311,7 +338,7 @@ public class ComponentRollbackTest {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
-                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                    flush(false);
                 }
                 ITupleReference tuple = tupleGenerator.next();
                 DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
@@ -339,7 +366,7 @@ public class ComponentRollbackTest {
             lsmAccessor.deleteComponents(
                     c -> (c instanceof ILSMMemoryComponent && ((ILSMMemoryComponent) c).isModified()));
             // now that the rollback has completed, we will unblock the search
-            lsmBtree.addSearchCallback(sem -> sem.release());
+            lsmBtree.addSearchCallback(ALLOW_CALLBACK);
             lsmBtree.allowSearch(1);
             Assert.assertTrue(firstSearcher.result());
             // search now and ensure
@@ -359,7 +386,7 @@ public class ComponentRollbackTest {
             DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
             lsmAccessor.deleteComponents(pred);
             // now that the rollback has completed, we will unblock the search
-            lsmBtree.addSearchCallback(sem -> sem.release());
+            lsmBtree.addSearchCallback(ALLOW_CALLBACK);
             lsmBtree.allowSearch(1);
             Assert.assertTrue(secondSearcher.result());
             searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
@@ -385,7 +412,7 @@ public class ComponentRollbackTest {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
-                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                    flush(false);
                 }
                 ITupleReference tuple = tupleGenerator.next();
                 DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
@@ -404,7 +431,7 @@ public class ComponentRollbackTest {
             // disable flushes
             lsmBtree.clearFlushCallbacks();
             Flusher firstFlusher = new Flusher(lsmBtree);
-            dsLifecycleMgr.flushDataset(dataset.getDatasetId(), true);
+            flush(true);
             firstFlusher.waitUntilCount(1);
             // now that we enetered, we will rollback. This will not proceed since it is waiting for the flush to complete
             Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate);
@@ -441,7 +468,7 @@ public class ComponentRollbackTest {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
-                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                    flush(false);
                 }
                 ITupleReference tuple = tupleGenerator.next();
                 DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
@@ -505,7 +532,7 @@ public class ComponentRollbackTest {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
-                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                    flush(false);
                 }
                 ITupleReference tuple = tupleGenerator.next();
                 DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
@@ -526,7 +553,7 @@ public class ComponentRollbackTest {
             lsmBtree.clearFlushCallbacks();
             lsmBtree.clearSearchCallbacks();
             Flusher firstFlusher = new Flusher(lsmBtree);
-            dsLifecycleMgr.flushDataset(dataset.getDatasetId(), true);
+            flush(true);
             firstFlusher.waitUntilCount(1);
             Searcher firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
             // wait till firstSearcher enter the components
@@ -535,7 +562,7 @@ public class ComponentRollbackTest {
             Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate);
             //unblock the flush
             lsmBtree.allowFlush(1);
-            lsmBtree.addSearchCallback(sem -> sem.release());
+            lsmBtree.addSearchCallback(ALLOW_CALLBACK);
             lsmBtree.allowSearch(1);
             Assert.assertTrue(firstSearcher.result());
             // ensure current mem component is not modified
@@ -565,7 +592,7 @@ public class ComponentRollbackTest {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
-                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                    flush(false);
                 }
                 ITupleReference tuple = tupleGenerator.next();
                 DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
@@ -585,7 +612,7 @@ public class ComponentRollbackTest {
             // disable searches
             lsmBtree.clearFlushCallbacks();
             Flusher firstFlusher = new Flusher(lsmBtree);
-            dsLifecycleMgr.flushDataset(dataset.getDatasetId(), true);
+            flush(true);
             firstFlusher.waitUntilCount(1);
             lsmBtree.clearSearchCallbacks();
             Searcher firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
@@ -594,7 +621,7 @@ public class ComponentRollbackTest {
             // now that we enetered, we will rollback
             Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate);
             // The rollback will be waiting for the flush to complete
-            lsmBtree.addSearchCallback(sem -> sem.release());
+            lsmBtree.addSearchCallback(ALLOW_CALLBACK);
             lsmBtree.allowSearch(1);
             Assert.assertTrue(firstSearcher.result());
             //unblock the flush
@@ -627,7 +654,7 @@ public class ComponentRollbackTest {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
-                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                    flush(false);
                 }
                 ITupleReference tuple = tupleGenerator.next();
                 DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
@@ -666,7 +693,7 @@ public class ComponentRollbackTest {
             // unblock the merge
             lsmBtree.allowMerge(1);
             // unblock the search
-            lsmBtree.addSearchCallback(sem -> sem.release());
+            lsmBtree.addSearchCallback(ALLOW_CALLBACK);
             lsmBtree.allowSearch(1);
             Assert.assertTrue(firstSearcher.result());
             rollerback.complete();
@@ -698,7 +725,7 @@ public class ComponentRollbackTest {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
-                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                    flush(false);
                 }
                 ITupleReference tuple = tupleGenerator.next();
                 DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
@@ -734,7 +761,7 @@ public class ComponentRollbackTest {
             // now that we enetered, we will rollback
             Rollerback rollerBack = new Rollerback(lsmBtree, new DiskComponentLsnPredicate(lsn));
             // unblock the search
-            lsmBtree.addSearchCallback(sem -> sem.release());
+            lsmBtree.addSearchCallback(ALLOW_CALLBACK);
             lsmBtree.allowSearch(1);
             Assert.assertTrue(firstSearcher.result());
             // even though rollback has been called, it is still waiting for the merge to complete
@@ -790,10 +817,18 @@ public class ComponentRollbackTest {
 
         public Searcher(TestNodeController nc, int partition, Dataset dataset, StorageComponentProvider storageManager,
                 TestLsmBtree lsmBtree, int numOfRecords) {
-            lsmBtree.addSearchCallback(sem -> {
-                synchronized (Searcher.this) {
-                    entered = true;
-                    Searcher.this.notifyAll();
+            lsmBtree.addSearchCallback(new ITestOpCallback<Semaphore>() {
+
+                @Override
+                public void before(Semaphore sem) {
+                    synchronized (Searcher.this) {
+                        entered = true;
+                        Searcher.this.notifyAll();
+                    }
+                }
+
+                @Override
+                public void after() {
                 }
             });
             Callable<Boolean> callable = new Callable<Boolean>() {
@@ -821,10 +856,18 @@ public class ComponentRollbackTest {
         private volatile int count = 0;
 
         public Merger(TestLsmBtree lsmBtree) {
-            lsmBtree.addMergeCallback(sem -> {
-                synchronized (Merger.this) {
-                    count++;
-                    Merger.this.notifyAll();
+            lsmBtree.addMergeCallback(new ITestOpCallback<Semaphore>() {
+
+                @Override
+                public void before(Semaphore smeaphore) {
+                    synchronized (Merger.this) {
+                        count++;
+                        Merger.this.notifyAll();
+                    }
+                }
+
+                @Override
+                public void after() {
                 }
             });
         }
@@ -840,10 +883,18 @@ public class ComponentRollbackTest {
         private volatile int count = 0;
 
         public Flusher(TestLsmBtree lsmBtree) {
-            lsmBtree.addFlushCallback(sem -> {
-                synchronized (Flusher.this) {
-                    count++;
-                    Flusher.this.notifyAll();
+            lsmBtree.addFlushCallback(new ITestOpCallback<Semaphore>() {
+
+                @Override
+                public void before(Semaphore smeaphore) {
+                    synchronized (Flusher.this) {
+                        count++;
+                        Flusher.this.notifyAll();
+                    }
+                }
+
+                @Override
+                public void after() {
                 }
             });
         }
@@ -889,6 +940,20 @@ public class ComponentRollbackTest {
         Assert.assertEquals(numOfRecords, countOp.getCount());
     }
 
+    public static void waitForOperations(ILSMIndex index) throws InterruptedException {
+        // wait until number of activeOperation reaches 0
+        PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) index.getOperationTracker();
+        long maxWaitTime = 60000L; // 1 minute
+        long before = System.currentTimeMillis();
+        while (opTracker.getNumActiveOperations() > 0) {
+            Thread.sleep(5); // NOSONAR: Test code with a timeout
+            if (System.currentTimeMillis() - before > maxWaitTime) {
+                throw new IllegalStateException(
+                        (System.currentTimeMillis() - before) + "ms passed without completing the frame operation");
+            }
+        }
+    }
+
     public static TestTupleCounterFrameWriter create(RecordDescriptor recordDescriptor,
             Collection<FrameWriterOperation> exceptionThrowingOperations,
             Collection<FrameWriterOperation> errorThrowingOperations, boolean deepCopyInputFrames) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
index 963cded..0a968c8 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -124,7 +124,7 @@ public class LogMarkerTest {
                 ITransactionContext txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
                         new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
                 LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
-                        RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager).getLeft();
+                        RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, null).getLeft();
                 insertOp.open();
                 TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
                         RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
index 2c8141ce..c12f50c 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
@@ -18,18 +18,23 @@
  */
 package org.apache.asterix.test.dataflow;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.asterix.app.bootstrap.TestNodeController;
 import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
+import org.apache.asterix.app.bootstrap.TestNodeController.SecondaryIndexInfo;
 import org.apache.asterix.app.data.gen.TupleGenerator;
 import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
 import org.apache.asterix.app.nc.NCAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
@@ -44,14 +49,21 @@ import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.test.common.TestHelper;
+import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.util.SingleThreadEventProcessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback;
+import org.apache.hyracks.storage.am.lsm.btree.impl.IVirtualBufferCacheCallback;
 import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -59,8 +71,18 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 public class MultiPartitionLSMIndexTest {
+    static final int REPREAT_TEST_COUNT = 1;
+
+    @Parameterized.Parameters
+    public static List<Object[]> data() {
+        return Arrays.asList(new Object[REPREAT_TEST_COUNT][0]);
+    }
+
     private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
     private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
             new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
@@ -73,30 +95,42 @@ public class MultiPartitionLSMIndexTest {
     private static final int[] KEY_INDEXES = { 0 };
     private static final int[] KEY_INDICATORS = { Index.RECORD_INDICATOR };
     private static final List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR });
-    private static final int TOTAL_NUM_OF_RECORDS = 10000;
-    private static final int RECORDS_PER_COMPONENT = 1000;
+    private static final int TOTAL_NUM_OF_RECORDS = 5000;
+    private static final int RECORDS_PER_COMPONENT = 500;
     private static final int DATASET_ID = 101;
     private static final String DATAVERSE_NAME = "TestDV";
     private static final String DATASET_NAME = "TestDS";
+    private static final String INDEX_NAME = "TestIdx";
     private static final String DATA_TYPE_NAME = "DUMMY";
     private static final String NODE_GROUP_NAME = "DEFAULT";
+    private static final IndexType INDEX_TYPE = IndexType.BTREE;
+    private static final List<List<String>> INDEX_FIELD_NAMES =
+            Arrays.asList(Arrays.asList(RECORD_TYPE.getFieldNames()[1]));
+    private static final List<Integer> INDEX_FIELD_INDICATORS = Arrays.asList(Index.RECORD_INDICATOR);
+    private static final List<IAType> INDEX_FIELD_TYPES = Arrays.asList(BuiltinType.AINT64);
     private static final StorageComponentProvider storageManager = new StorageComponentProvider();
     private static final int NUM_PARTITIONS = 2;
     private static TestNodeController nc;
     private static NCAppRuntimeContext ncAppCtx;
     private static IDatasetLifecycleManager dsLifecycleMgr;
     private static Dataset dataset;
+    private static Index secondaryIndex;
     private static ITransactionContext txnCtx;
-    private static TestLsmBtree[] primarylsmBtrees;
+    private static TestLsmBtree[] primaryLsmBtrees;
+    private static TestLsmBtree[] secondaryLsmBtrees;
     private static IHyracksTaskContext[] taskCtxs;
-    private static IIndexDataflowHelper[] indexDataflowHelpers;
+    private static IIndexDataflowHelper[] primaryIndexDataflowHelpers;
+    private static IIndexDataflowHelper[] secondaryIndexDataflowHelpers;
     private static LSMInsertDeleteOperatorNodePushable[] insertOps;
+    private static Actor[] actors;
 
     @BeforeClass
     public static void setUp() throws Exception {
         System.out.println("SetUp: ");
         TestHelper.deleteExistingInstanceFiles();
-        nc = new TestNodeController(null, false);
+        String configPath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "test"
+                + File.separator + "resources" + File.separator + "multi-partition-test-configuration.xml";
+        nc = new TestNodeController(configPath, false);
         nc.init();
         ncAppCtx = nc.getAppRuntimeContext();
         dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
@@ -113,84 +147,515 @@ public class MultiPartitionLSMIndexTest {
     public void createIndex() throws Exception {
         List<List<String>> partitioningKeys = new ArrayList<>();
         partitioningKeys.add(Collections.singletonList("key"));
-        dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
-                NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
-                        partitioningKeys, null, null, null, false, null),
-                null, DatasetType.INTERNAL, DATASET_ID, 0);
+        dataset =
+                new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+                        NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+                                partitioningKeys, null, null, null, false, null),
+                        null, DatasetType.INTERNAL, DATASET_ID, 0);
+        secondaryIndex = new Index(DATAVERSE_NAME, DATASET_NAME, INDEX_NAME, INDEX_TYPE, INDEX_FIELD_NAMES,
+                INDEX_FIELD_INDICATORS, INDEX_FIELD_TYPES, false, false, false, 0);
         taskCtxs = new IHyracksTaskContext[NUM_PARTITIONS];
-        indexDataflowHelpers = new IIndexDataflowHelper[NUM_PARTITIONS];
-        primarylsmBtrees = new TestLsmBtree[NUM_PARTITIONS];
+        primaryIndexDataflowHelpers = new IIndexDataflowHelper[NUM_PARTITIONS];
+        secondaryIndexDataflowHelpers = new IIndexDataflowHelper[NUM_PARTITIONS];
+        primaryLsmBtrees = new TestLsmBtree[NUM_PARTITIONS];
+        secondaryLsmBtrees = new TestLsmBtree[NUM_PARTITIONS];
         insertOps = new LSMInsertDeleteOperatorNodePushable[NUM_PARTITIONS];
         JobId jobId = nc.newJobId();
         txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(jobId),
                 new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
+        actors = new Actor[NUM_PARTITIONS];
         for (int i = 0; i < taskCtxs.length; i++) {
             taskCtxs[i] = nc.createTestContext(jobId, i, false);
             PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
                     storageManager, KEY_INDEXES, KEY_INDICATORS_LIST, i);
+            SecondaryIndexInfo secondaryIndexInfo =
+                    nc.createSecondaryIndex(primaryIndexInfo, secondaryIndex, storageManager, i);
             IndexDataflowHelperFactory iHelperFactory =
                     new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
-            indexDataflowHelpers[i] = iHelperFactory.create(taskCtxs[i].getJobletContext().getServiceContext(), i);
-            indexDataflowHelpers[i].open();
-            primarylsmBtrees[i] = (TestLsmBtree) indexDataflowHelpers[i].getIndexInstance();
-            indexDataflowHelpers[i].close();
+            primaryIndexDataflowHelpers[i] =
+                    iHelperFactory.create(taskCtxs[i].getJobletContext().getServiceContext(), i);
+            primaryIndexDataflowHelpers[i].open();
+            primaryLsmBtrees[i] = (TestLsmBtree) primaryIndexDataflowHelpers[i].getIndexInstance();
+            iHelperFactory =
+                    new IndexDataflowHelperFactory(nc.getStorageManager(), secondaryIndexInfo.getFileSplitProvider());
+            secondaryIndexDataflowHelpers[i] =
+                    iHelperFactory.create(taskCtxs[i].getJobletContext().getServiceContext(), i);
+            secondaryIndexDataflowHelpers[i].open();
+            secondaryLsmBtrees[i] = (TestLsmBtree) secondaryIndexDataflowHelpers[i].getIndexInstance();
+            secondaryIndexDataflowHelpers[i].close();
+            primaryIndexDataflowHelpers[i].close();
             insertOps[i] = nc.getInsertPipeline(taskCtxs[i], dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
-                    KEY_INDEXES, KEY_INDICATORS_LIST, storageManager).getLeft();
+                    KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, secondaryIndex).getLeft();
+            actors[i] = new Actor("player-" + i, i);
+        }
+        // allow all operations
+        for (int i = 0; i < NUM_PARTITIONS; i++) {
+            ComponentRollbackTest.allowAllOps(primaryLsmBtrees[i]);
+            ComponentRollbackTest.allowAllOps(secondaryLsmBtrees[i]);
+            actors[i].add(new Request(Request.Action.INSERT_OPEN));
         }
     }
 
     @After
     public void destroyIndex() throws Exception {
-        for (IIndexDataflowHelper indexDataflowHelper : indexDataflowHelpers) {
+        for (int i = 0; i < NUM_PARTITIONS; i++) {
+            Request close = new Request(Request.Action.INSERT_CLOSE);
+            actors[i].add(close);
+            close.await();
+        }
+        nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
+        for (IIndexDataflowHelper indexDataflowHelper : secondaryIndexDataflowHelpers) {
+            indexDataflowHelper.destroy();
+        }
+        for (IIndexDataflowHelper indexDataflowHelper : primaryIndexDataflowHelpers) {
             indexDataflowHelper.destroy();
         }
+        for (Actor actor : actors) {
+            actor.stop();
+        }
     }
 
     @Test
     public void testFlushOneFullOneEmpty() {
         try {
-            // allow all operations
-            for (int i = 0; i < NUM_PARTITIONS; i++) {
-                ComponentRollbackTest.allowAllOps(primarylsmBtrees[i]);
+            int totalNumOfComponents = TOTAL_NUM_OF_RECORDS / RECORDS_PER_COMPONENT;
+            for (int j = 0; j < totalNumOfComponents; j++) {
+                actors[0].add(new Request(Request.Action.INSERT_PATCH));
+                actors[0].add(new Request(Request.Action.FLUSH_DATASET));
             }
-
-            insertOps[0].open();
-            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
-                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
-            VSizeFrame frame = new VSizeFrame(taskCtxs[0]);
-            FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
-            int numFlushes = 0;
-            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
-                // flush every RECORDS_PER_COMPONENT records
-                if (j % RECORDS_PER_COMPONENT == (RECORDS_PER_COMPONENT - 1) && j + 1 != TOTAL_NUM_OF_RECORDS) {
-                    if (tupleAppender.getTupleCount() > 0) {
-                        tupleAppender.write(insertOps[0], true);
-                    }
-                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
-                    numFlushes++;
-                }
-                ITupleReference tuple = tupleGenerator.next();
-                DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOps[0]);
-            }
-            if (tupleAppender.getTupleCount() > 0) {
-                tupleAppender.write(insertOps[0], true);
-            }
-            insertOps[0].close();
-            dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
-            numFlushes++;
-            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
+            ensureDone(actors[0]);
             // search now and ensure partition 0 has all the records
             ComponentRollbackTest.searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
             // and that partition 1 has no records
             ComponentRollbackTest.searchAndAssertCount(nc, 1, dataset, storageManager, 0);
             // and that partition 0 has numFlushes disk components
-            Assert.assertEquals(numFlushes, primarylsmBtrees[0].getDiskComponents().size());
+            Assert.assertEquals(totalNumOfComponents, primaryLsmBtrees[0].getDiskComponents().size());
             // and that partition 1 has no disk components
-            Assert.assertEquals(0, primarylsmBtrees[1].getDiskComponents().size());
+            Assert.assertEquals(0, primaryLsmBtrees[1].getDiskComponents().size());
+            // and that in partition 0, all secondary components has a corresponding primary
+            List<ILSMDiskComponent> secondaryDiskComponents = secondaryLsmBtrees[0].getDiskComponents();
+            List<ILSMDiskComponent> primaryDiskComponents = primaryLsmBtrees[0].getDiskComponents();
+            for (int i = 0; i < secondaryDiskComponents.size(); i++) {
+                Assert.assertEquals(secondaryDiskComponents.get(i).getId(), primaryDiskComponents.get(i).getId());
+            }
         } catch (Throwable e) {
             e.printStackTrace();
             Assert.fail(e.getMessage());
         }
     }
 
+    private void ensureDone(Actor actor) throws InterruptedException {
+        Request req = new Request(Request.Action.DUMMY);
+        actor.add(req);
+        req.await();
+    }
+
+    /**
+     * This test update partition 0, schedule flush and modify partition 1
+     * Then ensure that in partition 1, primary and secondary have different component ids
+     */
+    @Test
+    public void testAllocateWhileFlushIsScheduled() {
+        try {
+            // when the vbc becomes full, we want to know
+            AtomicBoolean isFull = new AtomicBoolean(false);
+            MutableBoolean proceedToScheduleFlush = new MutableBoolean(false);
+            primaryLsmBtrees[0].addVirtuablBufferCacheCallback(new IVirtualBufferCacheCallback() {
+                @Override
+                public void isFullChanged(boolean newValue) {
+                    synchronized (isFull) {
+                        isFull.set(newValue);
+                        isFull.notifyAll();
+                    }
+                    synchronized (proceedToScheduleFlush) {
+                        while (!proceedToScheduleFlush.booleanValue()) {
+                            try {
+                                proceedToScheduleFlush.wait();
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                return;
+                            }
+                        }
+                    }
+                }
+            });
+            Request insertReq = new Request(Request.Action.INSERT_PATCH);
+            actors[0].add(insertReq);
+            while (true) {
+                Thread.sleep(100);
+                if (insertReq.done) {
+                    // if done, then flush was not triggered, then we need to insert a new patch
+                    insertReq = new Request(Request.Action.INSERT_PATCH);
+                    actors[0].add(insertReq);
+                } else if (isFull.get()) {
+                    break;
+                }
+            }
+
+            // now, we need to do two things
+            // allocate primary in partition 1 but not proceed
+            MutableBoolean proceedToAllocateSecondaryIndex = new MutableBoolean(false);
+            MutableBoolean allocated = new MutableBoolean(false);
+            primaryLsmBtrees[1].addAllocateCallback(new ITestOpCallback<Void>() {
+                @Override
+                public void before(Void t) {
+                    // Nothing
+                }
+
+                @Override
+                public void after() {
+                    synchronized (allocated) {
+                        allocated.setValue(true);
+                        allocated.notifyAll();
+                    }
+                    synchronized (proceedToAllocateSecondaryIndex) {
+                        while (!proceedToAllocateSecondaryIndex.booleanValue()) {
+                            try {
+                                proceedToAllocateSecondaryIndex.wait();
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                return;
+                            }
+                        }
+                    }
+                }
+            });
+            insertReq = new Request(Request.Action.INSERT_PATCH);
+            actors[1].add(insertReq);
+            // Wait for the allocation to happen
+            synchronized (allocated) {
+                while (!allocated.booleanValue()) {
+                    allocated.wait();
+                }
+            }
+            // The memory component has been allocated. now we allow the first actor to proceed to schedule flush
+            MutableBoolean flushStarted = new MutableBoolean(false);
+            primaryLsmBtrees[0].addFlushCallback(new ITestOpCallback<Semaphore>() {
+                @Override
+                public void before(Semaphore t) {
+                    synchronized (flushStarted) {
+                        flushStarted.setValue(true);
+                        flushStarted.notifyAll();
+                    }
+                }
+
+                @Override
+                public void after() {
+                }
+            });
+            synchronized (proceedToScheduleFlush) {
+                proceedToScheduleFlush.setValue(true);
+                proceedToScheduleFlush.notifyAll();
+            }
+            // Now we need to know that the flush has been scheduled
+            synchronized (flushStarted) {
+                while (!flushStarted.booleanValue()) {
+                    flushStarted.wait();
+                }
+            }
+
+            // we now allow the allocation to proceed
+            synchronized (proceedToAllocateSecondaryIndex) {
+                proceedToAllocateSecondaryIndex.setValue(true);
+                proceedToAllocateSecondaryIndex.notifyAll();
+            }
+            // ensure the insert patch has completed
+            insertReq.await();
+            primaryLsmBtrees[1].clearAllocateCallbacks();
+            // check the Ids of the memory components of partition 1
+            // This shows the bug
+            Assert.assertEquals(primaryLsmBtrees[1].getCurrentMemoryComponent().getId(),
+                    secondaryLsmBtrees[1].getCurrentMemoryComponent().getId());
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testRecycleWhileFlushIsScheduled() {
+        try {
+            Request insertReq = new Request(Request.Action.INSERT_PATCH);
+            actors[0].add(insertReq);
+            Request flushReq = new Request(Request.Action.FLUSH_DATASET);
+            actors[0].add(flushReq);
+            flushReq.await();
+            // ensure that index switched to second component
+            Assert.assertEquals(1, primaryLsmBtrees[0].getCurrentMemoryComponentIndex());
+            insertReq = new Request(Request.Action.INSERT_PATCH);
+            actors[0].add(insertReq);
+            flushReq = new Request(Request.Action.FLUSH_DATASET);
+            actors[0].add(flushReq);
+            flushReq.await();
+            // ensure we switched back to first component
+            Assert.assertEquals(0, primaryLsmBtrees[0].getCurrentMemoryComponentIndex());
+            // flush first component of partition 1
+            insertReq = new Request(Request.Action.INSERT_PATCH);
+            actors[1].add(insertReq);
+            flushReq = new Request(Request.Action.FLUSH_DATASET);
+            actors[1].add(flushReq);
+            flushReq.await();
+            // ensure partition 1 is now on second component
+            Assert.assertEquals(1, primaryLsmBtrees[1].getCurrentMemoryComponentIndex());
+            // now we want to control when schedule flush is executed
+            AtomicBoolean arrivedAtSchduleFlush = new AtomicBoolean(false);
+            AtomicBoolean finishedSchduleFlush = new AtomicBoolean(false);
+            MutableBoolean proceedToScheduleFlush = new MutableBoolean(false);
+            addOpTrackerCallback(primaryLsmBtrees[0], new ITestOpCallback<Void>() {
+                @Override
+                public void before(Void t) {
+                    synchronized (arrivedAtSchduleFlush) {
+                        arrivedAtSchduleFlush.set(true);
+                        arrivedAtSchduleFlush.notifyAll();
+                    }
+                    synchronized (proceedToScheduleFlush) {
+                        while (!proceedToScheduleFlush.booleanValue()) {
+                            try {
+                                proceedToScheduleFlush.wait();
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                return;
+                            }
+                        }
+                    }
+                }
+
+                @Override
+                public void after() {
+                    synchronized (finishedSchduleFlush) {
+                        finishedSchduleFlush.set(true);
+                        finishedSchduleFlush.notifyAll();
+                    }
+                }
+            });
+            AtomicBoolean isFull = new AtomicBoolean(false);
+            MutableBoolean proceedAfterIsFullChanged = new MutableBoolean(false);
+            primaryLsmBtrees[1].addVirtuablBufferCacheCallback(new IVirtualBufferCacheCallback() {
+                @Override
+                public void isFullChanged(boolean newValue) {
+                    synchronized (isFull) {
+                        isFull.set(newValue);
+                        isFull.notifyAll();
+                    }
+                    synchronized (proceedAfterIsFullChanged) {
+                        while (!proceedAfterIsFullChanged.booleanValue()) {
+                            try {
+                                proceedAfterIsFullChanged.wait();
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                return;
+                            }
+                        }
+                    }
+                }
+            });
+
+            // now we start adding records to partition 1 until flush is triggerred
+            insertReq = new Request(Request.Action.INSERT_PATCH);
+            actors[1].add(insertReq);
+            while (true) {
+                Thread.sleep(100);
+                if (insertReq.done) {
+                    // if done, then flush was not triggered, then we need to insert a new patch
+                    insertReq = new Request(Request.Action.INSERT_PATCH);
+                    actors[1].add(insertReq);
+                } else if (isFull.get()) {
+                    break;
+                }
+            }
+            // Now we know that vbc is full and flush will be scheduled, we allow this to proceed
+            synchronized (proceedAfterIsFullChanged) {
+                proceedAfterIsFullChanged.setValue(true);
+                proceedAfterIsFullChanged.notifyAll();
+            }
+
+            // now we want to control the recycling of components in partition 0
+            MutableBoolean recycledPrimary = new MutableBoolean(false);
+            MutableBoolean proceedAfterRecyclePrimary = new MutableBoolean(false);
+            ITestOpCallback<ILSMMemoryComponent> primaryRecycleCallback = new ITestOpCallback<ILSMMemoryComponent>() {
+                @Override
+                public void before(ILSMMemoryComponent t) {
+                }
+
+                @Override
+                public void after() {
+                    synchronized (recycledPrimary) {
+                        recycledPrimary.setValue(true);
+                        recycledPrimary.notifyAll();
+                    }
+                    synchronized (proceedAfterRecyclePrimary) {
+                        while (!proceedAfterRecyclePrimary.booleanValue()) {
+                            try {
+                                proceedAfterRecyclePrimary.wait();
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                return;
+                            }
+                        }
+                    }
+                }
+            };
+            primaryLsmBtrees[0].addIoRecycleCallback(primaryRecycleCallback);
+
+            MutableBoolean arrivedToRecycleSecondary = new MutableBoolean(false);
+            MutableBoolean proceedToRecycleSecondary = new MutableBoolean(false);
+            ITestOpCallback<ILSMMemoryComponent> secondaryRecycleCallback = new ITestOpCallback<ILSMMemoryComponent>() {
+                @Override
+                public void before(ILSMMemoryComponent t) {
+                    synchronized (arrivedToRecycleSecondary) {
+                        arrivedToRecycleSecondary.setValue(true);
+                        arrivedToRecycleSecondary.notifyAll();
+                    }
+                    synchronized (proceedToRecycleSecondary) {
+                        while (!proceedToRecycleSecondary.booleanValue()) {
+                            try {
+                                proceedToRecycleSecondary.wait();
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                return;
+                            }
+                        }
+                    }
+                }
+
+                @Override
+                public void after() {
+                }
+            };
+            secondaryLsmBtrees[0].addIoRecycleCallback(secondaryRecycleCallback);
+            // we first ensure that schedule flush arrived
+            synchronized (arrivedAtSchduleFlush) {
+                while (!arrivedAtSchduleFlush.get()) {
+                    arrivedAtSchduleFlush.wait();
+                }
+            }
+
+            // we insert a single frame into partition 0
+            insertReq = new Request(Request.Action.INSERT_PATCH);
+            actors[0].add(insertReq);
+            // wait until component has been recycled
+            synchronized (recycledPrimary) {
+                while (!recycledPrimary.booleanValue()) {
+                    recycledPrimary.wait();
+                }
+            }
+            synchronized (proceedAfterRecyclePrimary) {
+                proceedAfterRecyclePrimary.setValue(true);
+                proceedAfterRecyclePrimary.notifyAll();
+            }
+            // now, we know that the primary has been recycled. we allow it to proceed
+            // we allow the scheduleFlush to proceed
+            synchronized (proceedToScheduleFlush) {
+                proceedToScheduleFlush.setValue(true);
+                proceedToScheduleFlush.notifyAll();
+            }
+            // wait until scheduleFlushCompletes
+            synchronized (finishedSchduleFlush) {
+                while (!finishedSchduleFlush.get()) {
+                    finishedSchduleFlush.wait();
+                }
+            }
+            // allow recycling of secondary
+            synchronized (proceedToRecycleSecondary) {
+                proceedToRecycleSecondary.setValue(true);
+                proceedToRecycleSecondary.notifyAll();
+            }
+            // ensure that the insert completes
+            insertReq.await();
+            // ensure the two memory components at partition 0 have different component ids
+            Assert.assertEquals(primaryLsmBtrees[0].getCurrentMemoryComponent().getId(),
+                    secondaryLsmBtrees[0].getCurrentMemoryComponent().getId());
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    private static class Request {
+        enum Action {
+            DUMMY,
+            INSERT_OPEN,
+            INSERT_PATCH,
+            FLUSH_DATASET,
+            INSERT_CLOSE
+        }
+
+        private final Action action;
+        private volatile boolean done;
+
+        public Request(Action action) {
+            this.action = action;
+            done = false;
+        }
+
+        synchronized void complete() {
+            done = true;
+            notifyAll();
+        }
+
+        synchronized void await() throws InterruptedException {
+            while (!done) {
+                wait();
+            }
+        }
+
+    }
+
+    public class Actor extends SingleThreadEventProcessor<Request> {
+        private final int partition;
+        private final TupleGenerator tupleGenerator;
+        private final VSizeFrame frame;
+        private final FrameTupleAppender tupleAppender;
+
+        public Actor(String name, int partition) throws HyracksDataException {
+            super(name);
+            this.partition = partition;
+            tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            frame = new VSizeFrame(taskCtxs[partition]);
+            tupleAppender = new FrameTupleAppender(frame);
+        }
+
+        @Override
+        protected void handle(Request req) throws Exception {
+            try {
+                switch (req.action) {
+                    case FLUSH_DATASET:
+                        if (tupleAppender.getTupleCount() > 0) {
+                            tupleAppender.write(insertOps[partition], true);
+                        }
+                        dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                        break;
+                    case INSERT_CLOSE:
+                        insertOps[partition].close();
+                        break;
+                    case INSERT_OPEN:
+                        insertOps[partition].open();
+                        break;
+                    case INSERT_PATCH:
+                        for (int j = 0; j < RECORDS_PER_COMPONENT; j++) {
+                            ITupleReference tuple = tupleGenerator.next();
+                            DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOps[partition]);
+                        }
+                        if (tupleAppender.getTupleCount() > 0) {
+                            tupleAppender.write(insertOps[partition], true);
+                        }
+                        ComponentRollbackTest.waitForOperations(primaryLsmBtrees[partition]);
+                        break;
+                    default:
+                        break;
+                }
+            } finally {
+                req.complete();
+            }
+        }
+    }
+
+    private static void addOpTrackerCallback(TestLsmBtree lsmBtree, ITestOpCallback<Void> callback) {
+        if (!lsmBtree.isPrimaryIndex()) {
+            throw new IllegalArgumentException("Can only add callbacks to primary opTracker");
+        }
+        TestPrimaryIndexOperationTracker opTracker = (TestPrimaryIndexOperationTracker) lsmBtree.getOperationTracker();
+        opTracker.addCallback(callback);
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
index b65ba03..652616a 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
@@ -121,10 +121,11 @@ public class SearchCursorComponentSwitchTest {
     public void createIndex() throws Exception {
         List<List<String>> partitioningKeys = new ArrayList<>();
         partitioningKeys.add(Collections.singletonList("key"));
-        dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
-                NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
-                        partitioningKeys, null, null, null, false, null),
-                null, DatasetType.INTERNAL, DATASET_ID, 0);
+        dataset =
+                new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+                        NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+                                partitioningKeys, null, null, null, false, null),
+                        null, DatasetType.INTERNAL, DATASET_ID, 0);
         PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
                 storageManager, KEY_INDEXES, KEY_INDICATORS_LIST, 0);
         IndexDataflowHelperFactory iHelperFactory =
@@ -138,7 +139,7 @@ public class SearchCursorComponentSwitchTest {
         txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
                 new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
         insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
-                KEY_INDICATORS_LIST, storageManager).getLeft();
+                KEY_INDICATORS_LIST, storageManager, null).getLeft();
     }
 
     @After
@@ -147,7 +148,7 @@ public class SearchCursorComponentSwitchTest {
     }
 
     void unblockSearch(TestLsmBtree lsmBtree) {
-        lsmBtree.addSearchCallback(sem -> sem.release());
+        lsmBtree.addSearchCallback(ComponentRollbackTest.ALLOW_CALLBACK);
         lsmBtree.allowSearch(1);
     }
 
@@ -170,7 +171,7 @@ public class SearchCursorComponentSwitchTest {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
-                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                    ComponentRollbackTest.flush(dsLifecycleMgr, lsmBtree, dataset, false);
                 }
                 ITupleReference tuple = tupleGenerator.next();
                 DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
@@ -183,7 +184,7 @@ public class SearchCursorComponentSwitchTest {
             firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
             // wait till firstSearcher enter the components
             firstSearcher.waitUntilEntered();
-            dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+            ComponentRollbackTest.flush(dsLifecycleMgr, lsmBtree, dataset, false);
             nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
             // unblock the search
             unblockSearch(lsmBtree);
@@ -216,7 +217,7 @@ public class SearchCursorComponentSwitchTest {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
                     }
-                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                    ComponentRollbackTest.flush(dsLifecycleMgr, lsmBtree, dataset, false);
                 }
                 ITupleReference tuple = tupleGenerator.next();
                 DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
@@ -229,7 +230,7 @@ public class SearchCursorComponentSwitchTest {
             firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
             // wait till firstSearcher enter the components
             firstSearcher.waitUntilEntered();
-            dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+            ComponentRollbackTest.flush(dsLifecycleMgr, lsmBtree, dataset, false);
             nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
             // merge all components
             ILSMIndexAccessor mergeAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBTreeResourceFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBTreeResourceFactoryProvider.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBTreeResourceFactoryProvider.java
index 7268296..8d37878 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBTreeResourceFactoryProvider.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBTreeResourceFactoryProvider.java
@@ -33,6 +33,7 @@ import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.utils.IndexUtil;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerFactory;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
@@ -68,6 +69,9 @@ public class TestLsmBTreeResourceFactoryProvider implements IResourceFactoryProv
         int[] bloomFilterFields = getBloomFilterFields(dataset, index);
         double bloomFilterFalsePositiveRate = mdProvider.getStorageProperties().getBloomFilterFalsePositiveRate();
         ILSMOperationTrackerFactory opTrackerFactory = dataset.getIndexOperationTrackerFactory(index);
+        if (opTrackerFactory instanceof PrimaryIndexOperationTrackerFactory) {
+            opTrackerFactory = new TestPrimaryIndexOperationTrackerFactory(dataset.getDatasetId());
+        }
         ILSMIOOperationCallbackFactory ioOpCallbackFactory = dataset.getIoOperationCallbackFactory(index);
         IStorageManager storageManager = storageComponentProvider.getStorageManager();
         IMetadataPageManagerFactory metadataPageManagerFactory =

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
index ddcb5b5..fea6cd8 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
@@ -18,14 +18,20 @@
  */
 package org.apache.asterix.test.dataflow;
 
+import java.util.List;
+
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
 import org.apache.hyracks.storage.am.lsm.common.impls.EmptyComponent;
 
 public class TestLsmBtreeIoOpCallbackFactory extends LSMBTreeIOOperationCallbackFactory {
@@ -92,12 +98,31 @@ public class TestLsmBtreeIoOpCallbackFactory extends LSMBTreeIOOperationCallback
     }
 
     public class TestLsmBtreeIoOpCallback extends LSMBTreeIOOperationCallback {
+        private final TestLsmBtree lsmBtree;
+
         public TestLsmBtreeIoOpCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator) {
             super(index, idGenerator);
+            lsmBtree = (TestLsmBtree) index;
+        }
+
+        @Override
+        public void beforeOperation(LSMIOOperationType opType) throws HyracksDataException {
+            lsmBtree.beforeIoOperationCalled();
+            super.beforeOperation(opType);
+            lsmBtree.beforeIoOperationReturned();
+        }
+
+        @Override
+        public void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents,
+                ILSMDiskComponent newComponent) throws HyracksDataException {
+            lsmBtree.afterIoOperationCalled();
+            super.afterOperation(opType, oldComponents, newComponent);
+            lsmBtree.afterIoOperationReturned();
         }
 
         @Override
         public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) {
+            lsmBtree.afterIoFinalizeCalled();
             super.afterFinalize(opType, newComponent);
             synchronized (TestLsmBtreeIoOpCallbackFactory.this) {
                 if (newComponent != null) {
@@ -119,6 +144,21 @@ public class TestLsmBtreeIoOpCallbackFactory extends LSMBTreeIOOperationCallback
                 }
                 TestLsmBtreeIoOpCallbackFactory.this.notifyAll();
             }
+            lsmBtree.afterIoFinalizeReturned();
+        }
+
+        @Override
+        public void recycled(ILSMMemoryComponent component, boolean advance) throws HyracksDataException {
+            lsmBtree.recycledCalled(component);
+            super.recycled(component, advance);
+            lsmBtree.recycledReturned(component);
+        }
+
+        @Override
+        public void allocated(ILSMMemoryComponent component) throws HyracksDataException {
+            lsmBtree.allocatedCalled(component);
+            super.allocated(component);
+            lsmBtree.allocatedReturned(component);
         }
 
         private void recordFailure(LSMIOOperationType opType) {