Posted to commits@asterixdb.apache.org by am...@apache.org on 2017/11/21 05:32:33 UTC

[3/3] asterixdb git commit: [ASTERIXDB-2169][STO][TX] Unblock modifications during full scan

[ASTERIXDB-2169][STO][TX] Unblock modifications during full scan

- user model changes: no
- storage format changes: no
- interface changes: yes
  - added ILSMHarness.replaceMemoryComponentsWithDiskComponents

details:
- During a long-running query (e.g., a full scan), two things can
  block incoming modifications:
  1) The memory component fills up and is flushed, but it cannot
     be recycled because a search operation is still active
     inside the component.
  2) Read latches on the memory component are held while the
     memory component's search cursor is not advancing.
  This change addresses both cases for the LSMBTree; other
  indexes are not yet covered.
  The proposed solution for case (1) is to poll the memory
  components' states every n records during the search
  operation. If a memory component is found to have been
  flushed, its cursor is moved to the corresponding disk
  component, allowing the memory component to be recycled.
  The proposed solution for case (2) is to check the memory
  component cursor every n records. If the cursor has not
  advanced and the component has writers, the latches over the
  leaf page are released and the cursor redoes the operation,
  re-entering from the tree root. A sketch of both checks
  appears after this list.
- Added a test case.
- Added performance traces for entering and exiting components.
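
The following is a minimal, self-contained sketch of the two
periodic checks described above. All names in it are illustrative
assumptions, not the actual AsterixDB classes; in the real change
the logic lives in LSMBTreeRangeSearchCursor and presumably goes
through the new ILSMHarness.replaceMemoryComponentsWithDiskComponents:

    import java.util.concurrent.atomic.AtomicBoolean;

    // Hypothetical stand-in for an LSM merge cursor; not the real API.
    final class LsmScanCursorSketch {
        private static final int CHECK_INTERVAL = 1000; // "every n records"

        // Minimal stand-in for the memory-component sub-cursor.
        static final class MemCursorStub {
            final AtomicBoolean flushed = new AtomicBoolean(); // set by flush callback
            long position; // advances as the sub-cursor yields tuples
            void releaseLeafLatches() { /* drop read latches on the pinned leaf */ }
            void reseekFromRoot(long key) { /* re-enter the search from the root */ }
        }

        private final MemCursorStub memCursor = new MemCursorStub();
        // Set by the harness when modifications are blocked on the component.
        private final AtomicBoolean writersWaiting = new AtomicBoolean();
        private int sinceLastCheck;
        private long memPosAtLastCheck;
        private long lastReturnedKey;

        boolean hasNext() {
            if (++sinceLastCheck >= CHECK_INTERVAL) {
                sinceLastCheck = 0;
                if (memCursor.flushed.get()) {
                    // Case (1): the memory component was flushed mid-scan;
                    // continue from the matching disk component so the
                    // memory component can be recycled.
                    switchMemCursorToDiskComponent();
                } else if (memCursor.position == memPosAtLastCheck && writersWaiting.get()) {
                    // Case (2): the memory cursor sat on the same leaf for
                    // the last CHECK_INTERVAL output records while writers
                    // wait; drop its latches and redo the operation from
                    // the tree root at the last returned key.
                    memCursor.releaseLeafLatches();
                    memCursor.reseekFromRoot(lastReturnedKey);
                }
                memPosAtLastCheck = memCursor.position;
            }
            return mergeNext(); // next tuple from whichever component wins the merge
        }

        private void switchMemCursorToDiskComponent() { /* stub */ }
        private boolean mergeNext() { lastReturnedKey++; return true; } // stub
    }

This sketches only the control flow; the real change must also
swap component references atomically (cf. the new
ComponentReplacementContext in the diff below).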

Change-Id: I37ba52f6324ed1c5a78465c3a8cbcd351f1ed5bc
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2166
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Luo Chen <cl...@uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/e5a65429
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/e5a65429
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/e5a65429

Branch: refs/heads/master
Commit: e5a65429d94c34b3079a3fe6bad206089c2c193c
Parents: 76ecc4b
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Mon Nov 20 17:14:01 2017 -0800
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Mon Nov 20 21:32:04 2017 -0800

----------------------------------------------------------------------
 .../app/bootstrap/TestNodeController.java       |  58 +--
 .../test/dataflow/ComponentRollbackTest.java    |  94 +++--
 .../asterix/test/dataflow/LogMarkerTest.java    |   7 +-
 .../dataflow/MultiPartitionLSMIndexTest.java    | 196 ++++++++++
 .../SearchCursorComponentSwitchTest.java        | 266 +++++++++++++
 .../asterix/test/dataflow/TestDataset.java      |   3 +-
 .../TestLsmBtreeIoOpCallbackFactory.java        |   8 +-
 .../asterix/test/logging/CheckpointingTest.java |   7 +-
 .../asterix/test/storage/DeallocatableTest.java |   4 +-
 .../asterix/test/storage/DiskIsFullTest.java    |   8 +-
 .../IndexDropOperatorNodePushableTest.java      |  28 +-
 .../AbstractLSMIOOperationCallback.java         |  10 +-
 ...actOneInputOneOutputOneFramePushRuntime.java |  16 +-
 .../meta/AlgebricksMetaOperatorDescriptor.java  |  49 ++-
 .../apache/hyracks/http/server/HttpServer.java  |  10 +-
 .../dataflow/ExternalBTreeLocalResource.java    |   2 +-
 .../ExternalBTreeWithBuddyLocalResource.java    |   2 +-
 .../am/lsm/btree/impls/ExternalBTree.java       |  27 +-
 .../lsm/btree/impls/ExternalBTreeOpContext.java |   8 +-
 .../lsm/btree/impls/ExternalBTreeWithBuddy.java |  26 +-
 .../impls/ExternalBTreeWithBuddyOpContext.java  |   9 +-
 .../storage/am/lsm/btree/impls/LSMBTree.java    |  14 +-
 .../am/lsm/btree/impls/LSMBTreeOpContext.java   |  13 +-
 .../btree/impls/LSMBTreeRangeSearchCursor.java  | 152 +++++++-
 .../am/lsm/btree/utils/LSMBTreeUtil.java        |   9 +-
 .../am/lsm/common/api/ILSMComponent.java        |   5 +
 .../storage/am/lsm/common/api/ILSMHarness.java  |  13 +
 .../storage/am/lsm/common/api/ILSMIndex.java    |   5 +
 .../common/api/ILSMIndexOperationContext.java   |  26 ++
 .../lsm/common/impls/AbstractLSMComponent.java  |   7 +-
 .../common/impls/AbstractLSMDiskComponent.java  |   9 +-
 .../am/lsm/common/impls/AbstractLSMIndex.java   |  39 +-
 .../impls/AbstractLSMIndexOperationContext.java |  42 ++-
 .../impls/AbstractLSMMemoryComponent.java       |   6 +
 .../impls/ComponentReplacementContext.java      | 215 +++++++++++
 .../am/lsm/common/impls/LSMComponentId.java     |   8 +-
 .../storage/am/lsm/common/impls/LSMHarness.java | 373 +++++++++++--------
 .../impls/LSMIndexDiskComponentBulkLoader.java  |   2 +-
 .../common/impls/LSMIndexReplicationJob.java    |   2 +-
 .../lsm/common/impls/LSMIndexSearchCursor.java  |  19 +-
 .../invertedindex/impls/LSMInvertedIndex.java   |  14 +-
 .../impls/LSMInvertedIndexOpContext.java        |  11 +-
 .../ondisk/OnDiskInvertedIndexOpContext.java    |   1 +
 .../dataflow/ExternalRTreeLocalResource.java    |   2 +-
 .../am/lsm/rtree/impls/AbstractLSMRTree.java    |  10 +-
 .../am/lsm/rtree/impls/ExternalRTree.java       |  29 +-
 .../lsm/rtree/impls/ExternalRTreeOpContext.java |   9 +-
 .../storage/am/lsm/rtree/impls/LSMRTree.java    |  11 +-
 .../am/lsm/rtree/impls/LSMRTreeOpContext.java   |  16 +-
 .../impls/LSMRTreeWithAntiMatterTuples.java     |   6 +-
 .../am/lsm/rtree/utils/LSMRTreeUtils.java       |   5 +-
 .../storage/am/lsm/btree/impl/TestLsmBtree.java |  25 +-
 52 files changed, 1514 insertions(+), 422 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 90b9a1e..505ef8f 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -76,6 +76,8 @@ import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
+import org.apache.hyracks.api.dataflow.TaskAttemptId;
+import org.apache.hyracks.api.dataflow.TaskId;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
@@ -121,11 +123,10 @@ public class TestNodeController {
     public static final int DEFAULT_HYRACKS_CC_CLIENT_PORT = 1098;
     public static final int DEFAULT_HYRACKS_CC_CLUSTER_PORT = 1099;
     public static final int KB32 = 32768;
-    public static final int PARTITION = 0;
     public static final double BLOOM_FILTER_FALSE_POSITIVE_RATE = 0.01;
     public static final TransactionSubsystemProvider TXN_SUBSYSTEM_PROVIDER = TransactionSubsystemProvider.INSTANCE;
     // Mutables
-    private long jobCounter = 0L;
+    private long jobCounter = 100L;
     private final String testConfigFileName;
     private final boolean runHDFS;
 
@@ -155,7 +156,11 @@ public class TestNodeController {
     }
 
     public TxnId getTxnJobId(IHyracksTaskContext ctx) {
-        return new TxnId(ctx.getJobletContext().getJobId().getId());
+        return getTxnJobId(ctx.getJobletContext().getJobId());
+    }
+
+    public TxnId getTxnJobId(JobId jobId) {
+        return new TxnId(jobId.getId());
     }
 
     public Pair<LSMInsertDeleteOperatorNodePushable, CommitRuntime> getInsertPipeline(IHyracksTaskContext ctx,
@@ -177,12 +182,14 @@ public class TestNodeController {
         IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider();
         IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
                 storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
-        LSMInsertDeleteOperatorNodePushable insertOp = new LSMInsertDeleteOperatorNodePushable(ctx, PARTITION,
-                primaryIndexInfo.primaryIndexInsertFieldsPermutations,
-                recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0), op,
-                true, indexHelperFactory, modOpCallbackFactory, null);
-        CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx), dataset.getDatasetId(),
-                primaryIndexInfo.primaryKeyIndexes, false, true, PARTITION, true);
+        LSMInsertDeleteOperatorNodePushable insertOp =
+                new LSMInsertDeleteOperatorNodePushable(ctx, ctx.getTaskAttemptId().getTaskId().getPartition(),
+                        primaryIndexInfo.primaryIndexInsertFieldsPermutations,
+                        recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0),
+                        op, true, indexHelperFactory, modOpCallbackFactory, null);
+        CommitRuntime commitOp =
+                new CommitRuntime(ctx, getTxnJobId(ctx), dataset.getDatasetId(), primaryIndexInfo.primaryKeyIndexes,
+                        false, true, ctx.getTaskAttemptId().getTaskId().getPartition(), true);
         insertOp.setOutputFrameWriter(0, commitOp, primaryIndexInfo.rDesc);
         commitOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc);
         return Pair.of(insertOp, commitOp);
@@ -203,7 +210,8 @@ public class TestNodeController {
                 null, null, true, true, indexDataflowHelperFactory, false, false, null,
                 NoOpOperationCallbackFactory.INSTANCE, filterFields, filterFields, false);
         BTreeSearchOperatorNodePushable searchOp =
-                searchOpDesc.createPushRuntime(ctx, primaryIndexInfo.getSearchRecordDescriptorProvider(), PARTITION, 1);
+                searchOpDesc.createPushRuntime(ctx, primaryIndexInfo.getSearchRecordDescriptorProvider(),
+                        ctx.getTaskAttemptId().getTaskId().getPartition(), 1);
         emptyTupleOp.setOutputFrameWriter(0, searchOp,
                 primaryIndexInfo.getSearchRecordDescriptorProvider().getInputRecordDescriptor(null, 0));
         searchOp.setOutputFrameWriter(0, countOp, primaryIndexInfo.rDesc);
@@ -236,7 +244,7 @@ public class TestNodeController {
 
     public PrimaryIndexInfo createPrimaryIndex(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType,
             ARecordType metaType, int[] filterFields, IStorageComponentProvider storageComponentProvider,
-            int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators)
+            int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators, int partition)
             throws AlgebricksException, HyracksDataException, RemoteException, ACIDException {
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy =
@@ -254,8 +262,8 @@ public class TestNodeController {
             IndexBuilderFactory indexBuilderFactory =
                     new IndexBuilderFactory(storageComponentProvider.getStorageManager(),
                             primaryIndexInfo.getFileSplitProvider(), resourceFactory, !dataset.isTemp());
-            IHyracksTaskContext ctx = createTestContext(false);
-            IIndexBuilder indexBuilder = indexBuilderFactory.create(ctx, 0);
+            IHyracksTaskContext ctx = createTestContext(newJobId(), partition, false);
+            IIndexBuilder indexBuilder = indexBuilderFactory.create(ctx, partition);
             indexBuilder.build();
         } finally {
             mdProvider.getLocks().unlock();
@@ -292,12 +300,12 @@ public class TestNodeController {
         return primaryIndexTypeTraits;
     }
 
-    public IHyracksTaskContext createTestContext(boolean withMessaging) throws HyracksDataException {
+    public IHyracksTaskContext createTestContext(JobId jobId, int partition, boolean withMessaging)
+            throws HyracksDataException {
         IHyracksTaskContext ctx = TestUtils.create(KB32);
         if (withMessaging) {
             TaskUtil.put(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx);
         }
-        JobId jobId = newJobId();
         IHyracksJobletContext jobletCtx = Mockito.mock(IHyracksJobletContext.class);
         JobEventListenerFactory factory = new JobEventListenerFactory(new TxnId(jobId.getId()), true);
         Mockito.when(jobletCtx.getJobletEventListenerFactory()).thenReturn(factory);
@@ -306,6 +314,9 @@ public class TestNodeController {
         ctx = Mockito.spy(ctx);
         Mockito.when(ctx.getJobletContext()).thenReturn(jobletCtx);
         Mockito.when(ctx.getIoManager()).thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getIoManager());
+        TaskAttemptId taskId =
+                new TaskAttemptId(new TaskId(new ActivityId(new OperatorDescriptorId(0), 0), partition), 0);
+        Mockito.when(ctx.getTaskAttemptId()).thenReturn(taskId);
         return ctx;
     }
 
@@ -377,7 +388,7 @@ public class TestNodeController {
                     (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext();
             FileSplit[] splits = SplitsAndConstraintsUtil.getIndexSplits(appCtx.getClusterStateManager(), dataset,
                     index.getIndexName(), nodes);
-            fileSplitProvider = new ConstantFileSplitProvider(Arrays.copyOfRange(splits, 0, 1));
+            fileSplitProvider = new ConstantFileSplitProvider(splits);
         }
 
         public Index getIndex() {
@@ -448,12 +459,13 @@ public class TestNodeController {
         IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider();
         IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
                 storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
-        LSMPrimaryUpsertOperatorNodePushable insertOp = new LSMPrimaryUpsertOperatorNodePushable(ctx, PARTITION,
-                indexHelperFactory, primaryIndexInfo.primaryIndexInsertFieldsPermutations,
-                recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0),
-                modificationCallbackFactory, searchCallbackFactory, keyIndexes.length, recordType, -1,
-                frameOpCallbackFactory == null ? dataset.getFrameOpCallbackFactory() : frameOpCallbackFactory,
-                MissingWriterFactory.INSTANCE, hasSecondaries);
+        LSMPrimaryUpsertOperatorNodePushable insertOp =
+                new LSMPrimaryUpsertOperatorNodePushable(ctx, ctx.getTaskAttemptId().getTaskId().getPartition(),
+                        indexHelperFactory, primaryIndexInfo.primaryIndexInsertFieldsPermutations,
+                        recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0),
+                        modificationCallbackFactory, searchCallbackFactory, keyIndexes.length, recordType, -1,
+                        frameOpCallbackFactory == null ? dataset.getFrameOpCallbackFactory() : frameOpCallbackFactory,
+                        MissingWriterFactory.INSTANCE, hasSecondaries);
         RecordDescriptor upsertOutRecDesc = getUpsertOutRecDesc(primaryIndexInfo.rDesc, dataset,
                 filterFields == null ? 0 : filterFields.length, recordType, metaType);
         // fix pk fields
@@ -463,7 +475,7 @@ public class TestNodeController {
             pkFieldsInCommitOp[i] = diff + i;
         }
         CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx), dataset.getDatasetId(), pkFieldsInCommitOp,
-                false, true, PARTITION, true);
+                false, true, ctx.getTaskAttemptId().getTaskId().getPartition(), true);
         insertOp.setOutputFrameWriter(0, commitOp, upsertOutRecDesc);
         commitOp.setInputRecordDescriptor(0, upsertOutRecDesc);
         return Pair.of(insertOp, commitOp);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index b09e000..1ac1aa6 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -58,6 +58,7 @@ import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.test.CountAnswer;
 import org.apache.hyracks.api.test.FrameWriterTestUtils;
 import org.apache.hyracks.api.test.FrameWriterTestUtils.FrameWriterOperation;
@@ -133,20 +134,21 @@ public class ComponentRollbackTest {
     public void createIndex() throws Exception {
         List<List<String>> partitioningKeys = new ArrayList<>();
         partitioningKeys.add(Collections.singletonList("key"));
+        int partition = 0;
         dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
                 NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
                         partitioningKeys, null, null, null, false, null, false),
                 null, DatasetType.INTERNAL, DATASET_ID, 0);
         PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
-                storageManager, KEY_INDEXES, KEY_INDICATORS_LIST);
+                storageManager, KEY_INDEXES, KEY_INDICATORS_LIST, partition);
         IndexDataflowHelperFactory iHelperFactory =
                 new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
-        ctx = nc.createTestContext(false);
-        indexDataflowHelper = iHelperFactory.create(ctx.getJobletContext().getServiceContext(), 0);
+        JobId jobId = nc.newJobId();
+        ctx = nc.createTestContext(jobId, partition, false);
+        indexDataflowHelper = iHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
         indexDataflowHelper.open();
         lsmBtree = (TestLsmBtree) indexDataflowHelper.getIndexInstance();
         indexDataflowHelper.close();
-        nc.newJobId();
         txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
                 new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
         insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
@@ -158,7 +160,7 @@ public class ComponentRollbackTest {
         indexDataflowHelper.destroy();
     }
 
-    private void allowAllOps(TestLsmBtree lsmBtree) {
+    static void allowAllOps(TestLsmBtree lsmBtree) {
         lsmBtree.addModifyCallback(sem -> sem.release());
         lsmBtree.addFlushCallback(sem -> sem.release());
         lsmBtree.addSearchCallback(sem -> sem.release());
@@ -197,19 +199,17 @@ public class ComponentRollbackTest {
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
             Assert.assertEquals(9, diskComponents.size());
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
-            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
             ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
             // rollback a memory component
             lsmAccessor.deleteComponents(memoryComponentsPredicate);
-            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
+            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
             // rollback the last disk component
             lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
             DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
-            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
             lsmAccessor.deleteComponents(pred);
-            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
+            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
         } catch (Throwable e) {
             e.printStackTrace();
             Assert.fail(e.getMessage());
@@ -247,12 +247,11 @@ public class ComponentRollbackTest {
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
             Assert.assertEquals(9, diskComponents.size());
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
-            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
             ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
             // rollback a memory component
             lsmAccessor.deleteComponents(memoryComponentsPredicate);
-            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
+            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
 
             // insert again
             nc.newJobId();
@@ -270,14 +269,13 @@ public class ComponentRollbackTest {
             }
             insertOp.close();
             nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
-            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
             // rollback the last disk component
             lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
             DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
-            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
             lsmAccessor.deleteComponents(pred);
-            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
+            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
         } catch (Throwable e) {
             e.printStackTrace();
             Assert.fail(e.getMessage());
@@ -317,12 +315,11 @@ public class ComponentRollbackTest {
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
             Assert.assertEquals(9, diskComponents.size());
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
-            Searcher firstSearcher = new Searcher(nc, ctx, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+            Searcher firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
             // wait till firstSearcher enters the components
             firstSearcher.waitUntilEntered();
             // now that we have entered, we will rollback
             ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
             // rollback a memory component
             lsmAccessor.deleteComponents(
                     c -> (c instanceof ILSMMemoryComponent && ((ILSMMemoryComponent) c).isModified()));
@@ -331,24 +328,23 @@ public class ComponentRollbackTest {
             lsmBtree.allowSearch(1);
             Assert.assertTrue(firstSearcher.result());
             // search now and ensure
-            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
+            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
             // rollback the last disk component
             // re-block searches
             lsmBtree.clearSearchCallbacks();
-            Searcher secondSearcher = new Searcher(nc, ctx, dataset, storageManager, lsmBtree,
+            Searcher secondSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree,
                     TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
             // wait till secondSearcher enters the components
             secondSearcher.waitUntilEntered();
             lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
             DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
-            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
             lsmAccessor.deleteComponents(pred);
             // now that the rollback has completed, we will unblock the search
             lsmBtree.addSearchCallback(sem -> sem.release());
             lsmBtree.allowSearch(1);
             Assert.assertTrue(secondSearcher.result());
-            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
+            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
         } catch (Throwable e) {
             e.printStackTrace();
             Assert.fail(e.getMessage());
@@ -386,7 +382,7 @@ public class ComponentRollbackTest {
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
             Assert.assertEquals(9, diskComponents.size());
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
-            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
             // disable flushes
             lsmBtree.clearFlushCallbacks();
             Flusher firstFlusher = new Flusher(lsmBtree);
@@ -395,7 +391,7 @@ public class ComponentRollbackTest {
             // now that we have entered, we will rollback. This will not proceed since it is waiting for the flush to complete
             Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate);
             // now that the rollback has completed, we will search
-            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
             //unblock the flush
             lsmBtree.allowFlush(1);
             // ensure rollback completed
@@ -403,7 +399,7 @@ public class ComponentRollbackTest {
             // ensure current mem component is not modified
             Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
             // search now and ensure
-            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
         } catch (Throwable e) {
             e.printStackTrace();
             Assert.fail(e.getMessage());
@@ -442,7 +438,7 @@ public class ComponentRollbackTest {
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
             Assert.assertEquals(9, diskComponents.size());
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
-            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
             // Now, we will start a full merge
             Merger merger = new Merger(lsmBtree);
             ILSMIndexAccessor mergeAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
@@ -459,7 +455,7 @@ public class ComponentRollbackTest {
             Rollerback rollerback = new Rollerback(lsmBtree, new DiskComponentLsnPredicate(lsn));
             // rollback is now waiting for the merge to complete
             // we will search
-            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
             //unblock the merge
             lsmBtree.allowMerge(1);
             // ensure rollback completes
@@ -467,7 +463,7 @@ public class ComponentRollbackTest {
             // ensure current mem component is not modified
             Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
             // search now and ensure that we rolled back the merged component
-            searchAndAssertCount(nc, ctx, dataset, storageManager,
+            searchAndAssertCount(nc, 0, dataset, storageManager,
                     TOTAL_NUM_OF_RECORDS - ((numMergedComponents + 1/*memory component*/) * RECORDS_PER_COMPONENT));
         } catch (Throwable e) {
             e.printStackTrace();
@@ -506,7 +502,7 @@ public class ComponentRollbackTest {
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
             Assert.assertEquals(9, diskComponents.size());
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
-            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
             // disable flushes
             // disable searches
             lsmBtree.clearFlushCallbacks();
@@ -514,7 +510,7 @@ public class ComponentRollbackTest {
             Flusher firstFlusher = new Flusher(lsmBtree);
             dsLifecycleMgr.flushDataset(dataset.getDatasetId(), true);
             firstFlusher.waitUntilCount(1);
-            Searcher firstSearcher = new Searcher(nc, ctx, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+            Searcher firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
             // wait till firstSearcher enters the components
             firstSearcher.waitUntilEntered();
             // now that we have entered, we will rollback a memory component
@@ -528,7 +524,7 @@ public class ComponentRollbackTest {
             rollerback.complete();
             Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
             // search now and ensure the rollback was a no-op since it waits for ongoing flushes
-            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
         } catch (Throwable e) {
             e.printStackTrace();
             Assert.fail(e.getMessage());
@@ -566,7 +562,7 @@ public class ComponentRollbackTest {
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
             Assert.assertEquals(9, diskComponents.size());
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
-            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
             // disable flushes
             // disable searches
             lsmBtree.clearFlushCallbacks();
@@ -574,7 +570,7 @@ public class ComponentRollbackTest {
             dsLifecycleMgr.flushDataset(dataset.getDatasetId(), true);
             firstFlusher.waitUntilCount(1);
             lsmBtree.clearSearchCallbacks();
-            Searcher firstSearcher = new Searcher(nc, ctx, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+            Searcher firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
             // wait till firstSearcher enters the components
             firstSearcher.waitUntilEntered();
             // now that we have entered, we will rollback
@@ -589,7 +585,7 @@ public class ComponentRollbackTest {
             rollerback.complete();
             Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
             // search now and ensure
-            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
         } catch (Throwable e) {
             e.printStackTrace();
             Assert.fail(e.getMessage());
@@ -628,7 +624,7 @@ public class ComponentRollbackTest {
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
             Assert.assertEquals(9, diskComponents.size());
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
-            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
             // Now, we will start a merge
             Merger merger = new Merger(lsmBtree);
             ILSMIndexAccessor mergeAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
@@ -643,7 +639,7 @@ public class ComponentRollbackTest {
             merger.waitUntilCount(1);
             // we will block search
             lsmBtree.clearSearchCallbacks();
-            Searcher firstSearcher = new Searcher(nc, ctx, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+            Searcher firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
             // wait till firstSearcher enters the components
             firstSearcher.waitUntilEntered();
             // now that we have entered, we will rollback
@@ -657,7 +653,7 @@ public class ComponentRollbackTest {
             Assert.assertTrue(firstSearcher.result());
             rollerback.complete();
             // now that the rollback has completed, we will search
-            searchAndAssertCount(nc, ctx, dataset, storageManager,
+            searchAndAssertCount(nc, 0, dataset, storageManager,
                     TOTAL_NUM_OF_RECORDS - ((numMergedComponents + 1/*memory component*/) * RECORDS_PER_COMPONENT));
             // ensure current mem component is not modified
             Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
@@ -699,7 +695,7 @@ public class ComponentRollbackTest {
             List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
             Assert.assertEquals(9, diskComponents.size());
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
-            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
             // Now, we will start a merge
             Merger merger = new Merger(lsmBtree);
             ILSMIndexAccessor mergeAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
@@ -714,7 +710,7 @@ public class ComponentRollbackTest {
             merger.waitUntilCount(1);
             // we will block search
             lsmBtree.clearSearchCallbacks();
-            Searcher firstSearcher = new Searcher(nc, ctx, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+            Searcher firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
             // wait till firstSearcher enters the components
             firstSearcher.waitUntilEntered();
             // now that we have entered, we will rollback
@@ -724,11 +720,11 @@ public class ComponentRollbackTest {
             lsmBtree.allowSearch(1);
             Assert.assertTrue(firstSearcher.result());
             // even though rollback has been called, it is still waiting for the merge to complete
-            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
             //unblock the merge
             lsmBtree.allowMerge(1);
             rollerBack.complete();
-            searchAndAssertCount(nc, ctx, dataset, storageManager,
+            searchAndAssertCount(nc, 0, dataset, storageManager,
                     TOTAL_NUM_OF_RECORDS - ((numMergedComponents + 1/*memory component*/) * RECORDS_PER_COMPONENT));
             // ensure current mem component is not modified
             Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
@@ -748,7 +744,6 @@ public class ComponentRollbackTest {
                 @Override
                 public void run() {
                     ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-                    dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
                     try {
                         lsmAccessor.deleteComponents(predicate);
                     } catch (HyracksDataException e) {
@@ -768,13 +763,13 @@ public class ComponentRollbackTest {
         }
     }
 
-    private class Searcher {
+    static class Searcher {
         private final ExecutorService executor = Executors.newSingleThreadExecutor();
         private Future<Boolean> task;
         private volatile boolean entered = false;
 
-        public Searcher(TestNodeController nc, IHyracksTaskContext ctx, Dataset dataset,
-                StorageComponentProvider storageManager, TestLsmBtree lsmBtree, int numOfRecords) {
+        public Searcher(TestNodeController nc, int partition, Dataset dataset, StorageComponentProvider storageManager,
+                TestLsmBtree lsmBtree, int numOfRecords) {
             lsmBtree.addSearchCallback(sem -> {
                 synchronized (Searcher.this) {
                     entered = true;
@@ -784,7 +779,7 @@ public class ComponentRollbackTest {
             Callable<Boolean> callable = new Callable<Boolean>() {
                 @Override
                 public Boolean call() throws Exception {
-                    searchAndAssertCount(nc, ctx, dataset, storageManager, numOfRecords);
+                    searchAndAssertCount(nc, partition, dataset, storageManager, numOfRecords);
                     return true;
                 }
             };
@@ -840,7 +835,7 @@ public class ComponentRollbackTest {
         }
     }
 
-    private class DiskComponentLsnPredicate implements Predicate<ILSMComponent> {
+    private static class DiskComponentLsnPredicate implements Predicate<ILSMComponent> {
         private final long lsn;
 
         public DiskComponentLsnPredicate(long lsn) {
@@ -860,10 +855,11 @@ public class ComponentRollbackTest {
         }
     }
 
-    private void searchAndAssertCount(TestNodeController nc, IHyracksTaskContext ctx, Dataset dataset,
+    static void searchAndAssertCount(TestNodeController nc, int partition, Dataset dataset,
             StorageComponentProvider storageManager, int numOfRecords)
             throws HyracksDataException, AlgebricksException {
-        nc.newJobId();
+        JobId jobId = nc.newJobId();
+        IHyracksTaskContext ctx = nc.createTestContext(jobId, partition, false);
         TestTupleCounterFrameWriter countOp = create(nc.getSearchOutputDesc(KEY_TYPES, RECORD_TYPE, META_TYPE),
                 Collections.emptyList(), Collections.emptyList(), false);
         IPushRuntime emptyTupleOp = nc.getFullScanPipeline(countOp, ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
index 1907ed6..aa08c2d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -50,6 +50,7 @@ import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.test.CountAnswer;
 import org.apache.hyracks.api.test.FrameWriterTestUtils;
 import org.apache.hyracks.api.test.FrameWriterTestUtils.FrameWriterOperation;
@@ -117,9 +118,9 @@ public class LogMarkerTest {
                     null, DatasetType.INTERNAL, DATASET_ID, 0);
             try {
                 PrimaryIndexInfo indexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
-                        storageManager, KEY_INDEXES, KEY_INDICATORS_LIST);
-                IHyracksTaskContext ctx = nc.createTestContext(true);
-                nc.newJobId();
+                        storageManager, KEY_INDEXES, KEY_INDICATORS_LIST, 0);
+                JobId jobId = nc.newJobId();
+                IHyracksTaskContext ctx = nc.createTestContext(jobId, 0, true);
                 ITransactionContext txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
                         new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
                 LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
new file mode 100644
index 0000000..ec9c2f6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
+import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.app.nc.NCAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.TransactionOptions;
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.file.StorageComponentProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class MultiPartitionLSMIndexTest {
+    private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
+    private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
+            new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
+    private static final GenerationFunction[] RECORD_GEN_FUNCTION =
+            { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC };
+    private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false };
+    private static final ARecordType META_TYPE = null;
+    private static final GenerationFunction[] META_GEN_FUNCTION = null;
+    private static final boolean[] UNIQUE_META_FIELDS = null;
+    private static final int[] KEY_INDEXES = { 0 };
+    private static final int[] KEY_INDICATORS = { Index.RECORD_INDICATOR };
+    private static final List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR });
+    private static final int TOTAL_NUM_OF_RECORDS = 10000;
+    private static final int RECORDS_PER_COMPONENT = 1000;
+    private static final int DATASET_ID = 101;
+    private static final String DATAVERSE_NAME = "TestDV";
+    private static final String DATASET_NAME = "TestDS";
+    private static final String DATA_TYPE_NAME = "DUMMY";
+    private static final String NODE_GROUP_NAME = "DEFAULT";
+    private static final StorageComponentProvider storageManager = new StorageComponentProvider();
+    private static final int NUM_PARTITIONS = 2;
+    private static TestNodeController nc;
+    private static NCAppRuntimeContext ncAppCtx;
+    private static IDatasetLifecycleManager dsLifecycleMgr;
+    private static Dataset dataset;
+    private static ITransactionContext txnCtx;
+    private static TestLsmBtree[] primarylsmBtrees;
+    private static IHyracksTaskContext[] taskCtxs;
+    private static IIndexDataflowHelper[] indexDataflowHelpers;
+    private static LSMInsertDeleteOperatorNodePushable[] insertOps;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        System.out.println("SetUp: ");
+        TestHelper.deleteExistingInstanceFiles();
+        nc = new TestNodeController(null, false);
+        nc.init();
+        ncAppCtx = nc.getAppRuntimeContext();
+        dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        System.out.println("TearDown");
+        nc.deInit();
+        TestHelper.deleteExistingInstanceFiles();
+    }
+
+    @Before
+    public void createIndex() throws Exception {
+        List<List<String>> partitioningKeys = new ArrayList<>();
+        partitioningKeys.add(Collections.singletonList("key"));
+        dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+                NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+                        partitioningKeys, null, null, null, false, null, false),
+                null, DatasetType.INTERNAL, DATASET_ID, 0);
+        taskCtxs = new IHyracksTaskContext[NUM_PARTITIONS];
+        indexDataflowHelpers = new IIndexDataflowHelper[NUM_PARTITIONS];
+        primarylsmBtrees = new TestLsmBtree[NUM_PARTITIONS];
+        insertOps = new LSMInsertDeleteOperatorNodePushable[NUM_PARTITIONS];
+        JobId jobId = nc.newJobId();
+        txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(jobId),
+                new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
+        for (int i = 0; i < taskCtxs.length; i++) {
+            taskCtxs[i] = nc.createTestContext(jobId, i, false);
+            PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
+                    storageManager, KEY_INDEXES, KEY_INDICATORS_LIST, i);
+            IndexDataflowHelperFactory iHelperFactory =
+                    new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
+            indexDataflowHelpers[i] = iHelperFactory.create(taskCtxs[i].getJobletContext().getServiceContext(), i);
+            indexDataflowHelpers[i].open();
+            primarylsmBtrees[i] = (TestLsmBtree) indexDataflowHelpers[i].getIndexInstance();
+            indexDataflowHelpers[i].close();
+            insertOps[i] = nc.getInsertPipeline(taskCtxs[i], dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
+                    KEY_INDEXES, KEY_INDICATORS_LIST, storageManager).getLeft();
+        }
+    }
+
+    @After
+    public void destroyIndex() throws Exception {
+        for (IIndexDataflowHelper indexDataflowHelper : indexDataflowHelpers) {
+            indexDataflowHelper.destroy();
+        }
+    }
+
+    @Test
+    public void testFlushOneFullOneEmpty() {
+        try {
+            // allow all operations
+            for (int i = 0; i < NUM_PARTITIONS; i++) {
+                ComponentRollbackTest.allowAllOps(primarylsmBtrees[i]);
+            }
+
+            insertOps[0].open();
+            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            VSizeFrame frame = new VSizeFrame(taskCtxs[0]);
+            FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+            int numFlushes = 0;
+            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+                // flush every RECORDS_PER_COMPONENT records
+                if (j % RECORDS_PER_COMPONENT == (RECORDS_PER_COMPONENT - 1) && j + 1 != TOTAL_NUM_OF_RECORDS) {
+                    if (tupleAppender.getTupleCount() > 0) {
+                        tupleAppender.write(insertOps[0], true);
+                    }
+                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                    numFlushes++;
+                }
+                ITupleReference tuple = tupleGenerator.next();
+                DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOps[0]);
+            }
+            if (tupleAppender.getTupleCount() > 0) {
+                tupleAppender.write(insertOps[0], true);
+            }
+            insertOps[0].close();
+            dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+            numFlushes++;
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
+            // search now and ensure partition 0 has all the records
+            ComponentRollbackTest.searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+            // and that partition 1 has no records
+            ComponentRollbackTest.searchAndAssertCount(nc, 1, dataset, storageManager, 0);
+            // and that partition 0 has numFlushes disk components
+            Assert.assertEquals(numFlushes, primarylsmBtrees[0].getDiskComponents().size());
+            // and that partition 1 has no disk components
+            Assert.assertEquals(0, primarylsmBtrees[1].getDiskComponents().size());
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
new file mode 100644
index 0000000..77d3795
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
+import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter;
+import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.app.nc.NCAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.TransactionOptions;
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.file.StorageComponentProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.asterix.test.dataflow.ComponentRollbackTest.Searcher;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallbackWrapper;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class SearchCursorComponentSwitchTest {
+    private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
+    private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
+            new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
+    private static final GenerationFunction[] RECORD_GEN_FUNCTION =
+            { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC };
+    private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false };
+    private static final ARecordType META_TYPE = null;
+    private static final GenerationFunction[] META_GEN_FUNCTION = null;
+    private static final boolean[] UNIQUE_META_FIELDS = null;
+    private static final int[] KEY_INDEXES = { 0 };
+    private static final int[] KEY_INDICATORS = { Index.RECORD_INDICATOR };
+    private static final List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR });
+    private static final int TOTAL_NUM_OF_RECORDS = 2000;
+    private static final int RECORDS_PER_COMPONENT = 1000;
+    private static final int DATASET_ID = 101;
+    private static final String DATAVERSE_NAME = "TestDV";
+    private static final String DATASET_NAME = "TestDS";
+    private static final String DATA_TYPE_NAME = "DUMMY";
+    private static final String NODE_GROUP_NAME = "DEFAULT";
+    private static final StorageComponentProvider storageManager = new StorageComponentProvider();
+    private static TestNodeController nc;
+    private static TestLsmBtree lsmBtree;
+    private static NCAppRuntimeContext ncAppCtx;
+    private static IDatasetLifecycleManager dsLifecycleMgr;
+    private static Dataset dataset;
+    private static IHyracksTaskContext ctx;
+    private static IIndexDataflowHelper indexDataflowHelper;
+    private static ITransactionContext txnCtx;
+    private static LSMInsertDeleteOperatorNodePushable insertOp;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        System.out.println("SetUp: ");
+        TestHelper.deleteExistingInstanceFiles();
+        nc = new TestNodeController(null, false);
+        nc.init();
+        ncAppCtx = nc.getAppRuntimeContext();
+        dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        System.out.println("TearDown");
+        nc.deInit();
+        TestHelper.deleteExistingInstanceFiles();
+    }
+
+    @Before
+    public void createIndex() throws Exception {
+        List<List<String>> partitioningKeys = new ArrayList<>();
+        partitioningKeys.add(Collections.singletonList("key"));
+        dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+                NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+                        partitioningKeys, null, null, null, false, null, false),
+                null, DatasetType.INTERNAL, DATASET_ID, 0);
+        PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
+                storageManager, KEY_INDEXES, KEY_INDICATORS_LIST, 0);
+        IndexDataflowHelperFactory iHelperFactory =
+                new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
+        JobId jobId = nc.newJobId();
+        ctx = nc.createTestContext(jobId, 0, false);
+        indexDataflowHelper = iHelperFactory.create(ctx.getJobletContext().getServiceContext(), 0);
+        indexDataflowHelper.open();
+        lsmBtree = (TestLsmBtree) indexDataflowHelper.getIndexInstance();
+        indexDataflowHelper.close();
+        txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
+                new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
+        insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
+                KEY_INDICATORS_LIST, storageManager).getLeft();
+    }
+
+    @After
+    public void destroyIndex() throws Exception {
+        indexDataflowHelper.destroy();
+    }
+
+    void unblockSearch(TestLsmBtree lsmBtree) {
+        lsmBtree.addSearchCallback(sem -> sem.release());
+        lsmBtree.allowSearch(1);
+    }
+
+    @Test
+    public void testCursorSwitchSucceed() {
+        try {
+            // allow all operations
+            ComponentRollbackTest.allowAllOps(lsmBtree);
+            // except search
+            lsmBtree.clearSearchCallbacks();
+            insertOp.open();
+            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            VSizeFrame frame = new VSizeFrame(ctx);
+            FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+            Searcher firstSearcher = null;
+            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+                // flush every RECORDS_PER_COMPONENT records
+                if (j % RECORDS_PER_COMPONENT == (RECORDS_PER_COMPONENT - 1) && j + 1 != TOTAL_NUM_OF_RECORDS) {
+                    if (tupleAppender.getTupleCount() > 0) {
+                        tupleAppender.write(insertOp, true);
+                    }
+                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                }
+                ITupleReference tuple = tupleGenerator.next();
+                DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+            }
+            if (tupleAppender.getTupleCount() > 0) {
+                tupleAppender.write(insertOp, true);
+            }
+            insertOp.close();
+            // start the search
+            firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+            // wait until firstSearcher enters the components
+            firstSearcher.waitUntilEntered();
+            dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
+            // unblock the search
+            unblockSearch(lsmBtree);
+            // ensure the search got the correct number of records
+            Assert.assertTrue(firstSearcher.result());
+            // search again and ensure the count is correct
+            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testCursorSwitchFails() {
+        try {
+            // allow all operations
+            ComponentRollbackTest.allowAllOps(lsmBtree);
+            // except search
+            lsmBtree.clearSearchCallbacks();
+            insertOp.open();
+            TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+                    RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            VSizeFrame frame = new VSizeFrame(ctx);
+            FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+            Searcher firstSearcher = null;
+            for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+                // flush every RECORDS_PER_COMPONENT records
+                if (j % RECORDS_PER_COMPONENT == (RECORDS_PER_COMPONENT - 1) && j + 1 != TOTAL_NUM_OF_RECORDS) {
+                    if (tupleAppender.getTupleCount() > 0) {
+                        tupleAppender.write(insertOp, true);
+                    }
+                    dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+                }
+                ITupleReference tuple = tupleGenerator.next();
+                DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+            }
+            if (tupleAppender.getTupleCount() > 0) {
+                tupleAppender.write(insertOp, true);
+            }
+            insertOp.close();
+            // start the search
+            firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+            // wait until firstSearcher enters the components
+            firstSearcher.waitUntilEntered();
+            dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
+            // merge all components
+            ILSMIndexAccessor mergeAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+            List<ILSMDiskComponent> mergedComponents = new ArrayList<>(lsmBtree.getDiskComponents());
+            BlockingIOOperationCallbackWrapper ioCallback =
+                    new BlockingIOOperationCallbackWrapper(lsmBtree.getIOOperationCallback());
+            mergeAccessor.scheduleMerge(ioCallback, mergedComponents);
+            ioCallback.waitForIO();
+            // unblock the search
+            unblockSearch(lsmBtree);
+            // ensure the search got the correct number of records
+            Assert.assertTrue(firstSearcher.result());
+            // search again and ensure the count is correct
+            searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    private void searchAndAssertCount(TestNodeController nc, IHyracksTaskContext ctx, Dataset dataset,
+            StorageComponentProvider storageManager, int numOfRecords)
+            throws HyracksDataException, AlgebricksException {
+        nc.newJobId();
+        TestTupleCounterFrameWriter countOp =
+                ComponentRollbackTest.create(nc.getSearchOutputDesc(KEY_TYPES, RECORD_TYPE, META_TYPE),
+                        Collections.emptyList(), Collections.emptyList(), false);
+        IPushRuntime emptyTupleOp = nc.getFullScanPipeline(countOp, ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE,
+                new NoMergePolicyFactory(), null, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager);
+        emptyTupleOp.open();
+        emptyTupleOp.close();
+        Assert.assertEquals(numOfRecords, countOp.getCount());
+    }
+}
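
For context: both tests follow one gating idiom: clearSearchCallbacks() makes the next search block after it has entered the components, the test then flushes or merges underneath the blocked search, and unblockSearch() releases a single permit so the search resumes against the switched-in components and must still see TOTAL_NUM_OF_RECORDS. A self-contained sketch of the idiom, with a plain java.util.concurrent.Semaphore standing in for the TestLsmBtree hooks:

    import java.util.concurrent.Semaphore;

    // Self-contained sketch (plain Java, not the TestLsmBtree API) of the gating
    // idiom: the searcher parks after "entering the components"; the test mutates
    // the index, then releases one permit so the parked search can finish.
    public final class GatedSearchSketch {
        public static void main(String[] args) throws InterruptedException {
            Semaphore searchGate = new Semaphore(0);    // clearSearchCallbacks(): searches will block
            Thread searcher = new Thread(() -> {
                try {
                    // ... enter components, position cursors ...
                    searchGate.acquire();               // parked until the test allows the search
                    // ... resume scanning; cursors may now sit on new disk components ...
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            searcher.start();
            // test thread: flush / merge while the searcher is parked, then:
            searchGate.release();                       // allowSearch(1): unblock one search
            searcher.join();
        }
    }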

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
index e0502de..0c4983a 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
@@ -21,7 +21,6 @@ package org.apache.asterix.test.dataflow;
 import java.util.Map;
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.context.DatasetLSMComponentIdGeneratorFactory;
 import org.apache.asterix.metadata.IDatasetDetails;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
@@ -63,6 +62,6 @@ public class TestDataset extends Dataset {
 
     @Override
     public ILSMIOOperationCallbackFactory getIoOperationCallbackFactory(Index index) throws AlgebricksException {
-        return new TestLsmBtreeIoOpCallbackFactory(new DatasetLSMComponentIdGeneratorFactory(getDatasetId()));
+        return new TestLsmBtreeIoOpCallbackFactory(getComponentIdGeneratorFactory());
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
index fa37c20..44967e3 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
@@ -18,8 +18,9 @@
  */
 package org.apache.asterix.test.dataflow;
 
-import org.apache.asterix.common.ioopcallbacks.AbstractLSMIndexIOOperationCallbackFactory;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback;
+import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
@@ -28,7 +29,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.EmptyComponent;
 
-public class TestLsmBtreeIoOpCallbackFactory extends AbstractLSMIndexIOOperationCallbackFactory {
+public class TestLsmBtreeIoOpCallbackFactory extends LSMBTreeIOOperationCallbackFactory {
 
     private static final long serialVersionUID = 1L;
 
@@ -97,7 +98,8 @@ public class TestLsmBtreeIoOpCallbackFactory extends AbstractLSMIndexIOOperation
         }
 
         @Override
-        public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) {
+        public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent)
+                throws HyracksDataException {
             super.afterFinalize(opType, newComponent);
             synchronized (TestLsmBtreeIoOpCallbackFactory.this) {
                 if (newComponent != null) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 1cc24d5..1be1d26 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -54,6 +54,7 @@ import org.apache.asterix.transaction.management.service.logging.LogManager;
 import org.apache.asterix.transaction.management.service.recovery.AbstractCheckpointManager;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
@@ -126,9 +127,9 @@ public class CheckpointingTest {
                     null, DatasetType.INTERNAL, DATASET_ID, 0);
             try {
                 nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, storageManager, KEY_INDEXES,
-                        KEY_INDICATOR_LIST);
-                IHyracksTaskContext ctx = nc.createTestContext(false);
-                nc.newJobId();
+                        KEY_INDICATOR_LIST, 0);
+                JobId jobId = nc.newJobId();
+                IHyracksTaskContext ctx = nc.createTestContext(jobId, 0, false);
                 ITransactionContext txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
                         new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
                 // Prepare insert operation

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DeallocatableTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DeallocatableTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DeallocatableTest.java
index 5ee0e9f..cfd251b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DeallocatableTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DeallocatableTest.java
@@ -28,6 +28,7 @@ import org.apache.asterix.test.runtime.LangExecutionUtil;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.comm.channels.NetworkOutputChannel;
 import org.apache.hyracks.control.nc.NodeControllerService;
@@ -58,7 +59,8 @@ public class DeallocatableTest {
             final NodeControllerService ncs =
                     (NodeControllerService) nc.getAppRuntimeContext().getServiceContext().getControllerService();
             final TaskAttemptId taId = Mockito.mock(TaskAttemptId.class);
-            final IHyracksTaskContext ctx = nc.createTestContext(true);
+            JobId jobId = nc.newJobId();
+            final IHyracksTaskContext ctx = nc.createTestContext(jobId, 0, true);
             final ConnectorDescriptorId codId = new ConnectorDescriptorId(1);
             final PartitionId pid = new PartitionId(ctx.getJobletContext().getJobId(), codId, 1, 1);
             final ChannelControlBlock ccb = ncs.getNetworkManager()

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
index cb83d56..8e8b3e9 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
@@ -33,7 +33,6 @@ import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
 import org.apache.asterix.common.exceptions.ExceptionUtils;
-import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.external.util.DataflowUtils;
 import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.metadata.entities.Dataset;
@@ -49,6 +48,7 @@ import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
@@ -125,9 +125,9 @@ public class DiskIsFullTest {
                     null, DatasetType.INTERNAL, DATASET_ID, 0);
             try {
                 nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, storageManager, KEY_INDEXES,
-                        KEY_INDICATOR_LIST);
-                IHyracksTaskContext ctx = nc.createTestContext(false);
-                nc.newJobId();
+                        KEY_INDICATOR_LIST, 0);
+                JobId jobId = nc.newJobId();
+                IHyracksTaskContext ctx = nc.createTestContext(jobId, 0, false);
                 // Prepare insert operation
                 LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
                         RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager).getLeft();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
index 7ced87d..cc6c0f7 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
@@ -18,8 +18,6 @@
  */
 package org.apache.asterix.test.storage;
 
-import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -51,9 +49,11 @@ import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorNodePushable;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
 import org.junit.After;
@@ -106,15 +106,15 @@ public class IndexDropOperatorNodePushableTest {
             Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
                     NoMergePolicyFactory.NAME, null,
                     new InternalDatasetDetails(null, InternalDatasetDetails.PartitioningStrategy.HASH, partitioningKeys,
-                            null, null, null, false, null, false), null, DatasetConfig.DatasetType.INTERNAL, DATASET_ID,
-                    0);
+                            null, null, null, false, null, false),
+                    null, DatasetConfig.DatasetType.INTERNAL, DATASET_ID, 0);
             // create dataset
-            TestNodeController.PrimaryIndexInfo indexInfo =
-                    nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, storageManager, KEY_INDEXES,
-                            KEY_INDICATORS_LIST);
+            TestNodeController.PrimaryIndexInfo indexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE,
+                    META_TYPE, null, storageManager, KEY_INDEXES, KEY_INDICATORS_LIST, 0);
             IndexDataflowHelperFactory helperFactory =
                     new IndexDataflowHelperFactory(nc.getStorageManager(), indexInfo.getFileSplitProvider());
-            IHyracksTaskContext ctx = nc.createTestContext(true);
+            JobId jobId = nc.newJobId();
+            IHyracksTaskContext ctx = nc.createTestContext(jobId, 0, true);
             IIndexDataflowHelper dataflowHelper = helperFactory.create(ctx.getJobletContext().getServiceContext(), 0);
             dropInUse(ctx, helperFactory, dataflowHelper);
             dropInUseWithWait(ctx, helperFactory, dataflowHelper);
@@ -144,21 +144,21 @@ public class IndexDropOperatorNodePushableTest {
             testExecutor.executeSqlppUpdateOrDdl("CREATE DATASET " + datasetName + "(KeyType) PRIMARY KEY id;", format);
             testExecutor.executeSqlppUpdateOrDdl("CREATE INDEX " + indexName + " on " + datasetName + "(foo)", format);
             final MetadataTransactionContext mdTxn = MetadataManager.INSTANCE.beginTransaction();
-            ICcApplicationContext appCtx =
-                    (ICcApplicationContext) ExecutionTestUtil.integrationUtil.getClusterControllerService()
-                            .getApplicationContext();
+            ICcApplicationContext appCtx = (ICcApplicationContext) ExecutionTestUtil.integrationUtil
+                    .getClusterControllerService().getApplicationContext();
             MetadataProvider metadataProver = new MetadataProvider(appCtx, null);
             metadataProver.setMetadataTxnContext(mdTxn);
             final String defaultDv = MetadataBuiltinEntities.DEFAULT_DATAVERSE.getDataverseName();
             final Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxn, defaultDv, datasetName);
             MetadataManager.INSTANCE.commitTransaction(mdTxn);
-            FileSplit[] splits = SplitsAndConstraintsUtil
-                    .getIndexSplits(appCtx.getClusterStateManager(), dataset, indexName, Arrays.asList("asterix_nc1"));
+            FileSplit[] splits = SplitsAndConstraintsUtil.getIndexSplits(appCtx.getClusterStateManager(), dataset,
+                    indexName, Arrays.asList("asterix_nc1"));
             final ConstantFileSplitProvider constantFileSplitProvider =
                     new ConstantFileSplitProvider(Arrays.copyOfRange(splits, 0, 1));
             IndexDataflowHelperFactory helperFactory =
                     new IndexDataflowHelperFactory(nc.getStorageManager(), constantFileSplitProvider);
-            IHyracksTaskContext ctx = nc.createTestContext(true);
+            JobId jobId = nc.newJobId();
+            IHyracksTaskContext ctx = nc.createTestContext(jobId, 0, true);
             IIndexDataflowHelper dataflowHelper = helperFactory.create(ctx.getJobletContext().getServiceContext(), 0);
             dropInUse(ctx, helperFactory, dataflowHelper);
         } finally {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index e445fe4..c33e2d1 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -34,6 +34,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
 import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
+import org.apache.hyracks.storage.am.lsm.common.impls.EmptyComponent;
 import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
 import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
 
@@ -88,7 +89,7 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC
     }
 
     @Override
-    public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) {
+    public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) throws HyracksDataException {
         // The operation was complete and the next I/O operation for the LSM index didn't start yet
         if (opType == LSMIOOperationType.FLUSH && newComponent != null) {
             synchronized (this) {
@@ -100,6 +101,13 @@ public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationC
                 }
                 readIndex = (readIndex + 1) % mutableLastLSNs.length;
             }
+            if (newComponent == EmptyComponent.INSTANCE) {
+                // This component was just deleted. We refresh the component id so that, when the memory
+                // component gets recycled, it will get a new id from the component id generator.
+                // It is assumed that the caller deleting the component will ensure that the corresponding
+                // components in secondary indexes are deleted as well.
+                idGenerator.refresh();
+            }
         }
     }
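
For context: a flush whose component was deleted surfaces here as the EmptyComponent.INSTANCE sentinel; refreshing the id generator ensures that a later recycled memory component gets a fresh id instead of reusing the deleted component's. A toy model of the rule, with invented names (not the AsterixDB API):

    // Toy model (invented names) of the id-refresh rule: after a flush that ends
    // in the "empty" sentinel, advance the generator so recycled memory components
    // never reuse the deleted component's id.
    final class IdGeneratorModel {
        private long id;

        long currentId() {
            return id;
        }

        void refresh() {
            id++;                                       // hand out a fresh id next time
        }
    }

    final class FlushFinalizerModel {
        static final Object EMPTY_COMPONENT = new Object(); // stands in for EmptyComponent.INSTANCE

        static void afterFinalize(Object newComponent, IdGeneratorModel idGenerator) {
            if (newComponent == EMPTY_COMPONENT) {
                idGenerator.refresh();
            }
        }
    }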
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e5a65429/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index 2d8eaed..a7468a7 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -51,13 +51,25 @@ public abstract class AbstractOneInputOneOutputOneFramePushRuntime extends Abstr
 
     @Override
     public void close() throws HyracksDataException {
+        HyracksDataException closeException = null;
         try {
             flushIfNotFailed();
         } catch (Exception e) {
+            closeException = HyracksDataException.create(e);
             writer.fail();
-            throw e;
         } finally {
-            writer.close();
+            try {
+                writer.close();
+            } catch (Exception e) {
+                if (closeException == null) {
+                    closeException = HyracksDataException.create(e);
+                } else {
+                    closeException.addSuppressed(e);
+                }
+            }
+        }
+        if (closeException != null) {
+            throw closeException;
         }
     }
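
For context: the rewritten close() above is the standard suppressed-exception close pattern: record the primary failure from the flush, fail the writer, always close it, and attach any secondary close failure via Throwable.addSuppressed so neither exception is lost. A self-contained sketch of the same pattern in plain Java, with Closeable and Runnable as stand-ins for the frame writer and the flush:

    import java.io.Closeable;
    import java.io.IOException;

    // Self-contained sketch of the close() pattern above: keep the first failure,
    // still close, and suppress (not swallow) any failure from close itself.
    public final class SuppressedClosePattern {
        static void flushAndClose(Runnable flush, Closeable writer) throws IOException {
            IOException primary = null;
            try {
                flush.run();                            // may fail (unchecked)
            } catch (Exception e) {
                primary = new IOException("flush failed", e);
                // the real code also calls writer.fail() here before closing
            } finally {
                try {
                    writer.close();
                } catch (IOException e) {
                    if (primary == null) {
                        primary = e;                    // close failure becomes the primary
                    } else {
                        primary.addSuppressed(e);       // keep both failures visible
                    }
                }
            }
            if (primary != null) {
                throw primary;
            }
        }
    }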