You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by lu...@apache.org on 2018/03/30 02:18:18 UTC

[2/2] asterixdb git commit: [ASTERIXDB-2339] Add a new inverted index merge cursor

[ASTERIXDB-2339] Add a new inverted index merge cursor

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Implement a new inverted index merge cursor which uses two priority queues,
one for tokens and one for keys. For each token, we merge its inverted
lists using the key queue. After that, we fetch the next token and merge
its lists in the same way. This greatly reduces unnecessary token comparisons.
- Along with this change, created a fast path for the inverted index bulkloader.
Because of how the token+key pairs are produced, there is no need to copy
the bulkloaded tuple or to check whether it starts a new token during a merge.

Change-Id: I57d039cd7e08033884529a204bff9acffd96d9bb
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2519
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <im...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/3036c980
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/3036c980
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/3036c980

Branch: refs/heads/master
Commit: 3036c98099f6c912af29bced9b2bf71bdfa9026e
Parents: 7dc566b
Author: luochen01 <cl...@uci.edu>
Authored: Sat Mar 24 20:04:02 2018 -0700
Committer: Luo Chen <cl...@uci.edu>
Committed: Thu Mar 29 19:18:01 2018 -0700

----------------------------------------------------------------------
 ...IndexCreationTupleProcessorNodePushable.java |  10 +-
 .../LSMSecondaryUpsertOperatorNodePushable.java |  30 +-
 .../dataflow/common/utils/TupleUtils.java       |  26 ++
 .../am/lsm/btree/impls/ExternalBTree.java       |   4 +-
 .../lsm/btree/impls/ExternalBTreeWithBuddy.java |  10 +-
 .../storage/am/lsm/btree/impls/LSMBTree.java    |   8 +-
 ...AbstractLSMWithBloomFilterDiskComponent.java |   9 +-
 .../am/lsm/common/api/ILSMDiskComponent.java    |   8 +-
 .../common/impls/AbstractLSMDiskComponent.java  |  21 +-
 .../am/lsm/common/impls/EmptyComponent.java     |   7 +-
 .../impls/LSMIndexDiskComponentBulkLoader.java  |   7 +-
 .../lsm/common/impls/LSMIndexSearchCursor.java  |   4 +-
 .../invertedindex/impls/LSMInvertedIndex.java   |  14 +-
 .../impls/LSMInvertedIndexDiskComponent.java    |  15 +-
 .../impls/LSMInvertedIndexMergeCursor.java      | 369 +++++++++++++++++++
 .../ondisk/OnDiskInvertedIndex.java             | 194 ++++++----
 .../OnDiskInvertedIndexRangeSearchCursor.java   |  16 +-
 .../invertedindex/tuples/TokenKeyPairTuple.java |  95 +++++
 .../storage/am/lsm/rtree/impls/LSMRTree.java    |  12 +-
 .../impls/LSMRTreeWithAntiMatterTuples.java     |   6 +-
 .../common/AbstractInvertedIndexTest.java       |   7 +
 .../util/LSMInvertedIndexTestUtils.java         |  26 +-
 22 files changed, 733 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorNodePushable.java
index 9376d1b..ac7fc89 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorNodePushable.java
@@ -276,7 +276,7 @@ public class LSMSecondaryIndexCreationTupleProcessorNodePushable extends Abstrac
 
     private boolean equalPrimaryKeys(ITupleReference tuple1, ITupleReference tuple2) {
         for (int i = numTagFields + numSecondaryKeys; i < numTagFields + numPrimaryKeys + numSecondaryKeys; i++) {
-            if (!equalField(tuple1, tuple2, i)) {
+            if (!TupleUtils.equalFields(tuple1, tuple2, i)) {
                 return false;
             }
         }
@@ -285,16 +285,10 @@ public class LSMSecondaryIndexCreationTupleProcessorNodePushable extends Abstrac
 
     private boolean equalSecondaryKeys(ITupleReference tuple1, ITupleReference tuple2) {
         for (int i = numTagFields; i < numTagFields + numSecondaryKeys; i++) {
-            if (!equalField(tuple1, tuple2, i)) {
+            if (!TupleUtils.equalFields(tuple1, tuple2, i)) {
                 return false;
             }
         }
         return true;
     }
-
-    private boolean equalField(ITupleReference tuple1, ITupleReference tuple2, int fIdx) {
-        return LSMSecondaryUpsertOperatorNodePushable.equals(tuple1.getFieldData(fIdx), tuple1.getFieldStart(fIdx),
-                tuple1.getFieldLength(fIdx), tuple2.getFieldData(fIdx), tuple2.getFieldStart(fIdx),
-                tuple2.getFieldLength(fIdx));
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
index a22e5e7..b928131 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
@@ -28,6 +28,7 @@ import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.utils.TupleUtils;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
@@ -55,7 +56,7 @@ import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDel
 public class LSMSecondaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable {
 
     private final PermutingFrameTupleReference prevValueTuple = new PermutingFrameTupleReference();
-    private int numberOfFields;
+    private final int numberOfFields;
     private AbstractIndexModificationOperationCallback abstractModCallback;
 
     public LSMSecondaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
@@ -74,31 +75,6 @@ public class LSMSecondaryUpsertOperatorNodePushable extends LSMIndexInsertUpdate
         abstractModCallback = (AbstractIndexModificationOperationCallback) modCallback;
     }
 
-    public static boolean equals(byte[] a, int aOffset, int aLength, byte[] b, int bOffset, int bLength) {
-        if (aLength != bLength) {
-            return false;
-        }
-        for (int i = 0; i < aLength; i++) {
-            if (a[aOffset + i] != b[bOffset + i]) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    public static boolean equalTuples(PermutingFrameTupleReference t1, PermutingFrameTupleReference t2, int numOfFields)
-            throws HyracksDataException {
-        byte[] t1Data = t1.getFieldData(0);
-        byte[] t2Data = t2.getFieldData(0);
-        for (int i = 0; i < numOfFields; i++) {
-            if (!equals(t1Data, t1.getFieldStart(i), t1.getFieldLength(i), t2Data, t2.getFieldStart(i),
-                    t2.getFieldLength(i))) {
-                return false;
-            }
-        }
-        return true;
-    }
-
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
@@ -117,7 +93,7 @@ public class LSMSecondaryUpsertOperatorNodePushable extends LSMIndexInsertUpdate
                 }
                 // At least, one is not null
                 // If they are equal, then we skip
-                if (equalTuples(tuple, prevValueTuple, numberOfFields)) {
+                if (TupleUtils.equalTuples(tuple, prevValueTuple, numberOfFields)) {
                     continue;
                 }
                 if (!isOldValueMissing) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TupleUtils.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TupleUtils.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TupleUtils.java
index 08ed922..49b5309 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TupleUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TupleUtils.java
@@ -164,4 +164,30 @@ public class TupleUtils {
             tupleBuilder.addField(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i));
         }
     }
+
+    public static boolean equalTuples(ITupleReference tuple1, ITupleReference tuple2, int numCmpFields) {
+        for (int i = 0; i < numCmpFields; i++) {
+            if (!equalFields(tuple1, tuple2, i)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public static boolean equalFields(ITupleReference tuple1, ITupleReference tuple2, int fIdx) {
+        return equalFields(tuple1.getFieldData(fIdx), tuple1.getFieldStart(fIdx), tuple1.getFieldLength(fIdx),
+                tuple2.getFieldData(fIdx), tuple2.getFieldStart(fIdx), tuple2.getFieldLength(fIdx));
+    }
+
+    public static boolean equalFields(byte[] a, int aOffset, int aLength, byte[] b, int bOffset, int bLength) {
+        if (aLength != bLength) {
+            return false;
+        }
+        for (int i = 0; i < aLength; i++) {
+            if (a[aOffset + i] != b[bOffset + i]) {
+                return false;
+            }
+        }
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
index c0f7571..4c2fc3b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
@@ -431,8 +431,8 @@ public class ExternalBTree extends LSMBTree implements ITwoPCIndex {
                 component = createBulkLoadTarget();
             }
 
-            componentBulkLoader =
-                    component.createBulkLoader(fillFactor, verifyInput, numElementsHint, false, true, true);
+            componentBulkLoader = component.createBulkLoader(LSMIOOperationType.LOAD, fillFactor, verifyInput,
+                    numElementsHint, false, true, true);
         }
 
         // It is expected that the mode was set to insert operation before

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
index 1ba55f7..62fd850 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
@@ -331,7 +331,8 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd
                 numElements += ((AbstractLSMWithBloomFilterDiskComponent) mergeOp.getMergingComponents().get(i))
                         .getBloomFilter().getNumElements();
             }
-            componentBulkLoader = mergedComponent.createBulkLoader(1.0f, false, numElements, false, false, false);
+            componentBulkLoader = mergedComponent.createBulkLoader(LSMIOOperationType.MERGE, 1.0f, false, numElements,
+                    false, false, false);
             try {
                 while (buddyBtreeCursor.hasNext()) {
                     buddyBtreeCursor.next();
@@ -342,7 +343,8 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd
                 buddyBtreeCursor.close();
             }
         } else {
-            componentBulkLoader = mergedComponent.createBulkLoader(1.0f, false, 0L, false, false, false);
+            componentBulkLoader =
+                    mergedComponent.createBulkLoader(LSMIOOperationType.MERGE, 1.0f, false, 0L, false, false, false);
         }
 
         try {
@@ -510,8 +512,8 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd
                 component = createBulkLoadTarget();
             }
 
-            componentBulkLoader =
-                    component.createBulkLoader(fillFactor, verifyInput, numElementsHint, false, true, false);
+            componentBulkLoader = component.createBulkLoader(LSMIOOperationType.LOAD, fillFactor, verifyInput,
+                    numElementsHint, false, true, false);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 41a11e6..f88947e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -48,6 +48,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
@@ -277,7 +278,8 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex {
             }
             component = createDiskComponent(componentFactory, flushOp.getTarget(), null, flushOp.getBloomFilterTarget(),
                     true);
-            componentBulkLoader = component.createBulkLoader(1.0f, false, numElements, false, false, false);
+            componentBulkLoader =
+                    component.createBulkLoader(LSMIOOperationType.FLUSH, 1.0f, false, numElements, false, false, false);
             IIndexCursor scanCursor = accessor.createSearchCursor(false);
             accessor.search(scanCursor, nullPred);
             try {
@@ -336,8 +338,8 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex {
                     long numElements = getNumberOfElements(mergedComponents);
                     mergedComponent = createDiskComponent(componentFactory, mergeOp.getTarget(), null,
                             mergeOp.getBloomFilterTarget(), true);
-                    componentBulkLoader =
-                            mergedComponent.createBulkLoader(1.0f, false, numElements, false, false, false);
+                    componentBulkLoader = mergedComponent.createBulkLoader(LSMIOOperationType.MERGE, 1.0f, false,
+                            numElements, false, false, false);
                     while (cursor.hasNext()) {
                         cursor.next();
                         ITupleReference frameTuple = cursor.getTuple();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java
index 107190d..c98fa69 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java
@@ -23,6 +23,7 @@ import org.apache.hyracks.storage.am.bloomfilter.impls.BloomCalculations;
 import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
 import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.BloomFilterBulkLoader;
@@ -92,10 +93,10 @@ public abstract class AbstractLSMWithBloomFilterDiskComponent extends AbstractLS
     }
 
     @Override
-    public ChainedLSMDiskComponentBulkLoader createBulkLoader(float fillFactor, boolean verifyInput,
-            long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter, boolean cleanupEmptyComponent)
-            throws HyracksDataException {
-        ChainedLSMDiskComponentBulkLoader chainedBulkLoader = super.createBulkLoader(fillFactor, verifyInput,
+    public ChainedLSMDiskComponentBulkLoader createBulkLoader(LSMIOOperationType opType, float fillFactor,
+            boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter,
+            boolean cleanupEmptyComponent) throws HyracksDataException {
+        ChainedLSMDiskComponentBulkLoader chainedBulkLoader = super.createBulkLoader(opType, fillFactor, verifyInput,
                 numElementsHint, checkIfEmptyIndex, withFilter, cleanupEmptyComponent);
         if (numElementsHint > 0) {
             chainedBulkLoader.addBulkLoader(createBloomFilterBulkLoader(numElementsHint));

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
index bd2bb45..1a0305f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
@@ -22,6 +22,7 @@ import java.util.Set;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.ChainedLSMDiskComponentBulkLoader;
 import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
@@ -50,6 +51,7 @@ public interface ILSMDiskComponent extends ILSMComponent {
     /**
      * @return LsmIndex of the component
      */
+    @Override
     AbstractLSMIndex getLsmIndex();
 
     /**
@@ -142,6 +144,7 @@ public interface ILSMDiskComponent extends ILSMComponent {
      * Creates a bulkloader pipeline which includes all chained operations, bulkloading individual elements of the
      * component: indexes, LSM filters, Bloom filters, buddy indexes, etc.
      *
+     * @param opType
      * @param fillFactor
      * @param verifyInput
      * @param numElementsHint
@@ -151,6 +154,7 @@ public interface ILSMDiskComponent extends ILSMComponent {
      * @return
      * @throws HyracksDataException
      */
-    ChainedLSMDiskComponentBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
-            boolean checkIfEmptyIndex, boolean withFilter, boolean cleanupEmptyComponent) throws HyracksDataException;
+    ChainedLSMDiskComponentBulkLoader createBulkLoader(LSMIOOperationType opType, float fillFactor, boolean verifyInput,
+            long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter, boolean cleanupEmptyComponent)
+            throws HyracksDataException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
index 26c7b0d..633de6b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
@@ -23,6 +23,7 @@ import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
 import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
@@ -201,17 +202,27 @@ public abstract class AbstractLSMDiskComponent extends AbstractLSMComponent impl
                 getIndex().createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex));
     }
 
+    /**
+     * Creates the index bulkloader for MERGE operations; subclasses may override it to use a specialized one.
+     */
+    protected IChainedComponentBulkLoader createMergeIndexBulkLoader(float fillFactor, boolean verifyInput,
+            long numElementsHint, boolean checkIfEmptyIndex) throws HyracksDataException {
+        return this.createIndexBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+    }
+
     @Override
-    public ChainedLSMDiskComponentBulkLoader createBulkLoader(float fillFactor, boolean verifyInput,
-            long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter, boolean cleanupEmptyComponent)
-            throws HyracksDataException {
+    public ChainedLSMDiskComponentBulkLoader createBulkLoader(LSMIOOperationType opType, float fillFactor,
+            boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter,
+            boolean cleanupEmptyComponent) throws HyracksDataException {
         ChainedLSMDiskComponentBulkLoader chainedBulkLoader =
                 new ChainedLSMDiskComponentBulkLoader(this, cleanupEmptyComponent);
         if (withFilter && getLsmIndex().getFilterFields() != null) {
             chainedBulkLoader.addBulkLoader(createFilterBulkLoader());
         }
-        chainedBulkLoader
-                .addBulkLoader(createIndexBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex));
+        IChainedComponentBulkLoader indexBulkloader = opType == LSMIOOperationType.MERGE
+                ? createMergeIndexBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex)
+                : createIndexBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+        chainedBulkLoader.addBulkLoader(indexBulkloader);
         return chainedBulkLoader;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
index e3ca9f1..466ef24 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
@@ -27,6 +27,7 @@ import org.apache.hyracks.storage.am.common.api.ITreeIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.common.IIndex;
 
@@ -144,9 +145,9 @@ public class EmptyComponent implements ILSMDiskComponent {
     }
 
     @Override
-    public ChainedLSMDiskComponentBulkLoader createBulkLoader(float fillFactor, boolean verifyInput,
-            long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter, boolean cleanupEmptyComponent)
-            throws HyracksDataException {
+    public ChainedLSMDiskComponentBulkLoader createBulkLoader(LSMIOOperationType opType, float fillFactor,
+            boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex, boolean withFilter,
+            boolean cleanupEmptyComponent) throws HyracksDataException {
         return null;
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
index 5e105a4..2ef6169 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
@@ -22,13 +22,14 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
 import org.apache.hyracks.storage.common.IIndexBulkLoader;
 
 public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader {
     private final AbstractLSMIndex lsmIndex;
     private final ILSMDiskComponentBulkLoader componentBulkLoader;
-    private ILSMIndexOperationContext opCtx;
+    private final ILSMIndexOperationContext opCtx;
 
     public LSMIndexDiskComponentBulkLoader(AbstractLSMIndex lsmIndex, ILSMIndexOperationContext opCtx, float fillFactor,
             boolean verifyInput, long numElementsHint) throws HyracksDataException {
@@ -37,8 +38,8 @@ public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader {
         // Note that by using a flush target file name, we state that the
         // new bulk loaded component is "newer" than any other merged component.
         opCtx.setNewComponent(lsmIndex.createBulkLoadTarget());
-        this.componentBulkLoader =
-                opCtx.getNewComponent().createBulkLoader(fillFactor, verifyInput, numElementsHint, false, true, true);
+        this.componentBulkLoader = opCtx.getNewComponent().createBulkLoader(LSMIOOperationType.LOAD, fillFactor,
+                verifyInput, numElementsHint, false, true, true);
     }
 
     public ILSMDiskComponent getComponent() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
index fb1fa63..12caec4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
@@ -246,7 +246,7 @@ public abstract class LSMIndexSearchCursor extends EnforcedIndexCursor implement
         }
     }
 
-    public class PriorityQueueElement {
+    public static class PriorityQueueElement {
         private ITupleReference tuple;
         private final int cursorIndex;
 
@@ -268,7 +268,7 @@ public abstract class LSMIndexSearchCursor extends EnforcedIndexCursor implement
         }
     }
 
-    public class PriorityQueueComparator implements Comparator<PriorityQueueElement> {
+    public static class PriorityQueueComparator implements Comparator<PriorityQueueElement> {
 
         protected MultiComparator cmp;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index e46c24a..0fae1ac 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -45,6 +45,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
@@ -59,7 +60,6 @@ import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFilterManager;
-import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
 import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
 import org.apache.hyracks.storage.am.lsm.invertedindex.inmemory.InMemoryInvertedIndex;
 import org.apache.hyracks.storage.am.lsm.invertedindex.inmemory.InMemoryInvertedIndexAccessor;
@@ -296,7 +296,7 @@ public class LSMInvertedIndex extends AbstractLSMIndex implements IInvertedIndex
         }
 
         ILSMDiskComponentBulkLoader componentBulkLoader =
-                component.createBulkLoader(1.0f, false, numBTreeTuples, false, false, false);
+                component.createBulkLoader(LSMIOOperationType.FLUSH, 1.0f, false, numBTreeTuples, false, false, false);
 
         // Create a scan cursor on the deleted keys BTree underlying the in-memory inverted index.
         IIndexCursor deletedKeysScanCursor = deletedKeysBTreeAccessor.createSearchCursor(false);
@@ -349,7 +349,7 @@ public class LSMInvertedIndex extends AbstractLSMIndex implements IInvertedIndex
         LSMInvertedIndexMergeOperation mergeOp = (LSMInvertedIndexMergeOperation) operation;
         RangePredicate mergePred = new RangePredicate(null, null, true, true, null, null);
         IIndexCursor cursor = mergeOp.getCursor();
-        ILSMIndexOperationContext opCtx = ((LSMIndexSearchCursor) cursor).getOpCtx();
+        ILSMIndexOperationContext opCtx = ((LSMInvertedIndexMergeCursor) cursor).getOpCtx();
         // Scan diskInvertedIndexes ignoring the memoryInvertedIndex.
         // Create an inverted index instance.
         ILSMDiskComponent component = createDiskComponent(componentFactory, mergeOp.getTarget(),
@@ -368,13 +368,15 @@ public class LSMInvertedIndex extends AbstractLSMIndex implements IInvertedIndex
                     numElements += ((LSMInvertedIndexDiskComponent) mergeOp.getMergingComponents().get(i))
                             .getBloomFilter().getNumElements();
                 }
-                componentBulkLoader = component.createBulkLoader(1.0f, false, numElements, false, false, false);
+                componentBulkLoader = component.createBulkLoader(LSMIOOperationType.MERGE, 1.0f, false, numElements,
+                        false, false, false);
                 loadDeleteTuples(opCtx, btreeCursor, mergePred, componentBulkLoader);
             } finally {
                 btreeCursor.destroy();
             }
         } else {
-            componentBulkLoader = component.createBulkLoader(1.0f, false, 0L, false, false, false);
+            componentBulkLoader =
+                    component.createBulkLoader(LSMIOOperationType.MERGE, 1.0f, false, 0L, false, false, false);
         }
         search(opCtx, cursor, mergePred);
         try {
@@ -495,7 +497,7 @@ public class LSMInvertedIndex extends AbstractLSMIndex implements IInvertedIndex
     protected ILSMIOOperation createMergeOperation(AbstractLSMIndexOperationContext opCtx,
             LSMComponentFileReferences mergeFileRefs, ILSMIOOperationCallback callback) throws HyracksDataException {
         ILSMIndexAccessor accessor = new LSMInvertedIndexAccessor(getHarness(), opCtx);
-        IIndexCursor cursor = new LSMInvertedIndexRangeSearchCursor(opCtx);
+        IIndexCursor cursor = new LSMInvertedIndexMergeCursor(opCtx);
         return new LSMInvertedIndexMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(),
                 mergeFileRefs.getDeleteIndexFileReference(), mergeFileRefs.getBloomFilterFileReference(), callback,
                 fileManager.getBaseDir().getAbsolutePath());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
index 279a518..b030e83 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
@@ -25,11 +25,14 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
 import org.apache.hyracks.storage.am.btree.impls.BTree;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
 import org.apache.hyracks.storage.am.lsm.common.api.AbstractLSMWithBuddyDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.IChainedComponentBulkLoader;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexWithBuddyBulkLoader;
 import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
 import org.apache.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
+import org.apache.hyracks.storage.common.IIndexBulkLoader;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 
 public class LSMInvertedIndexDiskComponent extends AbstractLSMWithBuddyDiskComponent {
@@ -109,4 +112,18 @@ public class LSMInvertedIndexDiskComponent extends AbstractLSMWithBuddyDiskCompo
         // Flush deleted keys BTree.
         ComponentUtils.markAsValid(getBuddyIndex(), persist);
     }
+
+    /**
+     * Creates the bulk loader used when this component is the target of a merge. The inverted index is
+     * loaded through its createMergeBulkLoader, the buddy (deleted-keys) BTree through its regular bulk
+     * loader, and the two are chained in an IndexWithBuddyBulkLoader.
+     */
+    @Override
+    protected IChainedComponentBulkLoader createMergeIndexBulkLoader(float fillFactor, boolean verifyInput,
+            long numElementsHint, boolean checkIfEmptyIndex) throws HyracksDataException {
+        // Arguments are evaluated left to right, so the inverted-index loader is created first.
+        return new IndexWithBuddyBulkLoader(
+                invIndex.createMergeBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex),
+                getBuddyIndex().createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex));
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeCursor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeCursor.java
new file mode 100644
index 0000000..c80455d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeCursor.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.invertedindex.impls;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.common.utils.TupleUtils;
+import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
+import org.apache.hyracks.storage.am.common.api.ILSMIndexCursor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor.PriorityQueueComparator;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor.PriorityQueueElement;
+import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
+import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndexRangeSearchCursor;
+import org.apache.hyracks.storage.am.lsm.invertedindex.tuples.TokenKeyPairTuple;
+import org.apache.hyracks.storage.common.EnforcedIndexCursor;
+import org.apache.hyracks.storage.common.ICursorInitialState;
+import org.apache.hyracks.storage.common.IIndexAccessor;
+import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+/**
+ * A cursor specially designed and optimized for merging inverted index components.
+ * <p>
+ * It keeps two priority queues. The token queue orders the range cursors by their current token. The key
+ * queue merges the inverted lists of all range cursors positioned on the smallest token; once those lists
+ * are exhausted, the next token is polled from the token queue. Tokens are therefore compared per
+ * inverted list rather than per (token, key) pair.
+ * <p>
+ * For simplicity, it assumes all components are disk components, and the cursor is not reused.
+ */
+public class LSMInvertedIndexMergeCursor extends EnforcedIndexCursor implements ILSMIndexCursor {
+    protected final LSMInvertedIndexOpContext opCtx;
+    protected PriorityQueueElement outputTokenElement;
+    protected OnDiskInvertedIndexRangeSearchCursor[] rangeCursors;
+    protected PriorityQueueElement[] tokenQueueElements;
+    protected PriorityQueue<PriorityQueueElement> tokenQueue;
+    protected PriorityQueueComparator tokenQueueCmp;
+
+    protected PriorityQueueElement outputKeyElement;
+    protected PriorityQueueElement[] keyQueueElements;
+    protected PriorityQueue<PriorityQueueElement> keyQueue;
+    protected PriorityQueueComparator keyQueueCmp;
+
+    protected boolean needPushElementIntoKeyQueue;
+
+    protected ILSMHarness lsmHarness;
+
+    protected MultiComparator tokenCmp;
+    protected MultiComparator keyCmp;
+
+    protected List<ILSMComponent> operationalComponents;
+
+    // Assuming the cursor for all deleted-keys indexes are of the same type.
+    protected IIndexCursor[] deletedKeysBTreeCursors;
+    protected BloomFilter[] bloomFilters;
+    protected final long[] hashes = BloomFilter.createHashArray();
+    protected ArrayList<IIndexAccessor> deletedKeysBTreeAccessors;
+    protected RangePredicate deletedKeyBTreeSearchPred;
+
+    protected final TokenKeyPairTuple outputTuple;
+
+    /**
+     * @param opCtx
+     *            the operation context of the merge; it must be an {@link LSMInvertedIndexOpContext}
+     */
+    public LSMInvertedIndexMergeCursor(ILSMIndexOperationContext opCtx) {
+        this.opCtx = (LSMInvertedIndexOpContext) opCtx;
+        outputTokenElement = null;
+        outputKeyElement = null;
+        needPushElementIntoKeyQueue = false;
+
+        IInvertedIndex invertedIndex = (IInvertedIndex) this.opCtx.getIndex();
+        this.outputTuple = new TokenKeyPairTuple(invertedIndex.getTokenTypeTraits().length,
+                invertedIndex.getInvListTypeTraits().length);
+
+        this.tokenCmp = MultiComparator.create(invertedIndex.getTokenCmpFactories());
+        this.keyCmp = MultiComparator.create(invertedIndex.getInvListCmpFactories());
+        this.tokenQueueCmp = new PriorityQueueComparator(tokenCmp);
+        this.keyQueueCmp = new PriorityQueueComparator(keyCmp);
+    }
+
+    public LSMInvertedIndexOpContext getOpCtx() {
+        return opCtx;
+    }
+
+    @Override
+    public void doOpen(ICursorInitialState initState, ISearchPredicate searchPred) throws HyracksDataException {
+        LSMInvertedIndexRangeSearchCursorInitialState lsmInitState =
+                (LSMInvertedIndexRangeSearchCursorInitialState) initState;
+        int numComponents = lsmInitState.getNumComponents();
+        rangeCursors = new OnDiskInvertedIndexRangeSearchCursor[numComponents];
+        for (int i = 0; i < numComponents; i++) {
+            IInvertedIndexAccessor invIndexAccessor = (IInvertedIndexAccessor) lsmInitState.getIndexAccessors().get(i);
+            rangeCursors[i] = (OnDiskInvertedIndexRangeSearchCursor) invIndexAccessor.createRangeSearchCursor();
+            invIndexAccessor.rangeSearch(rangeCursors[i], lsmInitState.getSearchPredicate());
+        }
+        lsmHarness = lsmInitState.getLSMHarness();
+        operationalComponents = lsmInitState.getOperationalComponents();
+        deletedKeysBTreeAccessors = lsmInitState.getDeletedKeysBTreeAccessors();
+        bloomFilters = new BloomFilter[deletedKeysBTreeAccessors.size()];
+        if (!deletedKeysBTreeAccessors.isEmpty()) {
+            deletedKeysBTreeCursors = new IIndexCursor[deletedKeysBTreeAccessors.size()];
+            for (int i = 0; i < operationalComponents.size(); i++) {
+                ILSMComponent component = operationalComponents.get(i);
+                deletedKeysBTreeCursors[i] = deletedKeysBTreeAccessors.get(i).createSearchCursor(false);
+                if (component.getType() == LSMComponentType.MEMORY) {
+                    // No need for a bloom filter for the in-memory BTree.
+                    bloomFilters[i] = null;
+                } else {
+                    bloomFilters[i] = ((LSMInvertedIndexDiskComponent) component).getBloomFilter();
+                }
+            }
+        }
+        // The bounds are left open here; isDeleted() sets both of them to the key being checked.
+        deletedKeyBTreeSearchPred = new RangePredicate(null, null, true, true, keyCmp, keyCmp);
+        initPriorityQueues();
+    }
+
+    /**
+     * Positions every range cursor on its first entry and adds it to the token queue (cursors without
+     * entries are closed right away), then loads the inverted lists of the smallest token into the key queue.
+     */
+    private void initPriorityQueues() throws HyracksDataException {
+        int pqInitSize = (rangeCursors.length > 0) ? rangeCursors.length : 1;
+        tokenQueue = new PriorityQueue<>(pqInitSize, tokenQueueCmp);
+        keyQueue = new PriorityQueue<>(pqInitSize, keyQueueCmp);
+        tokenQueueElements = new PriorityQueueElement[pqInitSize];
+        keyQueueElements = new PriorityQueueElement[pqInitSize];
+        for (int i = 0; i < pqInitSize; i++) {
+            tokenQueueElements[i] = new PriorityQueueElement(i);
+            keyQueueElements[i] = new PriorityQueueElement(i);
+        }
+        for (int i = 0; i < rangeCursors.length; i++) {
+            if (rangeCursors[i].hasNext()) {
+                rangeCursors[i].next();
+                tokenQueueElements[i].reset(rangeCursors[i].getTuple());
+                tokenQueue.offer(tokenQueueElements[i]);
+            } else {
+                rangeCursors[i].close();
+            }
+        }
+        searchNextToken();
+    }
+
+    /**
+     * Polls the smallest token from the token queue, installs it as the token part of the output tuple,
+     * and moves the current key of every range cursor positioned on that token into the key queue.
+     * Does nothing when the token queue is empty.
+     *
+     * @throws IllegalStateException
+     *             if the key queue still holds keys of the previous token
+     */
+    private void searchNextToken() throws HyracksDataException {
+        if (tokenQueue.isEmpty()) {
+            return;
+        }
+        if (!keyQueue.isEmpty()) {
+            throw new IllegalStateException("Illegal call of initializing key queue");
+        }
+        outputTokenElement = tokenQueue.poll();
+        initPushIntoKeyQueue(outputTokenElement);
+        // NOTE(review): tokenTuple references the tuple of outputTokenElement's range cursor. Confirm that
+        // outputTuple still exposes this token after that cursor moves on to its next token while other
+        // cursors still hold keys of this one.
+        ITupleReference tokenTuple = getTokenTuple(outputTokenElement);
+        outputTuple.setTokenTuple(tokenTuple);
+        // pop all same tokens
+        while (!tokenQueue.isEmpty()) {
+            PriorityQueueElement tokenElement = tokenQueue.peek();
+            if (TupleUtils.equalTuples(tokenTuple, getTokenTuple(tokenElement), tokenCmp.getKeyFieldCount())) {
+                initPushIntoKeyQueue(tokenElement);
+                tokenQueue.poll();
+            } else {
+                break;
+            }
+        }
+    }
+
+    /** Returns the inverted-list key part of the (token, key) pair held by the given element. */
+    private ITupleReference getKeyTuple(PriorityQueueElement tokenElement) {
+        return ((TokenKeyPairTuple) tokenElement.getTuple()).getKeyTuple();
+    }
+
+    /** Returns the token part of the (token, key) pair held by the given element. */
+    private ITupleReference getTokenTuple(PriorityQueueElement tokenElement) {
+        return ((TokenKeyPairTuple) tokenElement.getTuple()).getTokenTuple();
+    }
+
+    /**
+     * Adds the key of the given token-queue element to the key queue, reusing the key-queue element that
+     * belongs to the same range cursor.
+     */
+    private void initPushIntoKeyQueue(PriorityQueueElement tokenElement) {
+        PriorityQueueElement keyElement = keyQueueElements[tokenElement.getCursorIndex()];
+        keyElement.reset(getKeyTuple(tokenElement));
+        keyQueue.add(keyElement);
+    }
+
+    /**
+     * Advances the range cursor that produced the given key element. If the next entry continues the
+     * current inverted list, its key goes back into the key queue; if it starts a new token, the entry goes
+     * into the token queue instead; if the cursor is exhausted, the cursor is closed.
+     */
+    private void pushIntoKeyQueueAndReplace(PriorityQueueElement keyElement) throws HyracksDataException {
+        int cursorIndex = keyElement.getCursorIndex();
+        if (rangeCursors[cursorIndex].hasNext()) {
+            rangeCursors[cursorIndex].next();
+            TokenKeyPairTuple tuple = (TokenKeyPairTuple) rangeCursors[cursorIndex].getTuple();
+            if (tuple.isNewToken()) {
+                // if this element is a new token, then the current inverted list has been exhausted
+                PriorityQueueElement tokenElement = tokenQueueElements[cursorIndex];
+                tokenElement.reset(tuple);
+                tokenQueue.offer(tokenElement);
+            } else {
+                keyElement.reset(tuple.getKeyTuple());
+                keyQueue.offer(keyElement);
+            }
+        } else {
+            rangeCursors[cursorIndex].close();
+        }
+    }
+
+    @Override
+    public boolean doHasNext() throws HyracksDataException {
+        checkPriorityQueue();
+        return !keyQueue.isEmpty();
+    }
+
+    @Override
+    public void doNext() throws HyracksDataException {
+        outputKeyElement = keyQueue.poll();
+        outputTuple.setKeyTuple(outputKeyElement.getTuple());
+        needPushElementIntoKeyQueue = true;
+    }
+
+    @Override
+    public ITupleReference doGetTuple() {
+        return outputTuple;
+    }
+
+    /**
+     * Brings the next key to be returned to the head of the key queue. When every key of the current token
+     * has been consumed or discarded, it moves on to the following tokens until a returnable key is found
+     * or all tokens are exhausted.
+     */
+    protected void checkPriorityQueue() throws HyracksDataException {
+        checkKeyQueue();
+        // Loop rather than check once: every key of a token may have been deleted, leaving the key queue
+        // empty after loading that token although later tokens may still have live keys.
+        while (keyQueue.isEmpty() && !tokenQueue.isEmpty()) {
+            searchNextToken();
+            checkKeyQueue();
+        }
+    }
+
+    /**
+     * Discards deleted keys and duplicates of the previously returned key from the head of the key queue,
+     * advancing the range cursors they came from, until the head can be returned or the key queue is
+     * empty. The cursor of the key returned by next() is advanced here, lazily, because that key is still
+     * needed to recognize its duplicates.
+     */
+    protected void checkKeyQueue() throws HyracksDataException {
+        while (!keyQueue.isEmpty() || needPushElementIntoKeyQueue) {
+            if (!keyQueue.isEmpty()) {
+                PriorityQueueElement checkElement = keyQueue.peek();
+                // If there is no previous tuple or the previous tuple can be ignored
+                if (outputKeyElement == null) {
+                    if (isDeleted(checkElement)) {
+                        // If the key has been deleted then pop it and set needPush to true.
+                        // We cannot push immediately because the tuple may be
+                        // modified if hasNext() is called.
+                        // It must really be removed from the queue: left in place, it would be compared with
+                        // itself below and its cursor would be advanced repeatedly, dropping live keys.
+                        outputKeyElement = keyQueue.poll();
+                        needPushElementIntoKeyQueue = true;
+                    } else {
+                        // we have found the next record
+                        return;
+                    }
+                } else {
+                    // Compare the previous tuple and the head tuple in the PQ
+                    if (keyCmp.compare(outputKeyElement.getTuple(), checkElement.getTuple()) == 0) {
+                        // If the previous tuple and the head tuple are
+                        // identical
+                        // then pop the head tuple and push the next tuple from
+                        // the tree of head tuple
+
+                        // the head element of PQ is useless now
+                        PriorityQueueElement e = keyQueue.poll();
+                        pushIntoKeyQueueAndReplace(e);
+                    } else {
+                        // If the previous tuple and the head tuple are different
+                        // the info of previous tuple is useless
+                        if (needPushElementIntoKeyQueue) {
+                            pushIntoKeyQueueAndReplace(outputKeyElement);
+                            needPushElementIntoKeyQueue = false;
+                        }
+                        outputKeyElement = null;
+                    }
+                }
+            } else {
+                // the priority queue is empty and needPush
+                // NOSONAR: outputKeyElement is not null when needPushElementIntoKeyQueue = true
+                pushIntoKeyQueueAndReplace(outputKeyElement);
+                needPushElementIntoKeyQueue = false;
+                outputKeyElement = null;
+            }
+        }
+    }
+
+    /**
+     * Checks whether the key held by the given element appears in the deleted-keys BTree of any component
+     * that precedes the element's own component (cursor indexes 0 to cursorIndex - 1). A component whose
+     * bloom filter rules the key out is skipped without a BTree lookup.
+     */
+    protected boolean isDeleted(PriorityQueueElement keyElement) throws HyracksDataException {
+        ITupleReference keyTuple = keyElement.getTuple();
+        int end = keyElement.getCursorIndex();
+        // Make the search a point lookup on this key. With its initial null bounds the predicate would scan
+        // the whole BTree, so any non-empty deleted-keys BTree (e.g. after a bloom filter false positive)
+        // would wrongly report the key as deleted.
+        deletedKeyBTreeSearchPred.setLowKey(keyTuple, true);
+        deletedKeyBTreeSearchPred.setHighKey(keyTuple, true);
+        for (int i = 0; i < end; i++) {
+            if (bloomFilters[i] != null && !bloomFilters[i].contains(keyTuple, hashes)) {
+                continue;
+            }
+            deletedKeysBTreeCursors[i].close();
+            deletedKeysBTreeAccessors.get(i).search(deletedKeysBTreeCursors[i], deletedKeyBTreeSearchPred);
+            try {
+                if (deletedKeysBTreeCursors[i].hasNext()) {
+                    return true;
+                }
+            } finally {
+                deletedKeysBTreeCursors[i].close();
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public void doClose() throws HyracksDataException {
+        outputTokenElement = null;
+        outputKeyElement = null;
+        needPushElementIntoKeyQueue = false;
+        try {
+            if (rangeCursors != null) {
+                for (int i = 0; i < rangeCursors.length; i++) {
+                    rangeCursors[i].close();
+                }
+            }
+        } finally {
+            if (lsmHarness != null) {
+                lsmHarness.endSearch(opCtx);
+            }
+        }
+    }
+
+    @Override
+    public void doDestroy() throws HyracksDataException {
+        try {
+            if (tokenQueue != null) {
+                tokenQueue.clear();
+            }
+            if (keyQueue != null) {
+                keyQueue.clear();
+            }
+            if (rangeCursors != null) {
+                for (int i = 0; i < rangeCursors.length; i++) {
+                    if (rangeCursors[i] != null) {
+                        rangeCursors[i].destroy();
+                    }
+                }
+                rangeCursors = null;
+            }
+        } finally {
+            if (lsmHarness != null) {
+                lsmHarness.endSearch(opCtx);
+            }
+        }
+    }
+
+    @Override
+    public ITupleReference getFilterMinTuple() {
+        return null;
+    }
+
+    @Override
+    public ITupleReference getFilterMaxTuple() {
+        return null;
+    }
+
+    @Override
+    public boolean getSearchOperationCallbackProceedResult() {
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index 14ebe46..c3c9c21 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -51,6 +51,7 @@ import org.apache.hyracks.storage.am.lsm.invertedindex.api.InvertedListCursor;
 import org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexSearchCursorInitialState;
 import org.apache.hyracks.storage.am.lsm.invertedindex.search.InvertedIndexSearchPredicate;
 import org.apache.hyracks.storage.am.lsm.invertedindex.search.TOccurrenceSearcher;
+import org.apache.hyracks.storage.am.lsm.invertedindex.tuples.TokenKeyPairTuple;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
 import org.apache.hyracks.storage.common.IIndexAccessor;
 import org.apache.hyracks.storage.common.IIndexBulkLoader;
@@ -230,30 +231,28 @@ public class OnDiskInvertedIndex implements IInPlaceInvertedIndex {
         listCursor.open(initState, null);
     }
 
-    public final class OnDiskInvertedIndexBulkLoader implements IIndexBulkLoader {
-        private final ArrayTupleBuilder btreeTupleBuilder;
-        private final ArrayTupleReference btreeTupleReference;
-        private final IIndexBulkLoader btreeBulkloader;
+    public abstract class AbstractOnDiskInvertedIndexBulkLoader implements IIndexBulkLoader {
+        protected final ArrayTupleBuilder btreeTupleBuilder;
+        protected final ArrayTupleReference btreeTupleReference;
+        protected final IIndexBulkLoader btreeBulkloader;
 
-        private int currentInvListStartPageId;
-        private int currentInvListStartOffset;
-        private final ArrayTupleBuilder lastTupleBuilder;
-        private final ArrayTupleReference lastTuple;
+        protected int currentInvListStartPageId;
+        protected int currentInvListStartOffset;
+        protected final ArrayTupleBuilder lastTupleBuilder;
+        protected final ArrayTupleReference lastTuple;
 
-        private int currentPageId;
-        private ICachedPage currentPage;
-        private final MultiComparator tokenCmp;
-        private final MultiComparator invListCmp;
+        protected int currentPageId;
+        protected ICachedPage currentPage;
+        protected final MultiComparator invListCmp;
 
-        private final boolean verifyInput;
-        private final MultiComparator allCmp;
+        protected final boolean verifyInput;
+        protected final MultiComparator allCmp;
 
-        private final IFIFOPageQueue queue;
+        protected final IFIFOPageQueue queue;
 
-        public OnDiskInvertedIndexBulkLoader(float btreeFillFactor, boolean verifyInput, long numElementsHint,
+        public AbstractOnDiskInvertedIndexBulkLoader(float btreeFillFactor, boolean verifyInput, long numElementsHint,
                 boolean checkIfEmptyIndex, int startPageId) throws HyracksDataException {
             this.verifyInput = verifyInput;
-            this.tokenCmp = MultiComparator.create(btree.getComparatorFactories());
             this.invListCmp = MultiComparator.create(invListCmpFactories);
             if (verifyInput) {
                 allCmp = MultiComparator.create(btree.getComparatorFactories(), invListCmpFactories);
@@ -272,22 +271,15 @@ public class OnDiskInvertedIndex implements IInPlaceInvertedIndex {
             queue = bufferCache.createFIFOQueue();
         }
 
-        public void pinNextPage() throws HyracksDataException {
+        protected void pinNextPage() throws HyracksDataException {
             queue.put(currentPage);
             currentPageId++;
             currentPage = bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, currentPageId));
         }
 
-        private void createAndInsertBTreeTuple() throws HyracksDataException {
+        protected void insertBTreeTuple() throws HyracksDataException {
             // Build tuple.
-            btreeTupleBuilder.reset();
             DataOutput output = btreeTupleBuilder.getDataOutput();
-            // Add key fields.
-            lastTuple.reset(lastTupleBuilder.getFieldEndOffsets(), lastTupleBuilder.getByteArray());
-            for (int i = 0; i < numTokenFields; i++) {
-                btreeTupleBuilder.addField(lastTuple.getFieldData(i), lastTuple.getFieldStart(i),
-                        lastTuple.getFieldLength(i));
-            }
             // Add inverted-list 'pointer' value fields.
             try {
                 output.writeInt(currentInvListStartPageId);
@@ -304,77 +296,80 @@ public class OnDiskInvertedIndex implements IInPlaceInvertedIndex {
             // Reset tuple reference and add it into the BTree load.
             btreeTupleReference.reset(btreeTupleBuilder.getFieldEndOffsets(), btreeTupleBuilder.getByteArray());
             btreeBulkloader.add(btreeTupleReference);
+            btreeTupleBuilder.reset();
         }
 
-        /**
-         * Assumptions:
-         * The first btree.getMultiComparator().getKeyFieldCount() fields in tuple
-         * are btree keys (e.g., a string token).
-         * The next invListCmp.getKeyFieldCount() fields in tuple are keys of the
-         * inverted list (e.g., primary key).
-         * Key fields of inverted list are fixed size.
-         */
-        @Override
-        public void add(ITupleReference tuple) throws HyracksDataException {
-            boolean firstElement = lastTupleBuilder.getSize() == 0;
-            boolean startNewList = firstElement;
-            if (!firstElement) {
-                // If the current and the last token don't match, we start a new list.
-                lastTuple.reset(lastTupleBuilder.getFieldEndOffsets(), lastTupleBuilder.getByteArray());
-                startNewList = tokenCmp.compare(tuple, lastTuple) != 0;
-            }
-            if (startNewList) {
-                if (!firstElement) {
-                    // Create entry in btree for last inverted list.
-                    createAndInsertBTreeTuple();
-                }
-                if (!invListBuilder.startNewList(tuple, numTokenFields)) {
-                    pinNextPage();
-                    invListBuilder.setTargetBuffer(currentPage.getBuffer().array(), 0);
-                    if (!invListBuilder.startNewList(tuple, numTokenFields)) {
-                        throw new IllegalStateException("Failed to create first inverted list.");
-                    }
-                }
-                currentInvListStartPageId = currentPageId;
-                currentInvListStartOffset = invListBuilder.getPos();
-            } else {
-                if (invListCmp.compare(tuple, lastTuple, numTokenFields) == 0) {
-                    // Duplicate inverted-list element.
-                    return;
+        /**
+         * Starts a new inverted list for the token in {@code tokenTuple}, moving to a fresh page when the
+         * current one has no room, and records the list's start page and offset for its BTree entry.
+         */
+        protected void startNewList(ITupleReference tokenTuple) throws HyracksDataException {
+            if (!invListBuilder.startNewList(tokenTuple, numTokenFields)) {
+                pinNextPage();
+                invListBuilder.setTargetBuffer(currentPage.getBuffer().array(), 0);
+                if (!invListBuilder.startNewList(tokenTuple, numTokenFields)) {
+                    throw new IllegalStateException("Failed to create first inverted list.");
                 }
             }
+            currentInvListStartPageId = currentPageId;
+            currentInvListStartOffset = invListBuilder.getPos();
+        }
 
-            // Append to current inverted list.
-            if (!invListBuilder.appendElement(tuple, numTokenFields, numInvListKeys)) {
+        /**
+         * Appends {@code numInvListKeys} key fields of {@code keyTuple}, beginning at field {@code startField},
+         * to the current inverted list, moving to a fresh page when the current one has no room.
+         */
+        protected void appendInvertedList(ITupleReference keyTuple, int startField) throws HyracksDataException {
+            if (!invListBuilder.appendElement(keyTuple, startField, numInvListKeys)) {
                 pinNextPage();
                 invListBuilder.setTargetBuffer(currentPage.getBuffer().array(), 0);
-                if (!invListBuilder.appendElement(tuple, numTokenFields, numInvListKeys)) {
+                if (!invListBuilder.appendElement(keyTuple, startField, numInvListKeys)) {
                     throw new IllegalStateException(
                             "Failed to append element to inverted list after switching to a new page.");
                 }
             }
+        }
 
-            if (verifyInput && lastTupleBuilder.getSize() != 0) {
-                if (allCmp.compare(tuple, lastTuple) <= 0) {
-                    throw new HyracksDataException(
-                            "Input stream given to OnDiskInvertedIndex bulk load is not sorted.");
-                }
+        /**
+         * Fails with {@code UNSORTED_LOAD_INPUT} unless {@code tuple} sorts strictly after the last saved tuple
+         * (if any). Intended for {@code verifyInput} mode, where {@code allCmp} is initialized.
+         */
+        protected void verifyTuple(ITupleReference tuple) throws HyracksDataException {
+            if (lastTupleBuilder.getSize() > 0 && allCmp.compare(tuple, lastTuple) <= 0) {
+                throw HyracksDataException.create(ErrorCode.UNSORTED_LOAD_INPUT);
             }
+        }
 
-            // Remember last tuple by creating a copy.
-            // TODO: This portion can be optimized by only copying the token when it changes, and using the last appended inverted-list element as a reference.
+        /**
+         * Remembers {@code tuple} by copying all of its fields into {@code lastTupleBuilder}, then points
+         * {@code lastTuple} at the copy.
+         */
+        protected void saveLastTuple(ITupleReference tuple) throws HyracksDataException {
             lastTupleBuilder.reset();
             for (int i = 0; i < tuple.getFieldCount(); i++) {
                 lastTupleBuilder.addField(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i));
             }
+            lastTuple.reset(lastTupleBuilder.getFieldEndOffsets(), lastTupleBuilder.getByteArray());
+        }
+
+        /**
+         * Copies the first {@code numTokenFields} fields of {@code tokenTuple} into {@code btreeTupleBuilder};
+         * {@link #insertBTreeTuple()} later appends the list pointer and emits the entry.
+         */
+        protected void copyTokenToBTreeTuple(ITupleReference tokenTuple) throws HyracksDataException {
+            for (int i = 0; i < numTokenFields; i++) {
+                btreeTupleBuilder.addField(tokenTuple.getFieldData(i), tokenTuple.getFieldStart(i),
+                        tokenTuple.getFieldLength(i));
+            }
         }
 
         @Override
         public void end() throws HyracksDataException {
-            // The last tuple builder is empty if add() was never called.
-            if (lastTupleBuilder.getSize() != 0) {
-                createAndInsertBTreeTuple();
+            // A non-empty builder holds the token of the last list, whose BTree entry is still pending.
+            if (btreeTupleBuilder.getSize() != 0) {
+                insertBTreeTuple();
             }
+
             btreeBulkloader.end();
 
             if (currentPage != null) {
@@ -392,6 +366,76 @@ public class OnDiskInvertedIndex implements IInPlaceInvertedIndex {
         }
     }
 
+    /**
+     * Bulk loader for merge output. Every input tuple must be a {@link TokenKeyPairTuple}; its new-token
+     * flag, rather than a token comparison, decides when the current inverted list ends.
+     */
+    public class OnDiskInvertedIndexMergeBulkLoader extends AbstractOnDiskInvertedIndexBulkLoader {
+
+        public OnDiskInvertedIndexMergeBulkLoader(float btreeFillFactor, boolean verifyInput, long numElementsHint,
+                boolean checkIfEmptyIndex, int startPageId) throws HyracksDataException {
+            super(btreeFillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, startPageId);
+        }
+
+        @Override
+        public void add(ITupleReference tuple) throws HyracksDataException {
+            TokenKeyPairTuple pair = (TokenKeyPairTuple) tuple;
+            if (pair.isNewToken()) {
+                // Emit the BTree entry of the previous list (if any) before opening the next one.
+                if (btreeTupleBuilder.getSize() > 0) {
+                    insertBTreeTuple();
+                }
+                ITupleReference token = pair.getTokenTuple();
+                startNewList(token);
+                copyTokenToBTreeTuple(token);
+            }
+            // The key tuple carries only inverted-list keys, so they start at field 0.
+            appendInvertedList(pair.getKeyTuple(), 0);
+            if (verifyInput) {
+                verifyTuple(pair);
+                saveLastTuple(pair);
+            }
+        }
+    }
+
+    /**
+     * Bulk loader whose input tuples hold the token fields followed by the inverted-list key fields, in
+     * sorted order (checked only when {@code verifyInput} is set). A new inverted list starts whenever
+     * the token differs from the previous tuple's; an element equal to the previous one is skipped.
+     */
+    public class OnDiskInvertedIndexBulkLoader extends AbstractOnDiskInvertedIndexBulkLoader {
+
+        public OnDiskInvertedIndexBulkLoader(float btreeFillFactor, boolean verifyInput, long numElementsHint,
+                boolean checkIfEmptyIndex, int startPageId) throws HyracksDataException {
+            super(btreeFillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, startPageId);
+        }
+
+        @Override
+        public void add(ITupleReference tuple) throws HyracksDataException {
+            boolean listOpen = btreeTupleBuilder.getSize() != 0;
+            boolean sameToken = listOpen && TupleUtils.equalTuples(tuple, lastTuple, numTokenFields);
+            if (sameToken) {
+                if (invListCmp.compare(tuple, lastTuple, numTokenFields) == 0) {
+                    // Duplicate inverted-list element.
+                    return;
+                }
+            } else {
+                if (listOpen) {
+                    // Create entry in btree for last inverted list.
+                    insertBTreeTuple();
+                }
+                startNewList(tuple);
+                copyTokenToBTreeTuple(tuple);
+            }
+            appendInvertedList(tuple, numTokenFields);
+            if (verifyInput) {
+                verifyTuple(tuple);
+            }
+            // Unlike the merge loader, always remember the tuple: the next add() compares against it.
+            saveLastTuple(tuple);
+        }
+    }
+
     @Override
     public IBufferCache getBufferCache() {
         return bufferCache;
@@ -518,6 +558,17 @@ public class OnDiskInvertedIndex implements IInPlaceInvertedIndex {
                 rootPageId);
     }
 
+    /**
+     * Creates a bulk loader for merge output. Every tuple handed to its {@code add} must be a
+     * {@link TokenKeyPairTuple}; {@code rootPageId} is passed as the loader's start page id.
+     */
+    public IIndexBulkLoader createMergeBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex) throws HyracksDataException {
+        IIndexBulkLoader mergeLoader = new OnDiskInvertedIndexMergeBulkLoader(fillFactor, verifyInput,
+                numElementsHint, checkIfEmptyIndex, rootPageId);
+        return mergeLoader;
+    }
+
     @Override
     public void validate() throws HyracksDataException {
         btree.validate();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexRangeSearchCursor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexRangeSearchCursor.java
index 9d99c9e..11b483e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexRangeSearchCursor.java
@@ -24,9 +24,9 @@ import org.apache.hyracks.storage.am.btree.impls.BTree;
 import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
 import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
-import org.apache.hyracks.storage.am.common.tuples.ConcatenatingTupleReference;
 import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
 import org.apache.hyracks.storage.am.lsm.invertedindex.api.InvertedListCursor;
+import org.apache.hyracks.storage.am.lsm.invertedindex.tuples.TokenKeyPairTuple;
 import org.apache.hyracks.storage.common.EnforcedIndexCursor;
 import org.apache.hyracks.storage.common.ICursorInitialState;
 import org.apache.hyracks.storage.common.IIndexAccessor;
@@ -49,7 +49,7 @@ public class OnDiskInvertedIndexRangeSearchCursor extends EnforcedIndexCursor {
     private RangePredicate btreePred;
 
     private final PermutingTupleReference tokenTuple;
-    private final ConcatenatingTupleReference concatTuple;
+    private final TokenKeyPairTuple resultTuple;
 
     public OnDiskInvertedIndexRangeSearchCursor(OnDiskInvertedIndex invIndex, IIndexOperationContext opCtx)
             throws HyracksDataException {
@@ -64,7 +64,7 @@ public class OnDiskInvertedIndexRangeSearchCursor extends EnforcedIndexCursor {
         }
         tokenTuple = new PermutingTupleReference(fieldPermutation);
         btreeCursor = btreeAccessor.createSearchCursor(false);
-        concatTuple = new ConcatenatingTupleReference(2);
+        resultTuple = new TokenKeyPairTuple(invIndex.getTokenTypeTraits().length, btree.getCmpFactories().length);
         invListRangeSearchCursor = invIndex.createInvertedListRangeSearchCursor();
         isInvListCursorOpen = false;
     }
@@ -95,10 +95,7 @@ public class OnDiskInvertedIndexRangeSearchCursor extends EnforcedIndexCursor {
     @Override
     public void doNext() throws HyracksDataException {
         invListRangeSearchCursor.next();
-        if (concatTuple.hasMaxTuples()) {
-            concatTuple.removeLastTuple();
-        }
-        concatTuple.addTuple(invListRangeSearchCursor.getTuple());
+        resultTuple.setKeyTuple(invListRangeSearchCursor.getTuple()); // pair the next key with the current token
     }
 
     @Override
@@ -123,7 +120,7 @@ public class OnDiskInvertedIndexRangeSearchCursor extends EnforcedIndexCursor {
 
     @Override
     public ITupleReference doGetTuple() {
-        return concatTuple;
+        return resultTuple; // reused instance; its key part is replaced on every doNext()
     }
 
     // Opens an inverted-list-scan cursor for the given tuple.
@@ -135,8 +132,7 @@ public class OnDiskInvertedIndexRangeSearchCursor extends EnforcedIndexCursor {
                     (OnDiskInvertedIndexOpContext) opCtx);
             invListRangeSearchCursor.prepareLoadPages();
             invListRangeSearchCursor.loadPages();
-            concatTuple.reset();
-            concatTuple.addTuple(tokenTuple);
+            resultTuple.setTokenTuple(tokenTuple);
             isInvListCursorOpen = true;
         } else {
             isInvListCursorOpen = false;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/tuples/TokenKeyPairTuple.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/tuples/TokenKeyPairTuple.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/tuples/TokenKeyPairTuple.java
new file mode 100644
index 0000000..102fe96
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/tuples/TokenKeyPairTuple.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.invertedindex.tuples;
+
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * Tuple view that concatenates a token tuple with an inverted-list key tuple without copying either one.
+ * Field indexes below {@code tokenFieldCount} address the token tuple; the remaining indexes address the
+ * key tuple, shifted down by {@code tokenFieldCount}.
+ */
+public class TokenKeyPairTuple implements ITupleReference {
+
+    private final int tokenFieldCount;
+    private final int keyFieldCount;
+
+    private ITupleReference tokenTuple;
+    private ITupleReference keyTuple;
+
+    /** Whether the current key is the first one set since the last {@link #setTokenTuple} call. */
+    private boolean newToken;
+
+    /**
+     * Creates a view with neither a token tuple nor a key tuple set.
+     *
+     * @param tokenFieldCount number of leading fields served from the token tuple
+     * @param keyFieldCount number of trailing fields served from the key tuple
+     */
+    public TokenKeyPairTuple(int tokenFieldCount, int keyFieldCount) {
+        this.tokenFieldCount = tokenFieldCount;
+        this.keyFieldCount = keyFieldCount;
+    }
+
+    /** Sets the token part and clears the key part until the next {@link #setKeyTuple} call. */
+    public void setTokenTuple(ITupleReference token) {
+        tokenTuple = token;
+        keyTuple = null;
+    }
+
+    /** Sets the key part; the first key set after {@link #setTokenTuple} is flagged as a new token. */
+    public void setKeyTuple(ITupleReference key) {
+        newToken = keyTuple == null;
+        keyTuple = key;
+    }
+
+    public ITupleReference getTokenTuple() {
+        return tokenTuple;
+    }
+
+    public ITupleReference getKeyTuple() {
+        return keyTuple;
+    }
+
+    /** Returns whether the current key is the first one paired with the current token. */
+    public boolean isNewToken() {
+        return newToken;
+    }
+
+    @Override
+    public int getFieldCount() {
+        return tokenFieldCount + keyFieldCount;
+    }
+
+    @Override
+    public byte[] getFieldData(int fIdx) {
+        return isTokenField(fIdx) ? tokenTuple.getFieldData(fIdx)
+                : keyTuple.getFieldData(fIdx - tokenFieldCount);
+    }
+
+    @Override
+    public int getFieldStart(int fIdx) {
+        return isTokenField(fIdx) ? tokenTuple.getFieldStart(fIdx)
+                : keyTuple.getFieldStart(fIdx - tokenFieldCount);
+    }
+
+    @Override
+    public int getFieldLength(int fIdx) {
+        return isTokenField(fIdx) ? tokenTuple.getFieldLength(fIdx)
+                : keyTuple.getFieldLength(fIdx - tokenFieldCount);
+    }
+
+    /** Fields whose index is below {@code tokenFieldCount} come from the token tuple. */
+    private boolean isTokenField(int fIdx) {
+        return fIdx < tokenFieldCount;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 4510618..fae6e1a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -45,6 +45,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
@@ -134,8 +135,8 @@ public class LSMRTree extends AbstractLSMRTree {
                 rTreeTupleSorter.sort();
                 component = createDiskComponent(componentFactory, flushOp.getTarget(), flushOp.getBTreeTarget(),
                         flushOp.getBloomFilterTarget(), true);
-                componentBulkLoader =
-                        component.createBulkLoader(1.0f, false, numBTreeTuples.longValue(), false, false, false);
+                componentBulkLoader = component.createBulkLoader(LSMIOOperationType.FLUSH, 1.0f, false,
+                        numBTreeTuples.longValue(), false, false, false);
                 flushLoadRTree(isEmpty, rTreeTupleSorter, componentBulkLoader);
                 // scan the memory BTree and bulk load delete tuples
                 flushLoadBtree(memBTreeAccessor, componentBulkLoader, btreeNullPredicate);
@@ -331,12 +332,13 @@ public class LSMRTree extends AbstractLSMRTree {
                         numElements += ((LSMRTreeDiskComponent) mergeOp.getMergingComponents().get(i)).getBloomFilter()
                                 .getNumElements();
                     }
-                    componentBulkLoader =
-                            mergedComponent.createBulkLoader(1.0f, false, numElements, false, false, false);
+                    componentBulkLoader = mergedComponent.createBulkLoader(LSMIOOperationType.MERGE, 1.0f, false,
+                            numElements, false, false, false);
                     mergeLoadBTree(opCtx, rtreeSearchPred, componentBulkLoader);
                 } else {
                     //no buddy-btree needed
-                    componentBulkLoader = mergedComponent.createBulkLoader(1.0f, false, 0L, false, false, false);
+                    componentBulkLoader = mergedComponent.createBulkLoader(LSMIOOperationType.MERGE, 1.0f, false, 0L,
+                            false, false, false);
                 }
                 //search old rtree components
                 while (cursor.hasNext()) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index a3ba4b1..f4e919a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -40,6 +40,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
@@ -109,7 +110,8 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree {
                 try {
                     memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
                     component = createDiskComponent(componentFactory, flushOp.getTarget(), null, null, true);
-                    componentBulkLoader = component.createBulkLoader(1.0f, false, 0L, false, false, false);
+                    componentBulkLoader =
+                            component.createBulkLoader(LSMIOOperationType.FLUSH, 1.0f, false, 0L, false, false, false);
                     // Since the LSM-RTree is used as a secondary assumption, the
                     // primary key will be the last comparator in the BTree comparators
                     rTreeTupleSorter = new TreeTupleSorter(flushingComponent.getIndex().getFileId(), linearizerArray,
@@ -235,7 +237,7 @@ public class LSMRTreeWithAntiMatterTuples extends AbstractLSMRTree {
         ILSMDiskComponent component = createDiskComponent(componentFactory, mergeOp.getTarget(), null, null, true);
 
         ILSMDiskComponentBulkLoader componentBulkLoader =
-                component.createBulkLoader(1.0f, false, 0L, false, false, false);
+                component.createBulkLoader(LSMIOOperationType.MERGE, 1.0f, false, 0L, false, false, false);
         try {
             while (cursor.hasNext()) {
                 cursor.next();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3036c980/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/common/AbstractInvertedIndexTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/common/AbstractInvertedIndexTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/common/AbstractInvertedIndexTest.java
index a420ba9..da87b27 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/common/AbstractInvertedIndexTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/common/AbstractInvertedIndexTest.java
@@ -25,6 +25,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.datagen.TupleGenerator;
 import org.apache.hyracks.storage.am.config.AccessMethodTestsConfig;
 import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearchModifier;
+import org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndex;
 import org.apache.hyracks.storage.am.lsm.invertedindex.search.ConjunctiveSearchModifier;
 import org.apache.hyracks.storage.am.lsm.invertedindex.search.JaccardSearchModifier;
 import org.apache.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext;
@@ -84,6 +85,12 @@ public abstract class AbstractInvertedIndexTest {
             LSMInvertedIndexTestUtils.compareActualAndExpectedIndexes(testCtx);
         }
         LSMInvertedIndexTestUtils.compareActualAndExpectedIndexesRangeSearch(testCtx);
+        if (invIndexType == InvertedIndexType.LSM || invIndexType == InvertedIndexType.PARTITIONED_LSM) {
+            LSMInvertedIndex lsmIndex = (LSMInvertedIndex) invIndex;
+            if (!lsmIndex.isMemoryComponentsAllocated() || lsmIndex.isCurrentMutableComponentEmpty()) {
+                LSMInvertedIndexTestUtils.compareActualAndExpectedIndexesMergeSearch(testCtx);
+            }
+        }
     }
 
     /**