You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2018/05/21 03:35:24 UTC

[4/5] asterixdb git commit: [NO ISSUE][STO] Misc Storage Fixes and Improvements

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/CheckpointInSecondaryIndexTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/CheckpointInSecondaryIndexTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/CheckpointInSecondaryIndexTest.java
new file mode 100644
index 0000000..a0ed26e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/CheckpointInSecondaryIndexTest.java
@@ -0,0 +1,577 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.rmi.RemoteException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
+import org.apache.asterix.app.bootstrap.TestNodeController.SecondaryIndexInfo;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction;
+import org.apache.asterix.app.nc.NCAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.IndexCheckpoint;
+import org.apache.asterix.common.storage.ResourceReference;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.TransactionOptions;
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.file.StorageComponentProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorNodePushable;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.util.SingleThreadEventProcessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.datagen.TupleGenerator;
+import org.apache.hyracks.storage.am.common.datagen.IFieldValueGenerator;
+import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+/**
+ * Verifies that secondary-index checkpoints carry the same last component id as the primary index, both when the
+ * secondary is empty at flush time and when it is created later and bulk loaded over an already-flushed primary.
+ * <p>
+ * All index operations are executed on a single {@link Actor} thread; the test thread submits {@link Request}s and
+ * synchronizes with {@link #ensureDone(Actor)}.
+ */
+public class CheckpointInSecondaryIndexTest {
+    static final int REPREAT_TEST_COUNT = 1;
+
+    /**
+     * Parameter sets used to repeat the tests {@link #REPREAT_TEST_COUNT} times.
+     * <p>
+     * NOTE(review): this only has an effect when the class runs under {@code @RunWith(Parameterized.class)}, which
+     * is not declared here.
+     */
+    @Parameterized.Parameters
+    public static List<Object[]> data() {
+        return Arrays.asList(new Object[REPREAT_TEST_COUNT][0]);
+    }
+
+    private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
+    private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
+            new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
+    private static final GenerationFunction[] RECORD_GEN_FUNCTION =
+            { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC };
+    private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false };
+    private static final ARecordType META_TYPE = null;
+    private static final GenerationFunction[] META_GEN_FUNCTION = null;
+    private static final boolean[] UNIQUE_META_FIELDS = null;
+    private static final int[] KEY_INDEXES = { 0 };
+    private static final int[] KEY_INDICATORS = { Index.RECORD_INDICATOR };
+    private static final List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR });
+    private static final int RECORDS_PER_COMPONENT = 500;
+    private static final int DATASET_ID = 101;
+    private static final String DATAVERSE_NAME = "TestDV";
+    private static final String DATASET_NAME = "TestDS";
+    private static final String INDEX_NAME = "TestIdx";
+    private static final String DATA_TYPE_NAME = "DUMMY";
+    private static final String NODE_GROUP_NAME = "DEFAULT";
+    private static final IndexType INDEX_TYPE = IndexType.BTREE;
+    // generators for the secondary bulk-load tuples: (secondary key = AINT64 "value", primary key = AINT32 "key")
+    private static final IFieldValueGenerator[] SECONDARY_INDEX_VALUE_GENERATOR =
+            { new AInt64ValueGenerator(), new AInt32ValueGenerator() };
+    // the secondary index is built on the record's second field ("value")
+    private static final List<List<String>> INDEX_FIELD_NAMES =
+            Arrays.asList(Arrays.asList(RECORD_TYPE.getFieldNames()[1]));
+    private static final List<Integer> INDEX_FIELD_INDICATORS = Arrays.asList(Index.RECORD_INDICATOR);
+    private static final List<IAType> INDEX_FIELD_TYPES = Arrays.asList(BuiltinType.AINT64);
+    private static final StorageComponentProvider storageManager = new StorageComponentProvider();
+    private static TestNodeController nc;
+    private static NCAppRuntimeContext ncAppCtx;
+    private static IDatasetLifecycleManager dsLifecycleMgr;
+    private static Dataset dataset;
+    private static Index secondaryIndex;
+    private static ITransactionContext txnCtx;
+    private static TestLsmBtree primaryLsmBtree;
+    private static TestLsmBtree secondaryLsmBtree;
+    private static PrimaryIndexInfo primaryIndexInfo;
+    private static IHyracksTaskContext taskCtx;
+    private static IIndexDataflowHelper primaryIndexDataflowHelper;
+    private static IIndexDataflowHelper secondaryIndexDataflowHelper;
+    private static LSMInsertDeleteOperatorNodePushable insertOp;
+    private static LSMIndexBulkLoadOperatorNodePushable indexLoadOp;
+    private static IHyracksTaskContext loadTaskCtx;
+    private static SecondaryIndexInfo secondaryIndexInfo;
+    private static Actor actor;
+
+    /** Wipes leftover instance files and boots a test node controller from {@code cc-multipart.conf}. */
+    @BeforeClass
+    public static void setUp() throws Exception {
+        System.out.println("SetUp: ");
+        TestHelper.deleteExistingInstanceFiles();
+        String configPath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "test"
+                + File.separator + "resources" + File.separator + "cc-multipart.conf";
+        nc = new TestNodeController(configPath, false);
+        nc.init();
+        ncAppCtx = nc.getAppRuntimeContext();
+        dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
+    }
+
+    /** Shuts the node controller down and removes the instance files it created. */
+    @AfterClass
+    public static void tearDown() throws Exception {
+        System.out.println("TearDown");
+        nc.deInit();
+        TestHelper.deleteExistingInstanceFiles();
+    }
+
+    /**
+     * Creates the dataset and its primary index, starts a transaction, builds an insert pipeline that writes to the
+     * primary index only, and starts the {@link Actor} with the insert operator opened.
+     */
+    @Before
+    public void createIndex() throws Exception {
+        List<List<String>> partitioningKeys = new ArrayList<>();
+        partitioningKeys.add(Collections.singletonList("key"));
+        dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME,
+                NODE_GROUP_NAME, NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null,
+                        PartitioningStrategy.HASH, partitioningKeys, null, null, null, false, null),
+                null, DatasetType.INTERNAL, DATASET_ID, 0);
+        secondaryIndex = new Index(DATAVERSE_NAME, DATASET_NAME, INDEX_NAME, INDEX_TYPE, INDEX_FIELD_NAMES,
+                INDEX_FIELD_INDICATORS, INDEX_FIELD_TYPES, false, false, false, 0);
+        // reset per-test static state so nothing leaks from a previous test
+        taskCtx = null;
+        primaryIndexDataflowHelper = null;
+        secondaryIndexDataflowHelper = null;
+        primaryLsmBtree = null;
+        secondaryLsmBtree = null;
+        insertOp = null;
+        indexLoadOp = null;
+        loadTaskCtx = null;
+        secondaryIndexInfo = null;
+        JobId jobId = nc.newJobId();
+        txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(jobId),
+                new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
+        actor = null;
+        taskCtx = nc.createTestContext(jobId, 0, false);
+        primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, storageManager,
+                KEY_INDEXES, KEY_INDICATORS_LIST, 0);
+        IndexDataflowHelperFactory iHelperFactory =
+                new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
+        primaryIndexDataflowHelper = iHelperFactory.create(taskCtx.getJobletContext().getServiceContext(), 0);
+        primaryIndexDataflowHelper.open();
+        primaryLsmBtree = (TestLsmBtree) primaryIndexDataflowHelper.getIndexInstance();
+        primaryIndexDataflowHelper.close();
+        // This pipeline skips the secondary index
+        insertOp = nc.getInsertPipeline(taskCtx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
+                KEY_INDICATORS_LIST, storageManager, null).getLeft();
+        actor = new Actor("player");
+        // allow all operations
+        StorageTestUtils.allowAllOps(primaryLsmBtree);
+        actor.add(new Request(Request.Action.INSERT_OPEN));
+    }
+
+    /**
+     * Closes the insert operator, commits the transaction and destroys the indexes. The actor thread is stopped
+     * even if one of the earlier teardown steps fails.
+     */
+    @After
+    public void destroyIndex() throws Exception {
+        try {
+            Request close = new Request(Request.Action.INSERT_CLOSE);
+            actor.add(close);
+            close.await();
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
+            if (secondaryIndexDataflowHelper != null) {
+                secondaryIndexDataflowHelper.destroy();
+            }
+            primaryIndexDataflowHelper.destroy();
+        } finally {
+            if (actor != null) {
+                actor.stop();
+            }
+        }
+    }
+
+    @Test
+    public void testCheckpointUpdatedWhenSecondaryIsEmpty() throws Exception {
+        createSecondaryIndex();
+        actor.add(new Request(Request.Action.INSERT_PATCH));
+        ensureDone(actor);
+        // partition 0 holds every inserted record ...
+        StorageTestUtils.searchAndAssertCount(nc, 0, dataset, storageManager, RECORDS_PER_COMPONENT);
+        // ... while the secondary index received none of them (the insert pipeline skips it)
+        Assert.assertTrue(secondaryLsmBtree.isCurrentMutableComponentEmpty());
+        actor.add(new Request(Request.Action.FLUSH_DATASET));
+        ensureDone(actor);
+        // the flush produced a disk component for the primary but none for the empty secondary
+        Assert.assertEquals(1, primaryLsmBtree.getDiskComponents().size());
+        Assert.assertEquals(0, secondaryLsmBtree.getDiskComponents().size());
+        // both indexes must be on the same memory component after the flush
+        Assert.assertEquals(primaryLsmBtree.getCurrentMemoryComponentIndex(),
+                secondaryLsmBtree.getCurrentMemoryComponentIndex());
+        // both checkpoints must record the id of the component that was just flushed
+        ILSMDiskComponent primaryDiskComponent = primaryLsmBtree.getDiskComponents().get(0);
+        long expectedComponentId = ((LSMComponentId) primaryDiskComponent.getId()).getMinId();
+        IIndexCheckpointManager primaryCheckpointManager = getCheckpointManager(primaryLsmBtree);
+        IIndexCheckpointManager secondaryCheckpointManager = getCheckpointManager(secondaryLsmBtree);
+        IndexCheckpoint latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
+        IndexCheckpoint latestSecondaryCheckpoint = secondaryCheckpointManager.getLatest();
+        Assert.assertEquals(expectedComponentId, latestPrimaryCheckpoint.getLastComponentId());
+        Assert.assertEquals(expectedComponentId, latestSecondaryCheckpoint.getLastComponentId());
+    }
+
+    /**
+     * Creates the secondary index, publishing its dataflow helper and index instance through
+     * {@link #secondaryIndexDataflowHelper} and {@link #secondaryLsmBtree}.
+     */
+    private void createSecondaryIndex()
+            throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
+        SecondaryIndexInfo info = nc.createSecondaryIndex(primaryIndexInfo, secondaryIndex, storageManager, 0);
+        IndexDataflowHelperFactory iHelperFactory =
+                new IndexDataflowHelperFactory(nc.getStorageManager(), info.getFileSplitProvider());
+        secondaryIndexDataflowHelper = iHelperFactory.create(taskCtx.getJobletContext().getServiceContext(), 0);
+        secondaryIndexDataflowHelper.open();
+        secondaryLsmBtree = (TestLsmBtree) secondaryIndexDataflowHelper.getIndexInstance();
+        secondaryIndexDataflowHelper.close();
+    }
+
+    @Test
+    public void testCheckpointWhenBulkloadingSecondaryAndPrimaryIsSingleComponent() throws Exception {
+        // insert one component worth of records into the primary only, then flush it
+        actor.add(new Request(Request.Action.INSERT_PATCH));
+        ensureDone(actor);
+        StorageTestUtils.searchAndAssertCount(nc, 0, dataset, storageManager, RECORDS_PER_COMPONENT);
+        actor.add(new Request(Request.Action.FLUSH_DATASET));
+        ensureDone(actor);
+        Assert.assertEquals(1, primaryLsmBtree.getDiskComponents().size());
+        ILSMDiskComponent primaryDiskComponent = primaryLsmBtree.getDiskComponents().get(0);
+        long expectedComponentId = ((LSMComponentId) primaryDiskComponent.getId()).getMinId();
+        IIndexCheckpointManager primaryCheckpointManager = getCheckpointManager(primaryLsmBtree);
+        IndexCheckpoint latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
+        Assert.assertEquals(expectedComponentId, latestPrimaryCheckpoint.getLastComponentId());
+        // create the secondary and bulk load it with one component worth of entries
+        createSecondaryIndex();
+        prepareSecondaryBulkLoad();
+        actor.add(new Request(Request.Action.LOAD_OPEN));
+        actor.add(new Request(Request.Action.INDEX_LOAD_PATCH));
+        actor.add(new Request(Request.Action.LOAD_CLOSE));
+        ensureDone(actor);
+        // the bulk load must leave the primary checkpoint untouched ...
+        latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
+        Assert.assertEquals(expectedComponentId, latestPrimaryCheckpoint.getLastComponentId());
+        // ... and the secondary checkpoint must carry the primary's last flushed component id
+        IndexCheckpoint latestSecondaryCheckpoint = getCheckpointManager(secondaryLsmBtree).getLatest();
+        Assert.assertEquals(expectedComponentId, latestSecondaryCheckpoint.getLastComponentId());
+    }
+
+    @Test
+    public void testCheckpointWhenBulkloadingSecondaryAndPrimaryIsTwoComponents() throws Exception {
+        // first component: insert into the primary only, then flush
+        actor.add(new Request(Request.Action.INSERT_PATCH));
+        ensureDone(actor);
+        StorageTestUtils.searchAndAssertCount(nc, 0, dataset, storageManager, RECORDS_PER_COMPONENT);
+        actor.add(new Request(Request.Action.FLUSH_DATASET));
+        ensureDone(actor);
+        Assert.assertEquals(1, primaryLsmBtree.getDiskComponents().size());
+        ILSMDiskComponent primaryDiskComponent = primaryLsmBtree.getDiskComponents().get(0);
+        long expectedComponentId = ((LSMComponentId) primaryDiskComponent.getId()).getMinId();
+        IIndexCheckpointManager primaryCheckpointManager = getCheckpointManager(primaryLsmBtree);
+        IndexCheckpoint latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
+        Assert.assertEquals(expectedComponentId, latestPrimaryCheckpoint.getLastComponentId());
+        // second component: another patch and flush
+        actor.add(new Request(Request.Action.INSERT_PATCH));
+        ensureDone(actor);
+        actor.add(new Request(Request.Action.FLUSH_DATASET));
+        ensureDone(actor);
+        Assert.assertEquals(2, primaryLsmBtree.getDiskComponents().size());
+        // NOTE(review): assumes index 0 of getDiskComponents() is the most recently flushed component
+        primaryDiskComponent = primaryLsmBtree.getDiskComponents().get(0);
+        expectedComponentId = ((LSMComponentId) primaryDiskComponent.getId()).getMaxId();
+        // create the secondary and bulk load it with one component worth of entries
+        createSecondaryIndex();
+        prepareSecondaryBulkLoad();
+        actor.add(new Request(Request.Action.LOAD_OPEN));
+        actor.add(new Request(Request.Action.INDEX_LOAD_PATCH));
+        actor.add(new Request(Request.Action.LOAD_CLOSE));
+        ensureDone(actor);
+        latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
+        Assert.assertEquals(expectedComponentId, latestPrimaryCheckpoint.getLastComponentId());
+        IndexCheckpoint latestSecondaryCheckpoint = getCheckpointManager(secondaryLsmBtree).getLatest();
+        Assert.assertEquals(expectedComponentId, latestSecondaryCheckpoint.getLastComponentId());
+    }
+
+    @Test
+    public void testCheckpointWhenBulkloadedSecondaryIsEmptyAndPrimaryIsEmpty() throws Exception {
+        // the primary has never been flushed
+        Assert.assertEquals(0, primaryLsmBtree.getDiskComponents().size());
+        IIndexCheckpointManager primaryCheckpointManager = getCheckpointManager(primaryLsmBtree);
+        // create the secondary and run an empty bulk load (open + close, no tuples)
+        createSecondaryIndex();
+        prepareSecondaryBulkLoad();
+        actor.add(new Request(Request.Action.LOAD_OPEN));
+        actor.add(new Request(Request.Action.LOAD_CLOSE));
+        ensureDone(actor);
+        // the secondary checkpoint must agree with the primary checkpoint
+        IndexCheckpoint latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
+        IndexCheckpoint latestSecondaryCheckpoint = getCheckpointManager(secondaryLsmBtree).getLatest();
+        Assert.assertEquals(latestPrimaryCheckpoint.getLastComponentId(),
+                latestSecondaryCheckpoint.getLastComponentId());
+    }
+
+    @Test
+    public void testCheckpointWhenBulkloadedSecondaryIsEmptyAndPrimaryIsNotEmpty() throws Exception {
+        // insert one component worth of records into the primary only, then flush it
+        actor.add(new Request(Request.Action.INSERT_PATCH));
+        ensureDone(actor);
+        StorageTestUtils.searchAndAssertCount(nc, 0, dataset, storageManager, RECORDS_PER_COMPONENT);
+        actor.add(new Request(Request.Action.FLUSH_DATASET));
+        ensureDone(actor);
+        Assert.assertEquals(1, primaryLsmBtree.getDiskComponents().size());
+        ILSMDiskComponent primaryDiskComponent = primaryLsmBtree.getDiskComponents().get(0);
+        long expectedComponentId = ((LSMComponentId) primaryDiskComponent.getId()).getMinId();
+        IIndexCheckpointManager primaryCheckpointManager = getCheckpointManager(primaryLsmBtree);
+        IndexCheckpoint latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
+        Assert.assertEquals(expectedComponentId, latestPrimaryCheckpoint.getLastComponentId());
+        // create the secondary and run an empty bulk load (open + close, no tuples)
+        createSecondaryIndex();
+        prepareSecondaryBulkLoad();
+        actor.add(new Request(Request.Action.LOAD_OPEN));
+        actor.add(new Request(Request.Action.LOAD_CLOSE));
+        ensureDone(actor);
+        latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
+        Assert.assertEquals(expectedComponentId, latestPrimaryCheckpoint.getLastComponentId());
+        IndexCheckpoint latestSecondaryCheckpoint = getCheckpointManager(secondaryLsmBtree).getLatest();
+        Assert.assertEquals(expectedComponentId, latestSecondaryCheckpoint.getLastComponentId());
+    }
+
+    protected IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider() {
+        return ncAppCtx.getIndexCheckpointManagerProvider();
+    }
+
+    /**
+     * Looks up the checkpoint manager of an LSM index. The index's file manager is a private field of
+     * {@link AbstractLSMIndex}, so it is read reflectively. Its flush file reference is then converted into the
+     * {@link ResourceReference} that keys the checkpoint manager provider.
+     */
+    private IIndexCheckpointManager getCheckpointManager(TestLsmBtree lsmIndex) throws Exception {
+        Field fileManagerField = AbstractLSMIndex.class.getDeclaredField("fileManager");
+        fileManagerField.setAccessible(true);
+        ILSMIndexFileManager fileManager = (ILSMIndexFileManager) fileManagerField.get(lsmIndex);
+        ResourceReference ref = ResourceReference
+                .of(fileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
+        return getIndexCheckpointManagerProvider().get(ref);
+    }
+
+    /**
+     * Creates a task context for a new job and builds the secondary bulk-load operator. The results are stored in
+     * {@link #loadTaskCtx}, {@link #indexLoadOp} and {@link #secondaryIndexInfo}, which the {@link Actor} reads.
+     */
+    private void prepareSecondaryBulkLoad() throws Exception {
+        JobId jobId = nc.newJobId();
+        loadTaskCtx = nc.createTestContext(jobId, 0, false);
+        Pair<SecondaryIndexInfo, LSMIndexBulkLoadOperatorNodePushable> infoAndOp =
+                nc.getBulkLoadSecondaryOperator(loadTaskCtx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
+                        KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, secondaryIndex, RECORDS_PER_COMPONENT);
+        indexLoadOp = infoAndOp.getRight();
+        secondaryIndexInfo = infoAndOp.getLeft();
+    }
+
+    /**
+     * Blocks until the actor has processed every request queued before this call. Any failure the actor recorded
+     * is rethrown here, so errors on the actor thread fail the test instead of only being printed.
+     */
+    private void ensureDone(Actor actor) throws InterruptedException {
+        Request req = new Request(Request.Action.DUMMY);
+        actor.add(req);
+        req.await();
+        Throwable failure = actor.getFailure();
+        if (failure != null) {
+            throw new AssertionError("actor failed while handling a request", failure);
+        }
+    }
+
+    /** A unit of work for the {@link Actor}; the submitter may {@link #await()} its completion. */
+    private static class Request {
+        enum Action {
+            DUMMY,
+            INSERT_OPEN,
+            LOAD_OPEN,
+            INSERT_PATCH,
+            INDEX_LOAD_PATCH,
+            FLUSH_DATASET,
+            INSERT_CLOSE,
+            LOAD_CLOSE,
+        }
+
+        private final Action action;
+        private volatile boolean done;
+
+        public Request(Action action) {
+            this.action = action;
+            done = false;
+        }
+
+        /** Marks the request as handled and wakes every waiter. */
+        synchronized void complete() {
+            done = true;
+            notifyAll();
+        }
+
+        /** Waits until {@link #complete()} has been called. */
+        synchronized void await() throws InterruptedException {
+            while (!done) {
+                wait();
+            }
+        }
+    }
+
+    /**
+     * Single thread that executes {@link Request}s against the insert and bulk-load operators in submission
+     * order. The first failure it encounters is remembered so that the test thread can rethrow it.
+     */
+    public static class Actor extends SingleThreadEventProcessor<Request> {
+        private final RecordTupleGenerator primaryInsertTupleGenerator;
+        private final FrameTupleAppender tupleAppender;
+        private volatile Throwable failure;
+
+        public Actor(String name) throws HyracksDataException {
+            super(name);
+            primaryInsertTupleGenerator = new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            tupleAppender = new FrameTupleAppender(new VSizeFrame(taskCtx));
+        }
+
+        /** Returns the first failure raised while handling a request, or {@code null} if none occurred. */
+        Throwable getFailure() {
+            return failure;
+        }
+
+        @Override
+        protected void handle(Request req) throws Exception {
+            try {
+                switch (req.action) {
+                    case FLUSH_DATASET:
+                        // push any buffered tuples before flushing so they land in the flushed component
+                        if (tupleAppender.getTupleCount() > 0) {
+                            tupleAppender.write(insertOp, true);
+                        }
+                        dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                        break;
+                    case INSERT_CLOSE:
+                        insertOp.close();
+                        break;
+                    case INSERT_OPEN:
+                        insertOp.open();
+                        break;
+                    case LOAD_OPEN:
+                        indexLoadOp.open();
+                        break;
+                    case LOAD_CLOSE:
+                        indexLoadOp.close();
+                        break;
+                    case INSERT_PATCH:
+                        for (int j = 0; j < RECORDS_PER_COMPONENT; j++) {
+                            ITupleReference tuple = primaryInsertTupleGenerator.next();
+                            DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+                        }
+                        if (tupleAppender.getTupleCount() > 0) {
+                            tupleAppender.write(insertOp, true);
+                        }
+                        StorageTestUtils.waitForOperations(primaryLsmBtree);
+                        break;
+                    case INDEX_LOAD_PATCH:
+                        TupleGenerator secondaryLoadTupleGenerator =
+                                new TupleGenerator(SECONDARY_INDEX_VALUE_GENERATOR, secondaryIndexInfo.getSerdes(), 0);
+                        FrameTupleAppender secondaryTupleAppender = new FrameTupleAppender(new VSizeFrame(loadTaskCtx));
+                        for (int j = 0; j < RECORDS_PER_COMPONENT; j++) {
+                            ITupleReference tuple = secondaryLoadTupleGenerator.next();
+                            DataflowUtils.addTupleToFrame(secondaryTupleAppender, tuple, indexLoadOp);
+                        }
+                        if (secondaryTupleAppender.getTupleCount() > 0) {
+                            secondaryTupleAppender.write(indexLoadOp, true);
+                        }
+                        break;
+                    default:
+                        break;
+                }
+            } catch (Throwable th) {
+                th.printStackTrace();
+                // remember the first failure before completing the request so ensureDone() can observe it
+                if (failure == null) {
+                    failure = th;
+                }
+                throw th;
+            } finally {
+                req.complete();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index a33bda1..017c59f 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -27,7 +27,7 @@ import java.util.function.Predicate;
 
 import org.apache.asterix.app.bootstrap.TestNodeController;
 import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
-import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
 import org.apache.asterix.app.nc.NCAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
@@ -122,7 +122,7 @@ public class ComponentRollbackTest {
             // allow all operations
             StorageTestUtils.allowAllOps(lsmBtree);
             insertOp.open();
-            TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+            RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
             for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
@@ -190,7 +190,7 @@ public class ComponentRollbackTest {
             // allow all operations
             StorageTestUtils.allowAllOps(lsmBtree);
             insertOp.open();
-            TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+            RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
             for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
@@ -272,7 +272,7 @@ public class ComponentRollbackTest {
             StorageTestUtils.allowAllOps(lsmBtree);
             lsmBtree.clearSearchCallbacks();
             insertOp.open();
-            TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+            RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
             for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
@@ -356,7 +356,7 @@ public class ComponentRollbackTest {
             // allow all operations
             StorageTestUtils.allowAllOps(lsmBtree);
             insertOp.open();
-            TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+            RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
             for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
@@ -411,7 +411,7 @@ public class ComponentRollbackTest {
             StorageTestUtils.allowAllOps(lsmBtree);
             lsmBtree.clearMergeCallbacks();
             insertOp.open();
-            TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+            RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
             for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
@@ -474,7 +474,7 @@ public class ComponentRollbackTest {
             // allow all operations
             StorageTestUtils.allowAllOps(lsmBtree);
             insertOp.open();
-            TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+            RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
             for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
@@ -533,7 +533,7 @@ public class ComponentRollbackTest {
             // allow all operations
             StorageTestUtils.allowAllOps(lsmBtree);
             insertOp.open();
-            TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+            RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
             for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
@@ -594,7 +594,7 @@ public class ComponentRollbackTest {
             StorageTestUtils.allowAllOps(lsmBtree);
             lsmBtree.clearMergeCallbacks();
             insertOp.open();
-            TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+            RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
             for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
@@ -664,7 +664,7 @@ public class ComponentRollbackTest {
             StorageTestUtils.allowAllOps(lsmBtree);
             lsmBtree.clearMergeCallbacks();
             insertOp.open();
-            TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+            RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
             for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java
index eb16cf4..b618727 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java
@@ -22,7 +22,7 @@ import java.io.File;
 
 import org.apache.asterix.app.bootstrap.TestNodeController;
 import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
-import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
 import org.apache.asterix.app.nc.NCAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
@@ -118,7 +118,7 @@ public class IoCallbackFailureTest {
             throws Exception {
         NCAppRuntimeContext ncAppCtx = nc.getAppRuntimeContext();
         IDatasetLifecycleManager dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
-        TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+        RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
         ITransactionContext txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
                 new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
         boolean failed = false;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
index e4623fd..79e6368 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
@@ -29,7 +29,7 @@ import java.util.concurrent.Semaphore;
 import org.apache.asterix.app.bootstrap.TestNodeController;
 import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
 import org.apache.asterix.app.bootstrap.TestNodeController.SecondaryIndexInfo;
-import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
 import org.apache.asterix.app.nc.NCAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
@@ -92,7 +92,7 @@ public class LSMFlushRecoveryTest {
     private static IIndexDataflowHelper[] secondaryIndexDataflowHelpers;
     private static ITransactionContext txnCtx;
     private static LSMInsertDeleteOperatorNodePushable[] insertOps;
-    private static TupleGenerator tupleGenerator;
+    private static RecordTupleGenerator tupleGenerator;
 
     private static final int NUM_PARTITIONS = 2;
     private static final int PARTITION_0 = 0;
@@ -478,6 +478,8 @@ public class LSMFlushRecoveryTest {
             ILSMMemoryComponent primaryMemComponent = primaryIndexes[partitionIndex].getCurrentMemoryComponent();
             ILSMMemoryComponent secondaryMemComponent = secondaryIndexes[partitionIndex].getCurrentMemoryComponent();
             Assert.assertEquals(primaryMemComponent.getId(), secondaryMemComponent.getId());
+            Assert.assertEquals(primaryIndexes[partitionIndex].getCurrentMemoryComponentIndex(),
+                    secondaryIndexes[partitionIndex].getCurrentMemoryComponentIndex());
         }
 
         List<ILSMDiskComponent> primaryDiskComponents = primaryIndexes[partitionIndex].getDiskComponents();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
index 2121327..e2c99b0 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -26,10 +26,9 @@ import java.util.List;
 
 import org.apache.asterix.app.bootstrap.TestNodeController;
 import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction;
 import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter;
-import org.apache.asterix.app.data.gen.TupleGenerator;
-import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
 import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.common.transactions.ITransactionContext;
@@ -37,10 +36,7 @@ import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.external.util.DataflowUtils;
 import org.apache.asterix.file.StorageComponentProvider;
-import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
@@ -113,22 +109,20 @@ public class LogMarkerTest {
             StorageComponentProvider storageManager = new StorageComponentProvider();
             List<List<String>> partitioningKeys = new ArrayList<>();
             partitioningKeys.add(Collections.singletonList("key"));
-            Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
-                    NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
-                            partitioningKeys, null, null, null, false, null),
-                    null, DatasetType.INTERNAL, DATASET_ID, 0);
             try {
-                PrimaryIndexInfo indexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
-                        storageManager, KEY_INDEXES, KEY_INDICATORS_LIST, 0);
+                PrimaryIndexInfo indexInfo = nc.createPrimaryIndex(StorageTestUtils.DATASET, KEY_TYPES, RECORD_TYPE,
+                        META_TYPE, null, storageManager, KEY_INDEXES, KEY_INDICATORS_LIST, 0);
                 JobId jobId = nc.newJobId();
                 IHyracksTaskContext ctx = nc.createTestContext(jobId, 0, true);
                 ITransactionContext txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
                         new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
-                LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
-                        RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, null).getLeft();
+                LSMInsertDeleteOperatorNodePushable insertOp =
+                        nc.getInsertPipeline(ctx, StorageTestUtils.DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
+                                KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, null).getLeft();
                 insertOp.open();
-                TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
-                        RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+                RecordTupleGenerator tupleGenerator =
+                        new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+                                RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
                 VSizeFrame frame = new VSizeFrame(ctx);
                 VSizeFrame marker = new VSizeFrame(ctx);
                 FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
@@ -178,9 +172,9 @@ public class LogMarkerTest {
                 nc.newJobId();
                 TestTupleCounterFrameWriter countOp = create(nc.getSearchOutputDesc(KEY_TYPES, RECORD_TYPE, META_TYPE),
                         Collections.emptyList(), Collections.emptyList(), false);
-                IPushRuntime emptyTupleOp = nc.getFullScanPipeline(countOp, ctx, dataset, KEY_TYPES, RECORD_TYPE,
-                        META_TYPE, new NoMergePolicyFactory(), null, null, KEY_INDEXES, KEY_INDICATORS_LIST,
-                        storageManager);
+                IPushRuntime emptyTupleOp = nc.getFullScanPipeline(countOp, ctx, StorageTestUtils.DATASET, KEY_TYPES,
+                        RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, null, KEY_INDEXES,
+                        KEY_INDICATORS_LIST, storageManager);
                 emptyTupleOp.open();
                 emptyTupleOp.close();
                 Assert.assertEquals(NUM_OF_RECORDS, countOp.getCount());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
index 1795c93..a7225a1 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
@@ -29,8 +29,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.asterix.app.bootstrap.TestNodeController;
 import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
 import org.apache.asterix.app.bootstrap.TestNodeController.SecondaryIndexInfo;
-import org.apache.asterix.app.data.gen.TupleGenerator;
-import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction;
 import org.apache.asterix.app.nc.NCAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
@@ -619,14 +619,14 @@ public class MultiPartitionLSMIndexTest {
 
     public class Actor extends SingleThreadEventProcessor<Request> {
         private final int partition;
-        private final TupleGenerator tupleGenerator;
+        private final RecordTupleGenerator tupleGenerator;
         private final VSizeFrame frame;
         private final FrameTupleAppender tupleAppender;
 
         public Actor(String name, int partition) throws HyracksDataException {
             super(name);
             this.partition = partition;
-            tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+            tupleGenerator = new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
                     RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
             frame = new VSizeFrame(taskCtxs[partition]);
             tupleAppender = new FrameTupleAppender(frame);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
index 72026a2..61c1fb2 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
@@ -26,8 +26,8 @@ import java.util.List;
 import org.apache.asterix.app.bootstrap.TestNodeController;
 import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
 import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter;
-import org.apache.asterix.app.data.gen.TupleGenerator;
-import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction;
 import org.apache.asterix.app.nc.NCAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
@@ -161,8 +161,8 @@ public class SearchCursorComponentSwitchTest {
             // except search
             lsmBtree.clearSearchCallbacks();
             insertOp.open();
-            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
-                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            RecordTupleGenerator tupleGenerator = new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES,
+                    KEY_INDICATORS, RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
             Searcher firstSearcher = null;
@@ -207,8 +207,8 @@ public class SearchCursorComponentSwitchTest {
             // except search
             lsmBtree.clearSearchCallbacks();
             insertOp.open();
-            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
-                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            RecordTupleGenerator tupleGenerator = new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES,
+                    KEY_INDICATORS, RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
             Searcher firstSearcher = null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
index d08fc72..589e8b2 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
@@ -33,9 +33,9 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.app.bootstrap.TestNodeController;
 import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction;
 import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter;
-import org.apache.asterix.app.data.gen.TupleGenerator;
-import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.context.DatasetInfo;
@@ -100,7 +100,7 @@ public class StorageTestUtils {
     private StorageTestUtils() {
     }
 
-    static void allowAllOps(TestLsmBtree lsmBtree) {
+    public static void allowAllOps(TestLsmBtree lsmBtree) {
         lsmBtree.clearModifyCallbacks();
         lsmBtree.clearFlushCallbacks();
         lsmBtree.clearSearchCallbacks();
@@ -118,6 +118,12 @@ public class StorageTestUtils {
                 KEY_INDICATORS_LIST, partition);
     }
 
+    public static PrimaryIndexInfo createPrimaryIndex(TestNodeController nc, Dataset dataset, int partition)
+            throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
+        return nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, STORAGE_MANAGER, KEY_INDEXES,
+                KEY_INDICATORS_LIST, partition);
+    }
+
     public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx)
             throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
         return getInsertPipeline(nc, ctx, null);
@@ -131,13 +137,27 @@ public class StorageTestUtils {
     }
 
     public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx,
+            Dataset dataset, Index secondaryIndex, IndexOperation op)
+            throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
+        return nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
+                KEY_INDICATORS_LIST, STORAGE_MANAGER, secondaryIndex, op).getLeft();
+    }
+
+    public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx,
             Index secondaryIndex) throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
         return nc.getInsertPipeline(ctx, DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
                 KEY_INDICATORS_LIST, STORAGE_MANAGER, secondaryIndex).getLeft();
     }
 
-    public static TupleGenerator getTupleGenerator() {
-        return new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS, RECORD_GEN_FUNCTION,
+    public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx,
+            Dataset dataset, Index secondaryIndex)
+            throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
+        return nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
+                KEY_INDICATORS_LIST, STORAGE_MANAGER, secondaryIndex).getLeft();
+    }
+
+    public static RecordTupleGenerator getTupleGenerator() {
+        return new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS, RECORD_GEN_FUNCTION,
                 UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
     }
 
@@ -146,6 +166,11 @@ public class StorageTestUtils {
         searchAndAssertCount(nc, partition, DATASET, STORAGE_MANAGER, numOfRecords);
     }
 
+    public static void searchAndAssertCount(TestNodeController nc, Dataset dataset, int partition, int numOfRecords)
+            throws HyracksDataException, AlgebricksException {
+        searchAndAssertCount(nc, partition, dataset, STORAGE_MANAGER, numOfRecords);
+    }
+
     public static void searchAndAssertCount(TestNodeController nc, int partition, Dataset dataset,
             StorageComponentProvider storageManager, int numOfRecords)
             throws HyracksDataException, AlgebricksException {
@@ -182,6 +207,11 @@ public class StorageTestUtils {
         flushPartition(dslLifecycleMgr, lsmBtree, DATASET, async);
     }
 
+    public static void flushPartition(IDatasetLifecycleManager dslLifecycleMgr, Dataset dataset, TestLsmBtree lsmBtree,
+            boolean async) throws Exception {
+        flushPartition(dslLifecycleMgr, lsmBtree, dataset, async);
+    }
+
     public static void flushPartition(IDatasetLifecycleManager dslLifecycleMgr, TestLsmBtree lsmBtree, Dataset dataset,
             boolean async) throws Exception {
         waitForOperations(lsmBtree);
@@ -211,6 +241,11 @@ public class StorageTestUtils {
         flush(dsLifecycleMgr, lsmBtree, DATASET, async);
     }
 
+    public static void flush(IDatasetLifecycleManager dsLifecycleMgr, Dataset dataset, TestLsmBtree lsmBtree,
+            boolean async) throws Exception {
+        flush(dsLifecycleMgr, lsmBtree, dataset, async);
+    }
+
     public static void flush(IDatasetLifecycleManager dsLifecycleMgr, TestLsmBtree lsmBtree, Dataset dataset,
             boolean async) throws Exception {
         waitForOperations(lsmBtree);
@@ -240,6 +275,11 @@ public class StorageTestUtils {
             this(nc, partition, DATASET, STORAGE_MANAGER, lsmBtree, numOfRecords);
         }
 
+        public Searcher(TestNodeController nc, Dataset dataset, int partition, TestLsmBtree lsmBtree,
+                int numOfRecords) {
+            this(nc, partition, dataset, STORAGE_MANAGER, lsmBtree, numOfRecords);
+        }
+
         public Searcher(TestNodeController nc, int partition, Dataset dataset, StorageComponentProvider storageManager,
                 TestLsmBtree lsmBtree, int numOfRecords) {
             lsmBtree.addSearchCallback(new ITestOpCallback<Semaphore>() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
index 20875a3..bcf68b5 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
@@ -21,6 +21,7 @@ package org.apache.asterix.test.dataflow;
 import java.util.Map;
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.metadata.IDatasetDetails;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
@@ -28,9 +29,14 @@ import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.transaction.management.resource.DatasetLocalResourceFactory;
+import org.apache.asterix.transaction.management.runtime.CommitRuntime;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import org.apache.hyracks.storage.common.IResourceFactory;
@@ -48,6 +54,19 @@ public class TestDataset extends Dataset {
     }
 
     @Override
+    public IPushRuntimeFactory getCommitRuntimeFactory(MetadataProvider metadataProvider,
+            int[] primaryKeyFieldPermutation, boolean isSink) throws AlgebricksException {
+        return new IPushRuntimeFactory() {
+            @Override
+            public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
+                return new IPushRuntime[] { new CommitRuntime(ctx, new TxnId(ctx.getJobletContext().getJobId().getId()),
+                        getDatasetId(), primaryKeyFieldPermutation, true,
+                        ctx.getTaskAttemptId().getTaskId().getPartition(), true) };
+            }
+        };
+    }
+
+    @Override
     public IResourceFactory getResourceFactory(MetadataProvider mdProvider, Index index, ARecordType recordType,
             ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties)
             throws AlgebricksException {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TransactionAbortTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TransactionAbortTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TransactionAbortTest.java
index 7a3e475..bee2f8d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TransactionAbortTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TransactionAbortTest.java
@@ -23,7 +23,7 @@ import java.util.concurrent.Semaphore;
 
 import org.apache.asterix.app.bootstrap.TestNodeController;
 import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
-import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
 import org.apache.asterix.app.nc.NCAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
@@ -68,7 +68,7 @@ public class TransactionAbortTest {
     private static IHyracksTaskContext abortCtx;
     private static ITransactionContext abortTxnCtx;
     private static LSMInsertDeleteOperatorNodePushable abortOp;
-    private static TupleGenerator tupleGenerator;
+    private static RecordTupleGenerator tupleGenerator;
 
     @Rule
     public TestRule watcher = new TestMethodTracer();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 418282e..3634bf1 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -18,6 +18,10 @@
  */
 package org.apache.asterix.test.logging;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
 import java.io.File;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -26,10 +30,9 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.asterix.app.bootstrap.TestNodeController;
-import org.apache.asterix.app.data.gen.TupleGenerator;
-import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction;
 import org.apache.asterix.app.nc.RecoveryManager;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.Checkpoint;
@@ -43,14 +46,12 @@ import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.common.utils.TransactionUtil;
 import org.apache.asterix.external.util.DataflowUtils;
 import org.apache.asterix.file.StorageComponentProvider;
-import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.test.common.TestHelper;
+import org.apache.asterix.test.dataflow.StorageTestUtils;
 import org.apache.asterix.transaction.management.service.logging.LogManager;
 import org.apache.asterix.transaction.management.service.recovery.AbstractCheckpointManager;
 import org.apache.asterix.transaction.management.service.transaction.TransactionManager;
@@ -60,17 +61,12 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.stubbing.Answer;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.spy;
-
 public class CheckpointingTest {
 
     private static final String TEST_CONFIG_FILE_NAME = "cc-small-txn-log-partition.conf";
@@ -116,23 +112,21 @@ public class CheckpointingTest {
             nc.init();
             List<List<String>> partitioningKeys = new ArrayList<>();
             partitioningKeys.add(Collections.singletonList("key"));
-            Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
-                    NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
-                            partitioningKeys, null, null, null, false, null),
-                    null, DatasetType.INTERNAL, DATASET_ID, 0);
             try {
-                nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, storageManager, KEY_INDEXES,
-                        KEY_INDICATOR_LIST, 0);
+                nc.createPrimaryIndex(StorageTestUtils.DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null, storageManager,
+                        KEY_INDEXES, KEY_INDICATOR_LIST, 0);
                 JobId jobId = nc.newJobId();
                 IHyracksTaskContext ctx = nc.createTestContext(jobId, 0, false);
                 ITransactionContext txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
                         new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
                 // Prepare insert operation
-                LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
-                        RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null).getLeft();
+                LSMInsertDeleteOperatorNodePushable insertOp =
+                        nc.getInsertPipeline(ctx, StorageTestUtils.DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
+                                KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null).getLeft();
                 insertOp.open();
-                TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR,
-                        RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+                RecordTupleGenerator tupleGenerator =
+                        new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR,
+                                RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
                 VSizeFrame frame = new VSizeFrame(ctx);
                 FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
 
@@ -197,8 +191,9 @@ public class CheckpointingTest {
                 nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx2),
                         new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
                 // Prepare insert operation
-                LSMInsertDeleteOperatorNodePushable insertOp2 = nc.getInsertPipeline(ctx2, dataset, KEY_TYPES,
-                        RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null).getLeft();
+                LSMInsertDeleteOperatorNodePushable insertOp2 =
+                        nc.getInsertPipeline(ctx2, StorageTestUtils.DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
+                                KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null).getLeft();
                 insertOp2.open();
                 VSizeFrame frame2 = new VSizeFrame(ctx2);
                 FrameTupleAppender tupleAppender2 = new FrameTupleAppender(frame2);
@@ -220,6 +215,7 @@ public class CheckpointingTest {
                     }
                 }
                 Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() {
+                    @Override
                     public void uncaughtException(Thread th, Throwable ex) {
                         threadException = true;
                         exception = ex;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
index c7ae2df..62c882d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
@@ -25,7 +25,6 @@ import java.util.List;
 
 import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
 import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.external.util.ExternalDataConstants;
@@ -107,6 +106,10 @@ public class ExecutionTestUtil {
         tearDown(cleanup, integrationUtil, true);
     }
 
+    public static void tearDown(boolean cleanup, boolean stopHdfs) throws Exception {
+        tearDown(cleanup, integrationUtil, stopHdfs);
+    }
+
     public static void tearDown(boolean cleanup, AsterixHyracksIntegrationUtil integrationUtil, boolean stopHdfs)
             throws Exception {
         // validateBufferCacheState(); <-- Commented out until bug is fixed -->

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 9eb6259..b6581ec 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -27,6 +27,7 @@ import org.apache.asterix.common.replication.IReplicationChannel;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.storage.IReplicaManager;
+import org.apache.asterix.common.transactions.IRecoveryManagerFactory;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.INCServiceContext;
@@ -65,7 +66,8 @@ public interface INcApplicationContext extends IApplicationContext {
 
     IResourceIdFactory getResourceIdFactory();
 
-    void initialize(boolean initialRun) throws IOException, AlgebricksException;
+    void initialize(IRecoveryManagerFactory recoveryManagerFactory, boolean initialRun)
+            throws IOException, AlgebricksException;
 
     void setShuttingdown(boolean b);
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index f4d764a..6e2e320 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -55,7 +55,7 @@ public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
         this.setRegistered(false);
         this.setMemoryAllocated(false);
         this.logManager = logManager;
-        waitLog.setLogType(LogType.WAIT);
+        waitLog.setLogType(LogType.WAIT_FOR_FLUSHES);
         waitLog.computeAndSetLogSize();
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index e5d18cf..50a4bef 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -23,9 +23,7 @@ import static org.apache.asterix.common.metadata.MetadataIndexImmutablePropertie
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -34,7 +32,7 @@ import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.IDatasetMemoryManager;
 import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
-import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.dataflow.LSMIndexUtil;
 import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 import org.apache.asterix.common.replication.IReplicationStrategy;
@@ -42,19 +40,16 @@ import org.apache.asterix.common.storage.DatasetResourceReference;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.common.utils.StoragePathUtil;
-import org.apache.asterix.common.utils.TransactionUtil;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
-import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
-import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
@@ -70,7 +65,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     private final ILocalResourceRepository resourceRepository;
     private final IDatasetMemoryManager memoryManager;
     private final ILogManager logManager;
-    private final LogRecord logRecord;
+    private final LogRecord waitLog;
     private final int numPartitions;
     private volatile boolean stopped = false;
     private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
@@ -84,7 +79,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
         this.memoryManager = memoryManager;
         this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
         this.numPartitions = numPartitions;
-        logRecord = new LogRecord();
+        waitLog = new LogRecord();
+        waitLog.setLogType(LogType.WAIT_FOR_FLUSHES);
+        waitLog.computeAndSetLogSize();
     }
 
     @Override
@@ -371,7 +368,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     @Override
     public synchronized void flushAllDatasets() throws HyracksDataException {
         for (DatasetResource dsr : datasets.values()) {
-            flushDatasetOpenIndexes(dsr, false);
+            if (dsr.getDatasetInfo().isOpen()) {
+                flushDatasetOpenIndexes(dsr, false);
+            }
         }
     }
 
@@ -423,77 +422,48 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
      */
     private void flushDatasetOpenIndexes(DatasetResource dsr, boolean asyncFlush) throws HyracksDataException {
         DatasetInfo dsInfo = dsr.getDatasetInfo();
+        if (!dsInfo.isOpen()) {
+            throw new IllegalStateException("flushDatasetOpenIndexes is called on a dataset that is closed");
+        }
         if (dsInfo.isExternal()) {
             // no memory components for external dataset
             return;
         }
+        // ensure all in-flight flushes gets scheduled
+        logManager.log(waitLog);
         for (PrimaryIndexOperationTracker primaryOpTracker : dsr.getOpTrackers()) {
             // flush each partition one by one
             if (primaryOpTracker.getNumActiveOperations() > 0) {
                 throw new IllegalStateException(
                         "flushDatasetOpenIndexes is called on a dataset with currently active operations");
             }
-            int partition = primaryOpTracker.getPartition();
-            Collection<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition);
-            ILSMIndex flushIndex = null;
-            for (ILSMIndex lsmIndex : indexes) {
-                if (!lsmIndex.isCurrentMutableComponentEmpty()) {
-                    flushIndex = lsmIndex;
-                    break;
-                }
-            }
-            if (flushIndex == null) {
-                // all open indexes are empty, nothing to flush
-                continue;
-            }
-            LSMComponentId componentId = (LSMComponentId) flushIndex.getCurrentMemoryComponent().getId();
-            ILSMComponentIdGenerator idGenerator = getComponentIdGenerator(dsInfo.getDatasetID(), partition);
-            idGenerator.refresh();
-            if (dsInfo.isDurable()) {
-                synchronized (logRecord) {
-                    TransactionUtil.formFlushLogRecord(logRecord, dsInfo.getDatasetID(), partition,
-                            componentId.getMinId(), componentId.getMaxId(), null);
-                    try {
-                        logManager.log(logRecord);
-                    } catch (ACIDException e) {
-                        throw new HyracksDataException("could not write flush log while closing dataset", e);
-                    }
-
-                    try {
-                        //notification will come from LogBuffer class (notifyFlushTerminator)
-                        logRecord.wait();
-                    } catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                        throw HyracksDataException.create(e);
-                    }
-                }
-            }
-            long flushLsn = logRecord.getLSN();
-            ILSMComponentId nextComponentId = idGenerator.getId();
-            Map<String, Object> flushMap = new HashMap<>();
-            flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
-            flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, nextComponentId);
-            for (ILSMIndex index : indexes) {
-                ILSMIndexAccessor accessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-                accessor.getOpContext().setParameters(flushMap);
-                accessor.scheduleFlush();
-            }
-            if (!asyncFlush) {
-                // Wait for the above flush op.
-                dsInfo.waitForIO();
+            primaryOpTracker.setFlushOnExit(true);
+            primaryOpTracker.flushIfNeeded();
+        }
+        // ensure requested flushes were scheduled
+        logManager.log(waitLog);
+        if (!asyncFlush) {
+            List<FlushOperation> flushes = new ArrayList<>();
+            for (PrimaryIndexOperationTracker primaryOpTracker : dsr.getOpTrackers()) {
+                flushes.addAll(primaryOpTracker.getScheduledFlushes());
             }
+            LSMIndexUtil.waitFor(flushes);
         }
     }
 
     private void closeDataset(DatasetResource dsr) throws HyracksDataException {
         // First wait for any ongoing IO operations
         DatasetInfo dsInfo = dsr.getDatasetInfo();
-        dsInfo.waitForIO();
         try {
             flushDatasetOpenIndexes(dsr, false);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
+        // wait for merges that were scheduled due to the above flush
+        // ideally, we shouldn't need this since merges should still work.
+        // They don't need a special memory budget but there is a problem
+        // for some merge policies that need to access dataset info (correlated prefix)
+        dsInfo.waitForIO();
         for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
             closeIndex(iInfo);
         }
@@ -505,7 +475,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     public synchronized void closeAllDatasets() throws HyracksDataException {
         ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values());
         for (DatasetResource dsr : openDatasets) {
-            closeDataset(dsr);
+            if (dsr.isOpen()) {
+                closeDataset(dsr);
+            }
         }
     }
 
@@ -612,7 +584,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     @Override
     public void flushDataset(IReplicationStrategy replicationStrategy) throws HyracksDataException {
         for (DatasetResource dsr : datasets.values()) {
-            if (replicationStrategy.isMatch(dsr.getDatasetID())) {
+            if (dsr.isOpen() && replicationStrategy.isMatch(dsr.getDatasetID())) {
                 flushDatasetOpenIndexes(dsr, false);
             }
         }