Posted to commits@asterixdb.apache.org by am...@apache.org on 2017/10/04 05:53:13 UTC

[4/4] asterixdb git commit: [NO ISSUE][STO] Component Deletes Through flushes and merges

[NO ISSUE][STO] Component Deletes Through flushes and merges

- user model changes: no
- storage format changes: no
- interface changes: yes
  - moved component validation from the index side (ILSMIndex and all of
    its implementations) to the component side (ILSMDiskComponent and all
    of its implementations); see the sketch below
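
  A minimal before/after sketch of that move (the method name validate()
  and its exact signature are assumptions for illustration; they are not
  spelled out in this message):

      import org.apache.hyracks.api.exceptions.HyracksDataException;

      // Before this change (sketch): the index validated its own components.
      public interface ILSMIndex {
          void validate() throws HyracksDataException; // validates all components
      }

      // After this change (sketch): each disk component validates itself.
      public interface ILSMDiskComponent extends ILSMComponent {
          void validate() throws HyracksDataException; // validates this component only
      }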

details:
- This change enables component-level deletes: whole memory or disk
  components can now be deleted (e.g., rolled back) through the same
  machinery that coordinates flushes and merges. A usage sketch follows.
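
  A usage sketch of the component-delete path, mirroring the new
  ComponentRollbackTest added by this patch (setup elided; lsmBtree is an
  opened LSM B-tree instance):

      import java.util.function.Predicate;

      import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
      import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
      import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
      import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;

      // Delete (roll back) all memory components of the index. A different
      // predicate selects other components; the tests, for example, build a
      // DiskComponentLsnPredicate from a component's LSN to pick disk components.
      ILSMIndexAccessor accessor =
              lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
      Predicate<ILSMComponent> memoryComponents = c -> c instanceof ILSMMemoryComponent;
      accessor.deleteComponents(memoryComponents);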

Change-Id: I178656207bfa1d15e6ae5ff2403a16df33940773
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2017
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mh...@apache.org>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/b2e50b7d
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/b2e50b7d
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/b2e50b7d

Branch: refs/heads/master
Commit: b2e50b7d1dc233f7c34af24083f3f3fded7f9d05
Parents: 9f04f97
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Tue Oct 3 10:10:54 2017 -0700
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Tue Oct 3 22:52:35 2017 -0700

----------------------------------------------------------------------
 asterixdb/asterix-app/pom.xml                   |  17 +-
 .../apache/asterix/app/nc/RecoveryManager.java  |   2 +-
 .../app/bootstrap/TestNodeController.java       | 120 +--
 .../test/dataflow/ComponentRollbackTest.java    | 889 +++++++++++++++++++
 .../asterix/test/dataflow/LogMarkerTest.java    |  19 +-
 .../asterix/test/dataflow/TestDataset.java      |  67 ++
 .../TestLsmBTreeResourceFactoryProvider.java    | 171 ++++
 .../TestLsmBtreeIoOpCallbackFactory.java        | 125 +++
 .../context/CorrelatedPrefixMergePolicy.java    |   4 +-
 .../AbstractLSMIOOperationCallback.java         |  15 +-
 .../PrimaryIndexLogMarkerCallback.java          |  30 +-
 .../CorrelatedPrefixMergePolicyTest.java        |   2 +-
 .../LSMSecondaryIndexBulkLoadNodePushable.java  |   4 +-
 asterixdb/pom.xml                               |   6 +
 .../hyracks/api/exceptions/ErrorCode.java       |   4 +
 .../src/main/resources/errormsg/en.properties   |   4 +
 .../am/common/ophelpers/IndexOperation.java     |   4 +-
 .../am/lsm/btree/impls/ExternalBTree.java       |  13 +-
 .../lsm/btree/impls/ExternalBTreeWithBuddy.java |  31 +-
 .../storage/am/lsm/btree/impls/LSMBTree.java    |  42 +-
 .../lsm/btree/impls/LSMBTreeDiskComponent.java  |  11 +
 .../lsm/btree/impls/LSMBTreeFlushOperation.java |   8 +-
 .../lsm/btree/impls/LSMBTreeMergeOperation.java |  10 +-
 .../impls/LSMBTreeWithBuddyDiskComponent.java   |   8 +
 .../impls/LSMBTreeWithBuddyMergeOperation.java  |  12 +-
 .../am/lsm/common/api/ILSMDiskComponent.java    |  11 +
 .../storage/am/lsm/common/api/ILSMHarness.java  |  11 +
 .../am/lsm/common/api/ILSMIOOperation.java      |   9 +-
 .../storage/am/lsm/common/api/ILSMIndex.java    |  11 +-
 .../am/lsm/common/api/ILSMIndexAccessor.java    |  15 +
 .../lsm/common/impls/AbstractIoOperation.java   |   1 +
 .../common/impls/AbstractLSMDiskComponent.java  |   6 +-
 .../am/lsm/common/impls/AbstractLSMIndex.java   |  84 +-
 .../lsm/common/impls/ConstantMergePolicy.java   |   4 +-
 .../am/lsm/common/impls/EmptyComponent.java     |  85 ++
 .../impls/EmptyDiskComponentMetadata.java       |  51 ++
 .../lsm/common/impls/ExternalIndexHarness.java  |   4 +-
 .../am/lsm/common/impls/FlushOperation.java     |  10 +-
 .../am/lsm/common/impls/LSMComponentState.java  |  27 -
 .../storage/am/lsm/common/impls/LSMHarness.java | 110 ++-
 .../lsm/common/impls/LSMTreeIndexAccessor.java  |  13 +-
 .../am/lsm/common/impls/MergeOperation.java     |   7 +-
 .../am/lsm/common/impls/PrefixMergePolicy.java  |   6 +-
 .../am/lsm/common/impls/TracedIOOperation.java  |   8 +-
 .../am/lsm/common/util/ComponentUtils.java      | 178 ++++
 .../am/lsm/common/util/IOOperationUtils.java    |  43 +
 .../lsm/common/utils/ComponentMetadataUtil.java | 155 ----
 .../invertedindex/impls/LSMInvertedIndex.java   |  45 +-
 .../impls/LSMInvertedIndexAccessor.java         |  12 +
 .../impls/LSMInvertedIndexDiskComponent.java    |  15 +
 .../impls/LSMInvertedIndexFlushOperation.java   |   7 +-
 .../impls/LSMInvertedIndexMergeOperation.java   |  11 +-
 .../am/lsm/rtree/impls/AbstractLSMRTree.java    |   5 -
 .../am/lsm/rtree/impls/ExternalRTree.java       |  16 +-
 .../storage/am/lsm/rtree/impls/LSMRTree.java    |  32 +-
 .../lsm/rtree/impls/LSMRTreeDiskComponent.java  |  12 +
 .../lsm/rtree/impls/LSMRTreeFlushOperation.java |   8 +-
 .../lsm/rtree/impls/LSMRTreeMergeOperation.java |  11 +-
 .../impls/LSMRTreeWithAntiMatterTuples.java     |  26 +-
 .../hyracks-storage-am-lsm-btree-test/pom.xml   |  21 +
 .../am/lsm/btree/LSMBTreeFileManagerTest.java   |   6 +-
 .../btree/LSMBTreeFilterMergeTestDriver.java    |   6 +-
 .../am/lsm/btree/LSMBTreeMergeTestDriver.java   |   2 +-
 .../btree/LSMBTreeScanDiskComponentsTest.java   |   2 +-
 ...TreeUpdateInPlaceScanDiskComponentsTest.java |   4 +-
 .../am/lsm/btree/impl/ITestOpCallback.java      |  25 +
 .../storage/am/lsm/btree/impl/TestLsmBtree.java | 260 ++++++
 .../btree/impl/TestLsmBtreeLocalResource.java   |  70 ++
 .../impl/TestLsmBtreeLocalResourceFactory.java  |  61 ++
 .../btree/impl/TestLsmBtreeSearchCursor.java    |  51 ++
 .../am/lsm/btree/impl/TestLsmBtreeUtil.java     | 105 +++
 .../btree/multithread/LSMBTreeTestWorker.java   |   2 +-
 .../lsm/common/test/PrefixMergePolicyTest.java  |   2 +-
 .../LSMInvertedIndexMergeTest.java              |   2 +-
 .../PartitionedLSMInvertedIndexMergeTest.java   |   2 +-
 .../multithread/LSMInvertedIndexTestWorker.java |   2 +-
 .../am/lsm/rtree/LSMRTreeMergeTestDriver.java   |   2 +-
 .../rtree/multithread/LSMRTreeTestWorker.java   |   2 +-
 .../LSMRTreeWithAntiMatterTuplesTestWorker.java |   2 +-
 79 files changed, 2690 insertions(+), 606 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/asterixdb/asterix-app/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 489e5de..717b950 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -16,7 +16,8 @@
  ! specific language governing permissions and limitations
  ! under the License.
  !-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <artifactId>apache-asterixdb</artifactId>
@@ -169,7 +170,7 @@
             </goals>
             <configuration>
               <licenses combine.children="append">
-                <license implementation="org.apache.rat.analysis.license.MITLicense"/>
+                <license implementation="org.apache.rat.analysis.license.MITLicense" />
               </licenses>
               <excludes combine.children="append">
                 <exclude>src/test/resources/**/results_parser_sqlpp/**</exclude>
@@ -192,7 +193,7 @@
             <configuration>
               <reportFile>${project.build.directory}/webqueryui-rat.txt</reportFile>
               <licenses combine.children="append">
-                <license implementation="org.apache.rat.analysis.license.MITLicense"/>
+                <license implementation="org.apache.rat.analysis.license.MITLicense" />
                 <license implementation="org.apache.rat.analysis.license.SimplePatternBasedLicense">
                   <licenseFamilyCategory>MIT</licenseFamilyCategory>
                   <licenseFamilyName>JQuery</licenseFamilyName>
@@ -210,7 +211,7 @@
                 </license>
               </licenses>
               <licenseFamilies combine.children="append">
-                <licenseFamily implementation="org.apache.rat.license.MITLicenseFamily"/>
+                <licenseFamily implementation="org.apache.rat.license.MITLicenseFamily" />
                 <licenseFamily implementation="org.apache.rat.license.SimpleLicenseFamily">
                   <familyName>JQuery</familyName>
                 </licenseFamily>
@@ -567,5 +568,11 @@
       <groupId>io.netty</groupId>
       <artifactId>netty-all</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-storage-am-lsm-btree-test</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
-</project>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 10af9ff..04e6313 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -362,7 +362,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                                     try {
                                         maxDiskLastLsn =
                                                 ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
-                                                        .getComponentLSN(lsmIndex.getImmutableComponents());
+                                                        .getComponentLSN(lsmIndex.getDiskComponents());
                                     } catch (HyracksDataException e) {
                                         datasetLifecycleManager.close(localResource.getPath());
                                         throw e;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 53a4f23..104f80b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -39,14 +39,12 @@ import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
 import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.file.StorageComponentProvider;
-import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.formats.nontagged.TypeTraitProvider;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.metadata.utils.MetadataUtil;
 import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
 import org.apache.asterix.om.types.ARecordType;
@@ -68,7 +66,6 @@ import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
@@ -83,7 +80,6 @@ import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
 import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorNodePushable;
 import org.apache.hyracks.storage.am.common.api.IIndexBuilder;
-import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.build.IndexBuilderFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
@@ -93,11 +89,10 @@ import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
 import org.apache.hyracks.storage.common.IResourceFactory;
+import org.apache.hyracks.storage.common.IStorageManager;
 import org.apache.hyracks.test.support.TestUtils;
 import org.apache.hyracks.util.file.FileUtil;
 import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 public class TestNodeController {
     protected static final Logger LOGGER = Logger.getLogger(TestNodeController.class.getName());
@@ -144,12 +139,7 @@ public class TestNodeController {
         }
         jobletCtx = Mockito.mock(IHyracksJobletContext.class);
         Mockito.when(jobletCtx.getServiceContext()).thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getContext());
-        Mockito.when(jobletCtx.getJobId()).thenAnswer(new Answer<JobId>() {
-            @Override
-            public JobId answer(InvocationOnMock invocation) throws Throwable {
-                return jobId;
-            }
-        });
+        Mockito.when(jobletCtx.getJobId()).thenReturn(jobId);
     }
 
     public void deInit() throws Exception {
@@ -167,8 +157,7 @@ public class TestNodeController {
             int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
             StorageComponentProvider storageComponentProvider) throws AlgebricksException, HyracksDataException {
         PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
-                mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators,
-                storageComponentProvider);
+                mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators);
         IndexOperation op = IndexOperation.INSERT;
         IModificationOperationCallbackFactory modOpCallbackFactory =
                 new PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(), dataset.getDatasetId(),
@@ -176,7 +165,7 @@ public class TestNodeController {
                         ResourceType.LSM_BTREE);
         IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider();
         IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
-                storageComponentProvider.getStorageManager(), primaryIndexInfo.fileSplitProvider);
+                storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
         LSMInsertDeleteOperatorNodePushable insertOp = new LSMInsertDeleteOperatorNodePushable(ctx, PARTITION,
                 primaryIndexInfo.primaryIndexInsertFieldsPermutations,
                 recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0), op,
@@ -196,10 +185,9 @@ public class TestNodeController {
         IPushRuntime emptyTupleOp = new EmptyTupleSourceRuntimeFactory().createPushRuntime(ctx);
         JobSpecification spec = new JobSpecification();
         PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
-                mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators,
-                storageComponentProvider);
+                mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators);
         IIndexDataflowHelperFactory indexDataflowHelperFactory = new IndexDataflowHelperFactory(
-                storageComponentProvider.getStorageManager(), primaryIndexInfo.fileSplitProvider);
+                storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
         BTreeSearchOperatorDescriptor searchOpDesc = new BTreeSearchOperatorDescriptor(spec, primaryIndexInfo.rDesc,
                 null, null, true, true, indexDataflowHelperFactory, false, false, null,
                 NoOpOperationCallbackFactory.INSTANCE, filterFields, filterFields, false);
@@ -236,13 +224,12 @@ public class TestNodeController {
         }
     }
 
-    public void createPrimaryIndex(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType,
+    public PrimaryIndexInfo createPrimaryIndex(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType,
             ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
             int[] filterFields, IStorageComponentProvider storageComponentProvider, int[] primaryKeyIndexes,
             List<Integer> primaryKeyIndicators) throws AlgebricksException, HyracksDataException {
         PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
-                mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators,
-                storageComponentProvider);
+                mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators);
         Dataverse dataverse = new Dataverse(dataset.getDataverseName(), NonTaggedDataFormat.class.getName(),
                 MetadataUtil.PENDING_NO_OP);
         MetadataProvider mdProvider = new MetadataProvider(
@@ -252,34 +239,17 @@ public class TestNodeController {
                     recordType, metaType, mergePolicyFactory, mergePolicyProperties);
             IndexBuilderFactory indexBuilderFactory =
                     new IndexBuilderFactory(storageComponentProvider.getStorageManager(),
-                            primaryIndexInfo.fileSplitProvider, resourceFactory, !dataset.isTemp());
+                            primaryIndexInfo.getFileSplitProvider(), resourceFactory, !dataset.isTemp());
             IHyracksTaskContext ctx = createTestContext(false);
             IIndexBuilder indexBuilder = indexBuilderFactory.create(ctx, 0);
             indexBuilder.build();
         } finally {
             mdProvider.getLocks().unlock();
         }
+        return primaryIndexInfo;
     }
 
-    private int[] createPrimaryIndexBloomFilterFields(int length) {
-        int[] primaryIndexBloomFilterKeyFields = new int[length];
-        for (int j = 0; j < length; ++j) {
-            primaryIndexBloomFilterKeyFields[j] = j;
-        }
-        return primaryIndexBloomFilterKeyFields;
-    }
-
-    private IBinaryComparatorFactory[] createPrimaryIndexComparatorFactories(IAType[] primaryKeyTypes) {
-        IBinaryComparatorFactory[] primaryIndexComparatorFactories =
-                new IBinaryComparatorFactory[primaryKeyTypes.length];
-        for (int j = 0; j < primaryKeyTypes.length; ++j) {
-            primaryIndexComparatorFactories[j] =
-                    BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(primaryKeyTypes[j], true);
-        }
-        return primaryIndexComparatorFactories;
-    }
-
-    private ISerializerDeserializer<?>[] createPrimaryIndexSerdes(int primaryIndexNumOfTupleFields,
+    public static ISerializerDeserializer<?>[] createPrimaryIndexSerdes(int primaryIndexNumOfTupleFields,
             IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType) {
         int i = 0;
         ISerializerDeserializer<?>[] primaryIndexSerdes = new ISerializerDeserializer<?>[primaryIndexNumOfTupleFields];
@@ -294,7 +264,7 @@ public class TestNodeController {
         return primaryIndexSerdes;
     }
 
-    private ITypeTraits[] createPrimaryIndexTypeTraits(int primaryIndexNumOfTupleFields, IAType[] primaryKeyTypes,
+    public static ITypeTraits[] createPrimaryIndexTypeTraits(int primaryIndexNumOfTupleFields, IAType[] primaryKeyTypes,
             ARecordType recordType, ARecordType metaType) {
         ITypeTraits[] primaryIndexTypeTraits = new ITypeTraits[primaryIndexNumOfTupleFields];
         int i = 0;
@@ -336,55 +306,34 @@ public class TestNodeController {
         return (DatasetLifecycleManager) getAppRuntimeContext().getDatasetLifecycleManager();
     }
 
-    @SuppressWarnings("unused")
-    private class PrimaryIndexInfo {
-        private Dataset dataset;
+    public static class PrimaryIndexInfo {
         private IAType[] primaryKeyTypes;
         private ARecordType recordType;
         private ARecordType metaType;
         private ILSMMergePolicyFactory mergePolicyFactory;
         private Map<String, String> mergePolicyProperties;
-        private int[] filterFields;
         private int primaryIndexNumOfTupleFields;
-        private IBinaryComparatorFactory[] primaryIndexComparatorFactories;
         private ITypeTraits[] primaryIndexTypeTraits;
         private ISerializerDeserializer<?>[] primaryIndexSerdes;
-        private int[] primaryIndexBloomFilterKeyFields;
-        private ITypeTraits[] filterTypeTraits;
-        private IBinaryComparatorFactory[] filterCmpFactories;
-        private int[] btreeFields;
         private ConstantFileSplitProvider fileSplitProvider;
         private RecordDescriptor rDesc;
         private int[] primaryIndexInsertFieldsPermutations;
         private int[] primaryKeyIndexes;
-        private List<List<String>> keyFieldNames;
-        private List<Integer> keyFieldSourceIndicators;
-        private List<IAType> keyFieldTypes;
         private Index index;
-        private IStorageComponentProvider storageComponentProvider;
 
         public PrimaryIndexInfo(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType,
                 ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
-                int[] filterFields, int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
-                IStorageComponentProvider storageComponentProvider) throws AlgebricksException {
-            this.storageComponentProvider = storageComponentProvider;
-            this.dataset = dataset;
+                int[] filterFields, int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators)
+                throws AlgebricksException {
             this.primaryKeyTypes = primaryKeyTypes;
             this.recordType = recordType;
             this.metaType = metaType;
             this.mergePolicyFactory = mergePolicyFactory;
             this.mergePolicyProperties = mergePolicyProperties;
-            this.filterFields = filterFields;
             this.primaryKeyIndexes = primaryKeyIndexes;
             primaryIndexNumOfTupleFields = primaryKeyTypes.length + (1 + ((metaType == null) ? 0 : 1));
             primaryIndexTypeTraits =
                     createPrimaryIndexTypeTraits(primaryIndexNumOfTupleFields, primaryKeyTypes, recordType, metaType);
-            primaryIndexComparatorFactories = createPrimaryIndexComparatorFactories(primaryKeyTypes);
-            primaryIndexBloomFilterKeyFields = createPrimaryIndexBloomFilterFields(primaryKeyTypes.length);
-            filterTypeTraits = DatasetUtil.computeFilterTypeTraits(dataset, recordType);
-            filterCmpFactories = DatasetUtil.computeFilterBinaryComparatorFactories(dataset, recordType,
-                    NonTaggedDataFormat.INSTANCE.getBinaryComparatorFactoryProvider());
-            btreeFields = DatasetUtil.createBTreeFieldsWhenThereisAFilter(dataset);
             primaryIndexSerdes =
                     createPrimaryIndexSerdes(primaryIndexNumOfTupleFields, primaryKeyTypes, recordType, metaType);
             rDesc = new RecordDescriptor(primaryIndexSerdes, primaryIndexTypeTraits);
@@ -392,23 +341,22 @@ public class TestNodeController {
             for (int i = 0; i < primaryIndexNumOfTupleFields; i++) {
                 primaryIndexInsertFieldsPermutations[i] = i;
             }
-            keyFieldSourceIndicators = primaryKeyIndicators;
-            keyFieldNames = new ArrayList<>();
-            keyFieldTypes = Arrays.asList(primaryKeyTypes);
-            for (int i = 0; i < keyFieldSourceIndicators.size(); i++) {
-                Integer indicator = keyFieldSourceIndicators.get(i);
+            List<List<String>> keyFieldNames = new ArrayList<>();
+            List<IAType> keyFieldTypes = Arrays.asList(primaryKeyTypes);
+            for (int i = 0; i < primaryKeyIndicators.size(); i++) {
+                Integer indicator = primaryKeyIndicators.get(i);
                 String[] fieldNames =
                         indicator == Index.RECORD_INDICATOR ? recordType.getFieldNames() : metaType.getFieldNames();
                 keyFieldNames.add(Arrays.asList(fieldNames[primaryKeyIndexes[i]]));
             }
             index = new Index(dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName(),
-                    IndexType.BTREE, keyFieldNames, keyFieldSourceIndicators, keyFieldTypes, false, false, true,
+                    IndexType.BTREE, keyFieldNames, primaryKeyIndicators, keyFieldTypes, false, false, true,
                     MetadataUtil.PENDING_NO_OP);
             List<String> nodes = Collections.singletonList(ExecutionTestUtil.integrationUtil.ncs[0].getId());
-            FileSplit[] splits = SplitsAndConstraintsUtil.getIndexSplits(
-                    ((ICcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext())
-                            .getClusterStateManager(),
-                    dataset, index.getIndexName(), nodes);
+            CcApplicationContext appCtx =
+                    (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext();
+            FileSplit[] splits = SplitsAndConstraintsUtil.getIndexSplits(appCtx.getClusterStateManager(), dataset,
+                    index.getIndexName(), nodes);
             fileSplitProvider = new ConstantFileSplitProvider(Arrays.copyOfRange(splits, 0, 1));
         }
 
@@ -436,6 +384,10 @@ public class TestNodeController {
                     .thenReturn(searcgRecDesc);
             return rDescProvider;
         }
+
+        public ConstantFileSplitProvider getFileSplitProvider() {
+            return fileSplitProvider;
+        }
     }
 
     public RecordDescriptor getSearchOutputDesc(IAType[] keyTypes, ARecordType recordType, ARecordType metaType) {
@@ -450,18 +402,12 @@ public class TestNodeController {
     public IndexDataflowHelperFactory getPrimaryIndexDataflowHelperFactory(PrimaryIndexInfo primaryIndexInfo,
             IStorageComponentProvider storageComponentProvider) throws AlgebricksException {
         return new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(),
-                primaryIndexInfo.fileSplitProvider);
+                primaryIndexInfo.getFileSplitProvider());
     }
 
-    public IIndexDataflowHelper getPrimaryIndexDataflowHelper(Dataset dataset, IAType[] primaryKeyTypes,
-            ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory,
-            Map<String, String> mergePolicyProperties, int[] filterFields,
-            IStorageComponentProvider storageComponentProvider, int[] primaryKeyIndexes,
-            List<Integer> primaryKeyIndicators) throws AlgebricksException, HyracksDataException {
-        PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
-                mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators,
-                storageComponentProvider);
-        return getPrimaryIndexDataflowHelperFactory(primaryIndexInfo, storageComponentProvider)
-                .create(createTestContext(true).getJobletContext().getServiceContext(), PARTITION);
+    public IStorageManager getStorageManager() {
+        CcApplicationContext appCtx =
+                (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext();
+        return appCtx.getStorageManager();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
new file mode 100644
index 0000000..76bec8c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -0,0 +1,889 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.function.Predicate;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
+import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter;
+import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.app.nc.NCAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
+import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.file.StorageComponentProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.test.CountAnswer;
+import org.apache.hyracks.api.test.FrameWriterTestUtils;
+import org.apache.hyracks.api.test.FrameWriterTestUtils.FrameWriterOperation;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ComponentRollbackTest {
+
+    private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
+    private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
+            new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
+    private static final GenerationFunction[] RECORD_GEN_FUNCTION =
+            { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC };
+    private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false };
+    private static final ARecordType META_TYPE = null;
+    private static final GenerationFunction[] META_GEN_FUNCTION = null;
+    private static final boolean[] UNIQUE_META_FIELDS = null;
+    private static final int[] KEY_INDEXES = { 0 };
+    private static final int[] KEY_INDICATORS = { Index.RECORD_INDICATOR };
+    private static final List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR });
+    private static final int TOTAL_NUM_OF_RECORDS = 10000;
+    private static final int RECORDS_PER_COMPONENT = 1000;
+    private static final int DATASET_ID = 101;
+    private static final String DATAVERSE_NAME = "TestDV";
+    private static final String DATASET_NAME = "TestDS";
+    private static final String DATA_TYPE_NAME = "DUMMY";
+    private static final String NODE_GROUP_NAME = "DEFAULT";
+    private static final Predicate<ILSMComponent> memoryComponentsPredicate = c -> c instanceof ILSMMemoryComponent;
+    private static final StorageComponentProvider storageManager = new StorageComponentProvider();
+    private static TestNodeController nc;
+    private static TestLsmBtree lsmBtree;
+    private static NCAppRuntimeContext ncAppCtx;
+    private static IDatasetLifecycleManager dsLifecycleMgr;
+    private static Dataset dataset;
+    private static IHyracksTaskContext ctx;
+    private static IIndexDataflowHelper indexDataflowHelper;
+    private static ITransactionContext txnCtx;
+    private static LSMInsertDeleteOperatorNodePushable insertOp;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        System.out.println("SetUp: ");
+        TestHelper.deleteExistingInstanceFiles();
+        nc = new TestNodeController(null, false);
+        nc.init();
+        ncAppCtx = nc.getAppRuntimeContext();
+        dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        System.out.println("TearDown");
+        nc.deInit();
+        TestHelper.deleteExistingInstanceFiles();
+    }
+
+    @Before
+    public void createIndex() throws Exception {
+        List<List<String>> partitioningKeys = new ArrayList<>();
+        partitioningKeys.add(Collections.singletonList("key"));
+        dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME,
+                NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+                        partitioningKeys, null, null, null, false, null, false),
+                null, DatasetType.INTERNAL, DATASET_ID, 0);
+        PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE,
+                new NoMergePolicyFactory(), null, null, storageManager, KEY_INDEXES, KEY_INDICATORS_LIST);
+        IndexDataflowHelperFactory iHelperFactory =
+                new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
+        ctx = nc.createTestContext(false);
+        indexDataflowHelper = iHelperFactory.create(ctx.getJobletContext().getServiceContext(), 0);
+        indexDataflowHelper.open();
+        lsmBtree = (TestLsmBtree) indexDataflowHelper.getIndexInstance();
+        indexDataflowHelper.close();
+        nc.newJobId();
+        txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
+        insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(),
+                null, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager).getLeft();
+    }
+
+    @After
+    public void destroyIndex() throws Exception {
+        indexDataflowHelper.destroy();
+    }
+
+    private void allowAllOps(TestLsmBtree lsmBtree) {
+        lsmBtree.addModifyCallback(sem -> sem.release());
+        lsmBtree.addFlushCallback(sem -> sem.release());
+        lsmBtree.addSearchCallback(sem -> sem.release());
+        lsmBtree.addMergeCallback(sem -> sem.release());
+    }
+
+    @Test
+    public void testRollbackWhileNoOp() {
+        try {
+            // allow all operations
+            allowAllOps(lsmBtree);
+            insertOp.open();
+            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            VSizeFrame frame = new VSizeFrame(ctx);
+            FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+                // flush every 1000 records
+                if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+                    if (tupleAppender.getTupleCount() > 0) {
+                        tupleAppender.write(insertOp, true);
+                    }
+                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                }
+                ITupleReference tuple = tupleGenerator.next();
+                DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+            }
+            if (tupleAppender.getTupleCount() > 0) {
+                tupleAppender.write(insertOp, true);
+            }
+            insertOp.close();
+            nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+
+            // get all components
+            List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
+            List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
+            Assert.assertEquals(9, diskComponents.size());
+            Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            ILSMIndexAccessor lsmAccessor =
+                    lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            // rollback a memory component
+            lsmAccessor.deleteComponents(memoryComponentsPredicate);
+            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
+            // rollback the last disk component
+            lsmAccessor = lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
+            DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
+            lsmAccessor.deleteComponents(pred);
+            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testRollbackThenInsert() {
+        try {
+            // allow all operations
+            allowAllOps(lsmBtree);
+            insertOp.open();
+            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            VSizeFrame frame = new VSizeFrame(ctx);
+            FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+                // flush every 1000 records
+                if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+                    if (tupleAppender.getTupleCount() > 0) {
+                        tupleAppender.write(insertOp, true);
+                    }
+                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                }
+                ITupleReference tuple = tupleGenerator.next();
+                DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+            }
+            if (tupleAppender.getTupleCount() > 0) {
+                tupleAppender.write(insertOp, true);
+            }
+            insertOp.close();
+            nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+
+            // get all components
+            List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
+            List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
+            Assert.assertEquals(9, diskComponents.size());
+            Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            ILSMIndexAccessor lsmAccessor =
+                    lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            // rollback a memory component
+            lsmAccessor.deleteComponents(memoryComponentsPredicate);
+            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
+
+            // insert again
+            nc.newJobId();
+            txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
+            insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(),
+                    null, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager).getLeft();
+            insertOp.open();
+            for (int j = 0; j < RECORDS_PER_COMPONENT; j++) {
+                ITupleReference tuple = tupleGenerator.next();
+                DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+            }
+            if (tupleAppender.getTupleCount() > 0) {
+                tupleAppender.write(insertOp, true);
+            }
+            insertOp.close();
+            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            // rollback the last disk component
+            lsmAccessor = lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
+            DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
+            lsmAccessor.deleteComponents(pred);
+            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testRollbackWhileSearch() {
+        try {
+            // allow all operations but search
+            allowAllOps(lsmBtree);
+            lsmBtree.clearSearchCallbacks();
+            insertOp.open();
+            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            VSizeFrame frame = new VSizeFrame(ctx);
+            FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+                // flush every 1000 records
+                if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+                    if (tupleAppender.getTupleCount() > 0) {
+                        tupleAppender.write(insertOp, true);
+                    }
+                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                }
+                ITupleReference tuple = tupleGenerator.next();
+                DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+            }
+            if (tupleAppender.getTupleCount() > 0) {
+                tupleAppender.write(insertOp, true);
+            }
+            insertOp.close();
+            nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+
+            // get all components
+            List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
+            List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
+            Assert.assertEquals(9, diskComponents.size());
+            Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+            Searcher firstSearcher = new Searcher(nc, ctx, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+            // wait till firstSearcher enters the components
+            firstSearcher.waitUntilEntered();
+            // now that we have entered, we will roll back
+            ILSMIndexAccessor lsmAccessor =
+                    lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            // rollback a memory component
+            lsmAccessor.deleteComponents(
+                    c -> (c instanceof ILSMMemoryComponent && ((ILSMMemoryComponent) c).isModified()));
+            // now that the rollback has completed, we will unblock the search
+            lsmBtree.addSearchCallback(sem -> sem.release());
+            lsmBtree.allowSearch(1);
+            Assert.assertTrue(firstSearcher.result());
+            // search now and ensure the expected count
+            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
+            // rollback the last disk component
+            // re-block searches
+            lsmBtree.clearSearchCallbacks();
+            Searcher secondSearcher = new Searcher(nc, ctx, dataset, storageManager, lsmBtree,
+                    TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
+            // wait till secondSearcher enters the components
+            secondSearcher.waitUntilEntered();
+            lsmAccessor = lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
+            DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
+            lsmAccessor.deleteComponents(pred);
+            // now that the rollback has completed, we will unblock the search
+            lsmBtree.addSearchCallback(sem -> sem.release());
+            lsmBtree.allowSearch(1);
+            Assert.assertTrue(secondSearcher.result());
+            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testRollbackWhileFlush() {
+        try {
+            // allow all operations
+            allowAllOps(lsmBtree);
+            insertOp.open();
+            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            VSizeFrame frame = new VSizeFrame(ctx);
+            FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+                // flush every 1000 records
+                if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+                    if (tupleAppender.getTupleCount() > 0) {
+                        tupleAppender.write(insertOp, true);
+                    }
+                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                }
+                ITupleReference tuple = tupleGenerator.next();
+                DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+            }
+            if (tupleAppender.getTupleCount() > 0) {
+                tupleAppender.write(insertOp, true);
+            }
+            insertOp.close();
+            nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+            // get all components
+            List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
+            List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
+            Assert.assertEquals(9, diskComponents.size());
+            Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            // disable flushes
+            lsmBtree.clearFlushCallbacks();
+            Flusher firstFlusher = new Flusher(lsmBtree);
+            dsLifecycleMgr.flushDataset(dataset.getDatasetId(), true);
+            firstFlusher.waitUntilCount(1);
+            // now that we have entered, we will roll back. This will not proceed since it is waiting for the flush to complete
+            Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate);
+            // the rollback is waiting for the flush to complete; search in the meantime
+            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            //unblock the flush
+            lsmBtree.allowFlush(1);
+            // ensure rollback completed
+            rollerback.complete();
+            // ensure current mem component is not modified
+            Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+            // search now and ensure the expected count
+            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testRollbackWhileMerge() {
+        try {
+            // allow all operations but merge
+            allowAllOps(lsmBtree);
+            lsmBtree.clearMergeCallbacks();
+            insertOp.open();
+            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            VSizeFrame frame = new VSizeFrame(ctx);
+            FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+                // flush every 1000 records
+                if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+                    if (tupleAppender.getTupleCount() > 0) {
+                        tupleAppender.write(insertOp, true);
+                    }
+                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                }
+                ITupleReference tuple = tupleGenerator.next();
+                DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+            }
+            if (tupleAppender.getTupleCount() > 0) {
+                tupleAppender.write(insertOp, true);
+            }
+            insertOp.close();
+            nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+            // get all components
+            List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
+            List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
+            Assert.assertEquals(9, diskComponents.size());
+            Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            // Now, we will start a full merge
+            Merger merger = new Merger(lsmBtree);
+            ILSMIndexAccessor mergeAccessor =
+                    lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            // select the components to merge... the last three
+            int numMergedComponents = 3;
+            List<ILSMDiskComponent> mergedComponents = new ArrayList<>();
+            long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
+            for (int i = 0; i < numMergedComponents; i++) {
+                mergedComponents.add(diskComponents.get(i));
+            }
+            mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents);
+            merger.waitUntilCount(1);
+            // now that we have entered, we will roll back
+            Rollerback rollerback = new Rollerback(lsmBtree, new DiskComponentLsnPredicate(lsn));
+            // rollback is now waiting for the merge to complete
+            // we will search
+            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            //unblock the merge
+            lsmBtree.allowMerge(1);
+            // ensure rollback completes
+            rollerback.complete();
+            // ensure current mem component is not modified
+            Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+            // search now and ensure that we rolled back the merged component
+            searchAndAssertCount(nc, ctx, dataset, storageManager,
+                    TOTAL_NUM_OF_RECORDS - ((numMergedComponents + 1/*memory component*/) * RECORDS_PER_COMPONENT));
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testRollbackWhileFlushAndSearchFlushExistsFirst() {
+        try {
+            // allow all operations
+            allowAllOps(lsmBtree);
+            insertOp.open();
+            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            VSizeFrame frame = new VSizeFrame(ctx);
+            FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+                // flush every 1000 records
+                if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+                    if (tupleAppender.getTupleCount() > 0) {
+                        tupleAppender.write(insertOp, true);
+                    }
+                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                }
+                ITupleReference tuple = tupleGenerator.next();
+                DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+            }
+            if (tupleAppender.getTupleCount() > 0) {
+                tupleAppender.write(insertOp, true);
+            }
+            insertOp.close();
+            nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+            // get all components
+            List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
+            List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
+            Assert.assertEquals(9, diskComponents.size());
+            Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            // disable flushes and searches
+            lsmBtree.clearFlushCallbacks();
+            lsmBtree.clearSearchCallbacks();
+            Flusher firstFlusher = new Flusher(lsmBtree);
+            dsLifecycleMgr.flushDataset(dataset.getDatasetId(), true);
+            firstFlusher.waitUntilCount(1);
+            Searcher firstSearcher = new Searcher(nc, ctx, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+            // wait till firstSearcher enters the components
+            firstSearcher.waitUntilEntered();
+            // now that the searcher has entered, we will roll back the memory components
+            Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate);
+            // unblock the flush
+            lsmBtree.allowFlush(1);
+            // unblock the search
+            lsmBtree.addSearchCallback(sem -> sem.release());
+            lsmBtree.allowSearch(1);
+            Assert.assertTrue(firstSearcher.result());
+            rollerback.complete();
+            // ensure current mem component is not modified
+            Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+            // search now and ensure the rollback was a no-op since it waited for the ongoing flush
+            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testRollbackWhileFlushAndSearchSearchExistsFirst() {
+        try {
+            // allow all operations
+            allowAllOps(lsmBtree);
+            insertOp.open();
+            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            VSizeFrame frame = new VSizeFrame(ctx);
+            FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+                // flush every 1000 records
+                if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+                    if (tupleAppender.getTupleCount() > 0) {
+                        tupleAppender.write(insertOp, true);
+                    }
+                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                }
+                ITupleReference tuple = tupleGenerator.next();
+                DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+            }
+            if (tupleAppender.getTupleCount() > 0) {
+                tupleAppender.write(insertOp, true);
+            }
+            insertOp.close();
+            nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+            // get all components
+            List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
+            List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
+            Assert.assertEquals(9, diskComponents.size());
+            Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            // disable flushes; searches are disabled after the flush is scheduled
+            lsmBtree.clearFlushCallbacks();
+            Flusher firstFlusher = new Flusher(lsmBtree);
+            dsLifecycleMgr.flushDataset(dataset.getDatasetId(), true);
+            firstFlusher.waitUntilCount(1);
+            // disable searches
+            lsmBtree.clearSearchCallbacks();
+            Searcher firstSearcher = new Searcher(nc, ctx, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+            // wait till firstSearcher enters the components
+            firstSearcher.waitUntilEntered();
+            // now that the searcher has entered, we will roll back the memory components
+            Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate);
+            // The rollback will be waiting for the flush to complete
+            // unblock the search
+            lsmBtree.addSearchCallback(sem -> sem.release());
+            lsmBtree.allowSearch(1);
+            Assert.assertTrue(firstSearcher.result());
+            // unblock the flush
+            lsmBtree.allowFlush(1);
+            rollerback.complete();
+            // ensure current mem component is not modified
+            Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+            // search now and ensure the rollback was a no-op since it waited for the ongoing flush
+            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testRollbackWhileMergeAndSearchMergeExitsFirst() {
+        try {
+            // allow all operations except merge
+            allowAllOps(lsmBtree);
+            lsmBtree.clearMergeCallbacks();
+            insertOp.open();
+            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            VSizeFrame frame = new VSizeFrame(ctx);
+            FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+                // flush every 1000 records
+                if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+                    if (tupleAppender.getTupleCount() > 0) {
+                        tupleAppender.write(insertOp, true);
+                    }
+                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                }
+                ITupleReference tuple = tupleGenerator.next();
+                DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+            }
+            if (tupleAppender.getTupleCount() > 0) {
+                tupleAppender.write(insertOp, true);
+            }
+            insertOp.close();
+            nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+            // get all components
+            List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
+            List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
+            Assert.assertEquals(9, diskComponents.size());
+            Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            // Now, we will start a merge
+            Merger merger = new Merger(lsmBtree);
+            ILSMIndexAccessor mergeAccessor =
+                    lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            // select the components to merge: the three most recent disk components
+            int numMergedComponents = 3;
+            List<ILSMDiskComponent> mergedComponents = new ArrayList<>();
+            long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
+            for (int i = 0; i < numMergedComponents; i++) {
+                mergedComponents.add(diskComponents.get(i));
+            }
+            mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents);
+            merger.waitUntilCount(1);
+            // we will block searches
+            lsmBtree.clearSearchCallbacks();
+            Searcher firstSearcher = new Searcher(nc, ctx, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+            // wait till firstSearcher enters the components
+            firstSearcher.waitUntilEntered();
+            // now that the searcher has entered, we will roll back
+            Rollerback rollerback = new Rollerback(lsmBtree, new DiskComponentLsnPredicate(lsn));
+            // the rollback is waiting for all flushes and merges to complete before it proceeds
+            // unblock the merge
+            lsmBtree.allowMerge(1);
+            // unblock the search
+            lsmBtree.addSearchCallback(sem -> sem.release());
+            lsmBtree.allowSearch(1);
+            Assert.assertTrue(firstSearcher.result());
+            rollerback.complete();
+            // now that the rollback has completed, we will search
+            searchAndAssertCount(nc, ctx, dataset, storageManager,
+                    TOTAL_NUM_OF_RECORDS - ((numMergedComponents + 1/*memory component*/) * RECORDS_PER_COMPONENT));
+            // ensure current mem component is not modified
+            Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testRollbackWhileMergeAndSearchSearchExitsFirst() {
+        try {
+            // allow all operations except merge
+            allowAllOps(lsmBtree);
+            lsmBtree.clearMergeCallbacks();
+            insertOp.open();
+            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            VSizeFrame frame = new VSizeFrame(ctx);
+            FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+                // flush every 1000 records
+                if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+                    if (tupleAppender.getTupleCount() > 0) {
+                        tupleAppender.write(insertOp, true);
+                    }
+                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                }
+                ITupleReference tuple = tupleGenerator.next();
+                DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+            }
+            if (tupleAppender.getTupleCount() > 0) {
+                tupleAppender.write(insertOp, true);
+            }
+            insertOp.close();
+            nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
+            // get all components
+            List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
+            List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
+            Assert.assertEquals(9, diskComponents.size());
+            Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            // Now, we will start a merge
+            Merger merger = new Merger(lsmBtree);
+            ILSMIndexAccessor mergeAccessor =
+                    lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            // select the components to merge: the three most recent disk components
+            List<ILSMDiskComponent> mergedComponents = new ArrayList<>();
+            long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
+            int numMergedComponents = 3;
+            for (int i = 0; i < numMergedComponents; i++) {
+                mergedComponents.add(diskComponents.get(i));
+            }
+            mergeAccessor.scheduleMerge(lsmBtree.getIOOperationCallback(), mergedComponents);
+            merger.waitUntilCount(1);
+            // we will block searches
+            lsmBtree.clearSearchCallbacks();
+            Searcher firstSearcher = new Searcher(nc, ctx, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+            // wait till firstSearcher enters the components
+            firstSearcher.waitUntilEntered();
+            // now that the searcher has entered, we will roll back
+            Rollerback rollerBack = new Rollerback(lsmBtree, new DiskComponentLsnPredicate(lsn));
+            // unblock the search
+            lsmBtree.addSearchCallback(sem -> sem.release());
+            lsmBtree.allowSearch(1);
+            Assert.assertTrue(firstSearcher.result());
+            // even though rollback has been called, it is still waiting for the merge to complete
+            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            // unblock the merge
+            lsmBtree.allowMerge(1);
+            rollerBack.complete();
+            searchAndAssertCount(nc, ctx, dataset, storageManager,
+                    TOTAL_NUM_OF_RECORDS - ((numMergedComponents + 1/*memory component*/) * RECORDS_PER_COMPONENT));
+            // ensure current mem component is not modified
+            Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
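+    /**
+     * Runs a component delete (rollback) on its own thread so the test can keep
+     * driving flushes, merges, and searches while the rollback is blocked behind
+     * ongoing I/O operations. {@link #complete()} joins the thread and rethrows
+     * any failure it recorded.
+     */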
+    private class Rollerback {
+        private Thread task;
+        private Exception failure;
+
+        public Rollerback(TestLsmBtree lsmBtree, Predicate<ILSMComponent> predicate) {
+            // run the rollback on its own thread so the test thread is not blocked
+            Runnable runnable = new Runnable() {
+                @Override
+                public void run() {
+                    ILSMIndexAccessor lsmAccessor =
+                            lsmBtree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+                    try {
+                        lsmAccessor.deleteComponents(predicate);
+                    } catch (HyracksDataException e) {
+                        failure = e;
+                    }
+                }
+            };
+            task = new Thread(runnable);
+            task.start();
+        }
+
+        void complete() throws Exception {
+            task.join();
+            if (failure != null) {
+                throw failure;
+            }
+        }
+    }
+
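+    /**
+     * Runs a full-scan count on its own thread and signals once the search has
+     * entered the index components, so a rollback can be interleaved with an
+     * in-flight search.
+     */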
+    private class Searcher {
+        private ExecutorService executor = Executors.newSingleThreadExecutor();
+        private Future<Boolean> task;
+        private volatile boolean entered = false;
+
+        public Searcher(TestNodeController nc, IHyracksTaskContext ctx, Dataset dataset,
+                StorageComponentProvider storageManager, TestLsmBtree lsmBtree, int numOfRecords) {
+            lsmBtree.addSearchCallback(sem -> {
+                synchronized (Searcher.this) {
+                    entered = true;
+                    Searcher.this.notifyAll();
+                }
+            });
+            Callable<Boolean> callable = new Callable<Boolean>() {
+                @Override
+                public Boolean call() throws Exception {
+                    searchAndAssertCount(nc, ctx, dataset, storageManager, numOfRecords);
+                    return true;
+                }
+            };
+            task = executor.submit(callable);
+        }
+
+        boolean result() throws Exception {
+            return task.get();
+        }
+
+        synchronized void waitUntilEntered() throws InterruptedException {
+            while (!entered) {
+                this.wait();
+            }
+        }
+    }
+
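+    /**
+     * Counts merge operations as they enter the index so the test can block
+     * until a scheduled merge is in progress.
+     */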
+    private class Merger {
+        private volatile int count = 0;
+
+        public Merger(TestLsmBtree lsmBtree) {
+            lsmBtree.addMergeCallback(sem -> {
+                synchronized (Merger.this) {
+                    count++;
+                    Merger.this.notifyAll();
+                }
+            });
+        }
+
+        synchronized void waitUntilCount(int count) throws InterruptedException {
+            while (this.count != count) {
+                this.wait();
+            }
+        }
+    }
+
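+    /**
+     * Counts flush operations as they enter the index so the test can block
+     * until a scheduled flush is in progress.
+     */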
+    private class Flusher {
+        private volatile int count = 0;
+
+        public Flusher(TestLsmBtree lsmBtree) {
+            lsmBtree.addFlushCallback(sem -> {
+                synchronized (Flusher.this) {
+                    count++;
+                    Flusher.this.notifyAll();
+                }
+            });
+        }
+
+        synchronized void waitUntilCount(int count) throws InterruptedException {
+            while (this.count != count) {
+                this.wait();
+            }
+        }
+    }
+
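+    /**
+     * Selects every memory component, plus every disk component whose metadata
+     * LSN is at least the given LSN. Used as the rollback predicate in the
+     * merge tests above, e.g.:
+     *
+     * <pre>
+     * long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
+     * new Rollerback(lsmBtree, new DiskComponentLsnPredicate(lsn));
+     * </pre>
+     */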
+    private class DiskComponentLsnPredicate implements Predicate<ILSMComponent> {
+        private final long lsn;
+
+        public DiskComponentLsnPredicate(long lsn) {
+            this.lsn = lsn;
+        }
+
+        @Override
+        public boolean test(ILSMComponent c) {
+            try {
+                return c instanceof ILSMMemoryComponent
+                        || (c instanceof ILSMDiskComponent && AbstractLSMIOOperationCallback
+                                .getTreeIndexLSN(((ILSMDiskComponent) c).getMetadata()) >= lsn);
+            } catch (HyracksDataException e) {
+                e.printStackTrace();
+                return false;
+            }
+        }
+    }
+
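+    /**
+     * Performs a full scan of the primary index and asserts that the number of
+     * returned tuples equals the expected count.
+     */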
+    private void searchAndAssertCount(TestNodeController nc, IHyracksTaskContext ctx, Dataset dataset,
+            StorageComponentProvider storageManager, int numOfRecords)
+            throws HyracksDataException, AlgebricksException {
+        nc.newJobId();
+        TestTupleCounterFrameWriter countOp = create(nc.getSearchOutputDesc(KEY_TYPES, RECORD_TYPE, META_TYPE),
+                Collections.emptyList(), Collections.emptyList(), false);
+        IPushRuntime emptyTupleOp = nc.getFullScanPipeline(countOp, ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE,
+                new NoMergePolicyFactory(), null, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager);
+        emptyTupleOp.open();
+        emptyTupleOp.close();
+        Assert.assertEquals(numOfRecords, countOp.getCount());
+    }
+
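+    /**
+     * Creates a {@link TestTupleCounterFrameWriter} with a {@link CountAnswer}
+     * per frame-writer operation, configured from the given exception- and
+     * error-throwing operation sets.
+     */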
+    public static TestTupleCounterFrameWriter create(RecordDescriptor recordDescriptor,
+            Collection<FrameWriterOperation> exceptionThrowingOperations,
+            Collection<FrameWriterOperation> errorThrowingOperations, boolean deepCopyInputFrames) {
+        CountAnswer openAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Open,
+                exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer nextAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.NextFrame,
+                exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer flushAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Flush,
+                exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer failAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Fail,
+                exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer closeAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Close,
+                exceptionThrowingOperations, errorThrowingOperations);
+        return new TestTupleCounterFrameWriter(recordDescriptor, openAnswer, nextAnswer, flushAnswer, failAnswer,
+                closeAnswer, deepCopyInputFrames);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
index 1e3960f..a0e3aa9 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
 import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter;
 import org.apache.asterix.app.data.gen.TupleGenerator;
 import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
@@ -58,9 +59,10 @@ import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
 import org.apache.hyracks.dataflow.common.utils.TaskUtil;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
-import org.apache.hyracks.storage.am.lsm.common.utils.ComponentMetadataUtil;
+import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -113,8 +115,8 @@ public class LogMarkerTest {
                             partitioningKeys, null, null, null, false, null, false),
                     null, DatasetType.INTERNAL, DATASET_ID, 0);
             try {
-                nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null,
-                        null, storageManager, KEY_INDEXES, KEY_INDICATORS_LIST);
+                PrimaryIndexInfo indexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE,
+                        new NoMergePolicyFactory(), null, null, storageManager, KEY_INDEXES, KEY_INDICATORS_LIST);
                 IHyracksTaskContext ctx = nc.createTestContext(true);
                 nc.newJobId();
                 ITransactionContext txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
@@ -147,13 +149,14 @@ public class LogMarkerTest {
                 }
                 insertOp.close();
                 nc.getTransactionManager().completedTransaction(txnCtx, DatasetId.NULL, -1, true);
-                IIndexDataflowHelper dataflowHelper = nc.getPrimaryIndexDataflowHelper(dataset, KEY_TYPES, RECORD_TYPE,
-                        META_TYPE, new NoMergePolicyFactory(), null, null, storageManager, KEY_INDEXES,
-                        KEY_INDICATORS_LIST);
+                IndexDataflowHelperFactory iHelperFactory =
+                        new IndexDataflowHelperFactory(nc.getStorageManager(), indexInfo.getFileSplitProvider());
+                IIndexDataflowHelper dataflowHelper =
+                        iHelperFactory.create(ctx.getJobletContext().getServiceContext(), 0);
                 dataflowHelper.open();
                 LSMBTree btree = (LSMBTree) dataflowHelper.getIndexInstance();
                 LongPointable longPointable = LongPointable.FACTORY.createPointable();
-                ComponentMetadataUtil.get(btree, ComponentMetadataUtil.MARKER_LSN_KEY, longPointable);
+                ComponentUtils.get(btree, ComponentUtils.MARKER_LSN_KEY, longPointable);
                 long lsn = longPointable.getLong();
                 int numOfMarkers = 0;
                 LogReader logReader = (LogReader) nc.getTransactionSubsystem().getLogManager().getLogReader(false);
@@ -204,4 +207,4 @@ public class LogMarkerTest {
         return new TestTupleCounterFrameWriter(recordDescriptor, openAnswer, nextAnswer, flushAnswer, failAnswer,
                 closeAnswer, deepCopyInputFrames);
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b2e50b7d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
new file mode 100644
index 0000000..893b428
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.util.Map;
+
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.metadata.IDatasetDetails;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.transaction.management.resource.DatasetLocalResourceFactory;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.common.IResourceFactory;
+
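+/**
+ * A {@link Dataset} for tests that swaps in {@link TestLsmBTreeResourceFactoryProvider}
+ * and {@link TestLsmBtreeIoOpCallbackFactory}, so that tests can control the I/O
+ * operations of the created LSM BTree indexes.
+ */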
+public class TestDataset extends Dataset {
+
+    private static final long serialVersionUID = 1L;
+
+    public TestDataset(String dataverseName, String datasetName, String recordTypeDataverseName, String recordTypeName,
+            String nodeGroupName, String compactionPolicy, Map<String, String> compactionPolicyProperties,
+            IDatasetDetails datasetDetails, Map<String, String> hints, DatasetType datasetType, int datasetId,
+            int pendingOp) {
+        super(dataverseName, datasetName, recordTypeDataverseName, recordTypeName, nodeGroupName, compactionPolicy,
+                compactionPolicyProperties, datasetDetails, hints, datasetType, datasetId, pendingOp);
+    }
+
+    @Override
+    public IResourceFactory getResourceFactory(MetadataProvider mdProvider, Index index, ARecordType recordType,
+            ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties)
+            throws AlgebricksException {
+        ITypeTraits[] filterTypeTraits = DatasetUtil.computeFilterTypeTraits(this, recordType);
+        IBinaryComparatorFactory[] filterCmpFactories = DatasetUtil.computeFilterBinaryComparatorFactories(this,
+                recordType, mdProvider.getStorageComponentProvider().getComparatorFactoryProvider());
+        IResourceFactory resourceFactory =
+                TestLsmBTreeResourceFactoryProvider.INSTANCE.getResourceFactory(mdProvider, this, index, recordType,
+                        metaType, mergePolicyFactory, mergePolicyProperties, filterTypeTraits, filterCmpFactories);
+        return new DatasetLocalResourceFactory(getDatasetId(), resourceFactory);
+    }
+
+    @Override
+    public ILSMIOOperationCallbackFactory getIoOperationCallbackFactory(Index index) throws AlgebricksException {
+        return TestLsmBtreeIoOpCallbackFactory.INSTANCE;
+    }
+}