You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by lu...@apache.org on 2019/04/22 16:10:43 UTC

[asterixdb] branch master updated: [ASTERIXDB-2310][STO]Enforce Key Uniquness using PKIndex

This is an automated email from the ASF dual-hosted git repository.

luochen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 89cec58  [ASTERIXDB-2310][STO]Enforce Key Uniquness using PKIndex
89cec58 is described below

commit 89cec5899945cf41e3269b24c2ac138dc52b97f7
Author: luochen <cl...@uci.edu>
AuthorDate: Sun Apr 21 20:34:08 2019 -0700

    [ASTERIXDB-2310][STO]Enforce Key Uniquness using PKIndex
    
    - user model changes: no
    - storage format changes: yes. Primary key index
    now has bloom filters.
    - interface changes: no
    
    Details:
    - Add bloom filters to primary key index.
    - Introduce LSMPrimaryInsertOperator to separate the uniqueness check
    from the primary index. When the primary key index is available, it is
    used for the uniqueness check. The implementation of this operator is
    similar to that of LSMPrimaryUpsertOperator.
    
    Change-Id: I7a52bb75ee5b14521972999df2f45ba62adc5af1
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/2453
    Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
---
 .../IntroduceSecondaryIndexInsertDeleteRule.java   |   5 +
 .../asterix/app/bootstrap/TestNodeController.java  | 141 +++++++++--
 .../dataflow/CheckpointInSecondaryIndexTest.java   |   6 +-
 .../test/dataflow/ComponentRollbackTest.java       |   4 +-
 .../test/dataflow/ConcurrentInsertTest.java        | 230 +++++++++++++++++
 .../test/dataflow/IoCallbackFailureTest.java       |   6 +-
 .../test/dataflow/LSMFlushRecoveryTest.java        |   6 +-
 .../asterix/test/dataflow/LogMarkerTest.java       |   6 +-
 .../test/dataflow/MultiPartitionLSMIndexTest.java  |   8 +-
 .../dataflow/SearchCursorComponentSwitchTest.java  |   6 +-
 .../asterix/test/dataflow/StorageTestUtils.java    |  30 ++-
 .../test/dataflow/TransactionAbortTest.java        |  10 +-
 .../asterix/test/logging/CheckpointingTest.java    |  10 +-
 .../delete-primary-key-index-with-secondary.sqlpp} |  34 +--
 ...nd-scan-primary-key-index-with-secondary.sqlpp} |  36 +--
 ...nsert-primary-key-index-with-auto-gen-pk.sqlpp} |  32 +--
 .../insert-primary-key-index.sqlpp}                |  33 +--
 .../load-primary-key-index-with-secondary.sqlpp}   |  41 ++--
 .../load-primary-key-index.sqlpp}                  |  39 +--
 .../upsert-primary-key-index-with-secondary.sqlpp} |  34 +--
 .../delete-primary-key-index-with-secondary.plan   |  20 ++
 ...-and-scan-primary-key-index-with-secondary.plan |  19 ++
 .../insert-primary-key-index-with-auto-gen-pk.plan |  12 +
 .../insert-primary-key-index.plan                  |   8 +
 .../load-primary-key-index-with-secondary.plan     |  67 +++++
 .../primary-key-index/load-primary-key-index.plan  |  17 ++
 .../upsert-primary-key-index-with-secondary.plan   |  16 ++
 .../insert-duplicated-keys.1.ddl.sqlpp             |   0
 .../insert-duplicated-keys.2.update.sqlpp          |  30 +++
 .../insert-duplicated-keys.3.ddl.sqlpp}            |  23 +-
 .../insert-duplicated-keys.1.ddl.sqlpp             |   2 -
 .../insert-feed-with-pk-index.1.ddl.sqlpp}         |  42 ++--
 .../insert-feed-with-pk-index.2.update.sqlpp}      |  32 +--
 .../insert-feed-with-pk-index.3.server.sqlpp}      |  30 +--
 .../insert-feed-with-pk-index.4.sleep.sqlpp}       |  30 +--
 .../insert-feed-with-pk-index.5.update.sqlpp}      |  31 +--
 .../insert-feed-with-pk-index.6.query.sqlpp}       |  31 +--
 .../insert-feed-with-pk-index.7.server.sqlpp}      |  29 +--
 .../insert-feed-with-pk-index.8.ddl.sqlpp}         |  31 +--
 .../test/resources/runtimets/testsuite_sqlpp.xml   |  12 +
 .../declared/BTreeResourceFactoryProvider.java     |   3 +-
 .../metadata/declared/MetadataProvider.java        |  27 +-
 .../apache/asterix/metadata/entities/Dataset.java  |  16 +-
 .../apache/asterix/metadata/entities/Index.java    |   5 +
 .../LSMPrimaryInsertOperatorDescriptor.java        |  63 +++++
 .../LSMPrimaryInsertOperatorNodePushable.java      | 271 +++++++++++++++++++++
 .../LockThenSearchOperationCallback.java           |  45 +---
 .../PrimaryIndexModificationOperationCallback.java |   6 +-
 .../storage/am/lsm/btree/impls/LSMBTree.java       |   9 +-
 .../storage/am/lsm/btree/utils/LSMBTreeUtil.java   |  10 +-
 .../storage/am/lsm/common/impls/LSMHarness.java    |   6 +-
 .../storage/am/lsm/btree/impl/TestLsmBtree.java    |   9 +-
 .../am/lsm/btree/impl/TestLsmBtreeUtil.java        |  12 +-
 53 files changed, 1255 insertions(+), 426 deletions(-)

diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
index e123715..6c258e4 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.algebra.operators.CommitOperator;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
@@ -170,6 +171,10 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
             metaType = (ARecordType) mp.findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
         }
         List<Index> indexes = mp.getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName());
+        if (primaryIndexModificationOp.getOperation() == Kind.INSERT && !primaryIndexModificationOp.isBulkload()) {
+            // for insert, the primary key index is handled together with the primary index
+            indexes = indexes.stream().filter(index -> !index.isPrimaryKeyIndex()).collect(Collectors.toList());
+        }
         // Set the top operator pointer to the primary IndexInsertDeleteOperator
         ILogicalOperator currentTop = primaryIndexModificationOp;
         boolean hasSecondaryIndex = false;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 1a9df0b..ffbddb6 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -60,6 +60,7 @@ import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage;
 import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorNodePushable;
+import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorNodePushable;
 import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorNodePushable;
 import org.apache.asterix.runtime.utils.CcApplicationContext;
 import org.apache.asterix.test.runtime.ExecutionTestUtil;
@@ -187,15 +188,6 @@ public class TestNodeController {
         return new TxnId(jobId.getId());
     }
 
-    public Pair<LSMInsertDeleteOperatorNodePushable, IPushRuntime> getInsertPipeline(IHyracksTaskContext ctx,
-            Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType, int[] filterFields,
-            int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
-            StorageComponentProvider storageComponentProvider, Index secondaryIndex)
-            throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
-        return getInsertPipeline(ctx, dataset, primaryKeyTypes, recordType, metaType, filterFields, primaryKeyIndexes,
-                primaryKeyIndicators, storageComponentProvider, secondaryIndex, IndexOperation.INSERT);
-    }
-
     public Pair<SecondaryIndexInfo, LSMIndexBulkLoadOperatorNodePushable> getBulkLoadSecondaryOperator(
             IHyracksTaskContext ctx, Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType,
             ARecordType metaType, int[] filterFields, int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
@@ -228,10 +220,10 @@ public class TestNodeController {
         }
     }
 
-    public Pair<LSMInsertDeleteOperatorNodePushable, IPushRuntime> getInsertPipeline(IHyracksTaskContext ctx,
+    public Pair<LSMPrimaryInsertOperatorNodePushable, IPushRuntime> getInsertPipeline(IHyracksTaskContext ctx,
             Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType, int[] filterFields,
             int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
-            StorageComponentProvider storageComponentProvider, Index secondaryIndex, IndexOperation op)
+            StorageComponentProvider storageComponentProvider, Index secondaryIndex, Index primaryKeyIndex)
             throws AlgebricksException, HyracksDataException, RemoteException, ACIDException {
         CcApplicationContext appCtx =
                 (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext();
@@ -243,18 +235,28 @@ public class TestNodeController {
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
                     mergePolicy.first, mergePolicy.second, filterFields, primaryKeyIndexes, primaryKeyIndicators);
-            IModificationOperationCallbackFactory modOpCallbackFactory = dataset.getModificationCallbackFactory(
-                    storageComponentProvider, primaryIndexInfo.index, op, primaryIndexInfo.primaryKeyIndexes);
+            IModificationOperationCallbackFactory modOpCallbackFactory =
+                    dataset.getModificationCallbackFactory(storageComponentProvider, primaryIndexInfo.index,
+                            IndexOperation.INSERT, primaryIndexInfo.primaryKeyIndexes);
+            ISearchOperationCallbackFactory searchOpCallbackFactory =
+                    dataset.getSearchCallbackFactory(storageComponentProvider, primaryIndexInfo.index,
+                            IndexOperation.INSERT, primaryIndexInfo.primaryKeyIndexes);
             IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider();
             RecordDescriptor recordDesc =
                     recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0);
             IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
                     storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
-            LSMInsertDeleteOperatorNodePushable insertOp =
-                    new LSMInsertDeleteOperatorNodePushable(ctx, ctx.getTaskAttemptId().getTaskId().getPartition(),
-                            primaryIndexInfo.primaryIndexInsertFieldsPermutations, recordDesc, op, true,
-                            indexHelperFactory, modOpCallbackFactory, null, null);
+            IIndexDataflowHelperFactory pkIndexHelperFactory = null;
+            if (primaryKeyIndex != null) {
+                SecondaryIndexInfo pkIndexInfo = new SecondaryIndexInfo(primaryIndexInfo, primaryKeyIndex);
+                pkIndexHelperFactory = new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(),
+                        pkIndexInfo.fileSplitProvider);
+            }
 
+            LSMPrimaryInsertOperatorNodePushable insertOp = new LSMPrimaryInsertOperatorNodePushable(ctx,
+                    ctx.getTaskAttemptId().getTaskId().getPartition(), indexHelperFactory, pkIndexHelperFactory,
+                    primaryIndexInfo.primaryIndexInsertFieldsPermutations, recordDesc, modOpCallbackFactory,
+                    searchOpCallbackFactory, primaryKeyIndexes.length, filterFields, null);
             // For now, this assumes a single secondary index. recordDesc is always <pk-record-meta>
             // for the index, we will have to create an assign operator that extract the sk
             // then the secondary LSMInsertDeleteOperatorNodePushable
@@ -299,10 +301,10 @@ public class TestNodeController {
                         dataset.getModificationCallbackFactory(storageComponentProvider, secondaryIndex,
                                 IndexOperation.INSERT, primaryKeyIndexes);
 
-                LSMInsertDeleteOperatorNodePushable secondaryInsertOp =
-                        new LSMInsertDeleteOperatorNodePushable(ctx, ctx.getTaskAttemptId().getTaskId().getPartition(),
-                                secondaryIndexInfo.insertFieldsPermutations, secondaryIndexInfo.rDesc, op, false,
-                                secondaryIndexHelperFactory, secondaryModCallbackFactory, null, null);
+                LSMInsertDeleteOperatorNodePushable secondaryInsertOp = new LSMInsertDeleteOperatorNodePushable(ctx,
+                        ctx.getTaskAttemptId().getTaskId().getPartition(), secondaryIndexInfo.insertFieldsPermutations,
+                        secondaryIndexInfo.rDesc, IndexOperation.INSERT, false, secondaryIndexHelperFactory,
+                        secondaryModCallbackFactory, null, null);
                 assignOp.setOutputFrameWriter(0, secondaryInsertOp, secondaryIndexInfo.rDesc);
 
                 IPushRuntime commitOp =
@@ -325,6 +327,103 @@ public class TestNodeController {
         }
     }
 
+    public Pair<LSMInsertDeleteOperatorNodePushable, IPushRuntime> getDeletePipeline(IHyracksTaskContext ctx,
+            Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType, int[] filterFields,
+            int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
+            StorageComponentProvider storageComponentProvider, Index secondaryIndex)
+            throws AlgebricksException, HyracksDataException, RemoteException, ACIDException {
+        CcApplicationContext appCtx =
+                (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext();
+        MetadataProvider mdProvider = new MetadataProvider(appCtx, null);
+        try {
+            MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy =
+                    DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
+                    mergePolicy.first, mergePolicy.second, filterFields, primaryKeyIndexes, primaryKeyIndicators);
+            IModificationOperationCallbackFactory modOpCallbackFactory =
+                    dataset.getModificationCallbackFactory(storageComponentProvider, primaryIndexInfo.index,
+                            IndexOperation.DELETE, primaryIndexInfo.primaryKeyIndexes);
+            IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider();
+            RecordDescriptor recordDesc =
+                    recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0);
+            IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
+                    storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
+            LSMInsertDeleteOperatorNodePushable deleteOp =
+                    new LSMInsertDeleteOperatorNodePushable(ctx, ctx.getTaskAttemptId().getTaskId().getPartition(),
+                            primaryIndexInfo.primaryIndexInsertFieldsPermutations, recordDesc, IndexOperation.DELETE,
+                            true, indexHelperFactory, modOpCallbackFactory, null, null);
+            // For now, this assumes at most one secondary index. recordDesc is always <pk-record-meta>.
+            // When a secondary index is given, the pipeline is: primary delete -> assign operator that
+            // extracts the secondary keys (sk) -> secondary LSMInsertDeleteOperatorNodePushable -> commit.
+            if (secondaryIndex != null) {
+                List<List<String>> skNames = secondaryIndex.getKeyFieldNames();
+                List<Integer> indicators = secondaryIndex.getKeyFieldSourceIndicators();
+                IScalarEvaluatorFactory[] secondaryFieldAccessEvalFactories =
+                        new IScalarEvaluatorFactory[skNames.size()];
+                for (int i = 0; i < skNames.size(); i++) {
+                    ARecordType sourceType = dataset.hasMetaPart()
+                            ? indicators.get(i).intValue() == Index.RECORD_INDICATOR ? recordType : metaType
+                            : recordType;
+                    int pos = skNames.get(i).size() > 1 ? -1 : sourceType.getFieldIndex(skNames.get(i).get(0));
+                    secondaryFieldAccessEvalFactories[i] =
+                            mdProvider.getDataFormat().getFieldAccessEvaluatorFactory(mdProvider.getFunctionManager(),
+                                    sourceType, secondaryIndex.getKeyFieldNames().get(i), pos, null);
+                }
+                // outColumns: positions right after the input fields (rDesc.getFieldCount() + i), one per sk
+                int[] outColumns = new int[skNames.size()];
+                // projection list: the secondary key columns first, then columns 0..n-1 (the primary keys)
+                int[] projectionList = new int[skNames.size() + primaryIndexInfo.index.getKeyFieldNames().size()];
+                for (int i = 0; i < secondaryFieldAccessEvalFactories.length; i++) {
+                    outColumns[i] = primaryIndexInfo.rDesc.getFieldCount() + i;
+                }
+                int projCount = 0;
+                for (int i = 0; i < secondaryFieldAccessEvalFactories.length; i++) {
+                    projectionList[projCount++] = primaryIndexInfo.rDesc.getFieldCount() + i;
+                }
+                for (int i = 0; i < primaryIndexInfo.index.getKeyFieldNames().size(); i++) {
+                    projectionList[projCount++] = i;
+                }
+                IPushRuntime assignOp =
+                        new AssignRuntimeFactory(outColumns, secondaryFieldAccessEvalFactories, projectionList, true)
+                                .createPushRuntime(ctx)[0];
+                deleteOp.setOutputFrameWriter(0, assignOp, primaryIndexInfo.rDesc);
+                assignOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc);
+                SecondaryIndexInfo secondaryIndexInfo = new SecondaryIndexInfo(primaryIndexInfo, secondaryIndex);
+                IIndexDataflowHelperFactory secondaryIndexHelperFactory = new IndexDataflowHelperFactory(
+                        storageComponentProvider.getStorageManager(), secondaryIndexInfo.fileSplitProvider);
+
+                IModificationOperationCallbackFactory secondaryModCallbackFactory =
+                        dataset.getModificationCallbackFactory(storageComponentProvider, secondaryIndex,
+                                IndexOperation.INSERT, primaryKeyIndexes); // NOTE(review): DELETE intended? confirm
+
+                LSMInsertDeleteOperatorNodePushable secondaryInsertOp = new LSMInsertDeleteOperatorNodePushable(ctx,
+                        ctx.getTaskAttemptId().getTaskId().getPartition(), secondaryIndexInfo.insertFieldsPermutations,
+                        secondaryIndexInfo.rDesc, IndexOperation.DELETE, false, secondaryIndexHelperFactory,
+                        secondaryModCallbackFactory, null, null);
+                assignOp.setOutputFrameWriter(0, secondaryInsertOp, secondaryIndexInfo.rDesc);
+
+                IPushRuntime commitOp =
+                        dataset.getCommitRuntimeFactory(mdProvider, secondaryIndexInfo.primaryKeyIndexes, true)
+                                .createPushRuntime(ctx)[0];
+
+                secondaryInsertOp.setOutputFrameWriter(0, commitOp, secondaryIndexInfo.rDesc);
+                commitOp.setInputRecordDescriptor(0, secondaryIndexInfo.rDesc);
+                return Pair.of(deleteOp, commitOp);
+            } else {
+                IPushRuntime commitOp =
+                        dataset.getCommitRuntimeFactory(mdProvider, primaryIndexInfo.primaryKeyIndexes, true)
+                                .createPushRuntime(ctx)[0];
+                deleteOp.setOutputFrameWriter(0, commitOp, primaryIndexInfo.rDesc);
+                commitOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc);
+                return Pair.of(deleteOp, commitOp);
+            }
+        } finally {
+            mdProvider.getLocks().unlock();
+        }
+    }
+
     public IPushRuntime getFullScanPipeline(IFrameWriter countOp, IHyracksTaskContext ctx, Dataset dataset,
             IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType,
             NoMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, int[] filterFields,
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/CheckpointInSecondaryIndexTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/CheckpointInSecondaryIndexTest.java
index 3da845d..54f44b1 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/CheckpointInSecondaryIndexTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/CheckpointInSecondaryIndexTest.java
@@ -35,7 +35,6 @@ import org.apache.asterix.app.nc.NCAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
-import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.storage.IIndexCheckpointManager;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
@@ -54,6 +53,7 @@ import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorNodePushable;
+import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorNodePushable;
 import org.apache.asterix.test.base.TestMethodTracer;
 import org.apache.asterix.test.common.TestHelper;
 import org.apache.commons.lang3.tuple.Pair;
@@ -132,7 +132,7 @@ public class CheckpointInSecondaryIndexTest {
     private static IHyracksTaskContext taskCtx;
     private static IIndexDataflowHelper primaryIndexDataflowHelper;
     private static IIndexDataflowHelper secondaryIndexDataflowHelper;
-    private static LSMInsertDeleteOperatorNodePushable insertOp;
+    private static LSMPrimaryInsertOperatorNodePushable insertOp;
     private static LSMIndexBulkLoadOperatorNodePushable indexLoadOp;
     private static IHyracksTaskContext loadTaskCtx;
     private static SecondaryIndexInfo secondaryIndexInfo;
@@ -190,7 +190,7 @@ public class CheckpointInSecondaryIndexTest {
         primaryIndexDataflowHelper.close();
         // This pipeline skips the secondary index
         insertOp = nc.getInsertPipeline(taskCtx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
-                KEY_INDICATORS_LIST, storageManager, null).getLeft();
+                KEY_INDICATORS_LIST, storageManager, null, null).getLeft();
         actor = new Actor("player");
         // allow all operations
         StorageTestUtils.allowAllOps(primaryLsmBtree);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index eddab22..f72a441 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -30,12 +30,12 @@ import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
 import org.apache.asterix.app.data.gen.RecordTupleGenerator;
 import org.apache.asterix.app.nc.NCAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
-import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
 import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorNodePushable;
 import org.apache.asterix.test.common.TestHelper;
 import org.apache.asterix.test.dataflow.StorageTestUtils.Flusher;
 import org.apache.asterix.test.dataflow.StorageTestUtils.Merger;
@@ -73,7 +73,7 @@ public class ComponentRollbackTest {
     private static IHyracksTaskContext ctx;
     private static IIndexDataflowHelper indexDataflowHelper;
     private static ITransactionContext txnCtx;
-    private static LSMInsertDeleteOperatorNodePushable insertOp;
+    private static LSMPrimaryInsertOperatorNodePushable insertOp;
     private static final int PARTITION = 0;
     private static String indexPath;
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConcurrentInsertTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConcurrentInsertTest.java
new file mode 100644
index 0000000..76dd035
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConcurrentInsertTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.nio.file.Paths;
+import java.util.Arrays;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
+import org.apache.asterix.app.bootstrap.TestNodeController.SecondaryIndexInfo;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
+import org.apache.asterix.app.nc.NCAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.TransactionOptions;
+import org.apache.asterix.external.feed.dataflow.SyncFeedRuntimeInputHandler;
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorNodePushable;
+import org.apache.asterix.test.base.TestMethodTracer;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+
+/**
+ * Two inserters run concurrently to insert data with duplicates (inter/intra-inserter).
+ * Each inserter simulates a feed that removes duplicate tuples upon exception.
+ *
+ */
+public class ConcurrentInsertTest {
+    private static TestNodeController nc;
+    private static TestLsmBtree lsmBtree;
+    private static TestLsmBtree secondaryIndex;
+    private static TestLsmBtree primaryKeyIndex;
+    private static NCAppRuntimeContext ncAppCtx;
+    private static IDatasetLifecycleManager dsLifecycleMgr;
+    private static IIndexDataflowHelper indexDataflowHelper;
+    private static IIndexDataflowHelper secondaryDataflowHelper;
+    private static IIndexDataflowHelper primaryKeyIndexDataflowHelper;
+    private static final int PARTITION = 0;
+    private static LSMPrimaryInsertOperatorNodePushable insertOp1;
+    private static LSMPrimaryInsertOperatorNodePushable insertOp2;
+
+    private static int NUM_INSERT_RECORDS = 1000;
+
+    private static IHyracksTaskContext ctx1;
+    private static ITransactionContext txnCtx1;
+
+    private static IHyracksTaskContext ctx2;
+    private static ITransactionContext txnCtx2;
+
+    @Rule
+    public TestRule watcher = new TestMethodTracer();
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        TestHelper.deleteExistingInstanceFiles();
+        String configPath = Paths.get(System.getProperty("user.dir"), "src", "test", "resources", "cc.conf").toString();
+        nc = new TestNodeController(configPath, false);
+        nc.init();
+        ncAppCtx = nc.getAppRuntimeContext();
+        dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        nc.deInit();
+        TestHelper.deleteExistingInstanceFiles();
+    }
+
+    /**
+     * Per-test fixture. Begins two entity-level transactions (one per task context) on {@link #PARTITION},
+     * creates the primary index, a BTREE secondary index named "TestIndex" and an index named
+     * "PrimaryKeyIndex" with no key fields, keeps a reference to the {@link TestLsmBtree} instance behind
+     * each of them, and finally builds one insert pipeline per task context.
+     */
+    @Before
+    public void createIndex() throws Exception {
+        // first inserter: its own job, task context and transaction
+        JobId jobId = nc.newJobId();
+        ctx1 = nc.createTestContext(jobId, PARTITION, false);
+        txnCtx1 = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx1),
+                new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
+
+        // second inserter: a separate job, task context and transaction
+        JobId newJobId = nc.newJobId();
+        ctx2 = nc.createTestContext(newJobId, PARTITION, false);
+        txnCtx2 = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx2),
+                new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
+
+        // primary index: open the helper only long enough to grab the index instance
+        PrimaryIndexInfo primaryIndexInfo = StorageTestUtils.createPrimaryIndex(nc, PARTITION);
+        IndexDataflowHelperFactory iHelperFactory =
+                new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
+        indexDataflowHelper = iHelperFactory.create(ctx1.getJobletContext().getServiceContext(), PARTITION);
+        indexDataflowHelper.open();
+        lsmBtree = (TestLsmBtree) indexDataflowHelper.getIndexInstance();
+        indexDataflowHelper.close();
+
+        // secondary BTREE index on the record field at position 1 (typed as AINT64)
+        Index secondaryIndexEntity = new Index(StorageTestUtils.DATASET.getDataverseName(),
+                StorageTestUtils.DATASET.getDatasetName(), "TestIndex", IndexType.BTREE,
+                Arrays.asList(Arrays.asList(StorageTestUtils.RECORD_TYPE.getFieldNames()[1])),
+                Arrays.asList(Index.RECORD_INDICATOR), Arrays.asList(BuiltinType.AINT64), false, false, false, 0);
+
+        SecondaryIndexInfo secondaryIndexInfo =
+                nc.createSecondaryIndex(primaryIndexInfo, secondaryIndexEntity, StorageTestUtils.STORAGE_MANAGER, 0);
+        IndexDataflowHelperFactory secondaryIHelperFactory =
+                new IndexDataflowHelperFactory(nc.getStorageManager(), secondaryIndexInfo.getFileSplitProvider());
+        secondaryDataflowHelper =
+                secondaryIHelperFactory.create(ctx1.getJobletContext().getServiceContext(), PARTITION);
+        secondaryDataflowHelper.open();
+        secondaryIndex = (TestLsmBtree) secondaryDataflowHelper.getIndexInstance();
+        secondaryDataflowHelper.close();
+
+        // primary key index: declared with empty key field, indicator and type lists
+        Index primaryKeyIndexEntity = new Index(StorageTestUtils.DATASET.getDataverseName(),
+                StorageTestUtils.DATASET.getDatasetName(), "PrimaryKeyIndex", IndexType.BTREE, Arrays.asList(),
+                Arrays.asList(), Arrays.asList(), false, false, false, 0);
+
+        SecondaryIndexInfo primaryKeyIndexInfo =
+                nc.createSecondaryIndex(primaryIndexInfo, primaryKeyIndexEntity, StorageTestUtils.STORAGE_MANAGER, 0);
+        IndexDataflowHelperFactory primaryKeyIHelperFactory =
+                new IndexDataflowHelperFactory(nc.getStorageManager(), primaryKeyIndexInfo.getFileSplitProvider());
+        primaryKeyIndexDataflowHelper =
+                primaryKeyIHelperFactory.create(ctx1.getJobletContext().getServiceContext(), PARTITION);
+        primaryKeyIndexDataflowHelper.open();
+        // Read the instance from the primary key index helper that was just opened; reading it from the
+        // (already closed) secondary helper would make primaryKeyIndex an alias of the secondary index.
+        primaryKeyIndex = (TestLsmBtree) primaryKeyIndexDataflowHelper.getIndexInstance();
+        primaryKeyIndexDataflowHelper.close();
+
+        // both pipelines maintain the secondary index and are given the primary key index
+        insertOp1 = StorageTestUtils.getInsertPipeline(nc, ctx1, secondaryIndexEntity, primaryKeyIndexEntity);
+        insertOp2 = StorageTestUtils.getInsertPipeline(nc, ctx2, secondaryIndexEntity, primaryKeyIndexEntity);
+    }
+
+    /**
+     * Per-test cleanup: destroys the primary index through its dataflow helper.
+     * NOTE(review): only the primary index helper is destroyed here; the secondary and primary key
+     * index helpers are not touched -- confirm that this is sufficient to clean them up between tests.
+     */
+    @After
+    public void destroyIndex() throws Exception {
+        indexDataflowHelper.destroy();
+    }
+
+    /**
+     * Runs two inserters concurrently, each on its own thread with its own task context and insert
+     * operator, waits for both to finish, commits both transactions and then asserts that a search on
+     * {@link #PARTITION} finds {@code NUM_INSERT_RECORDS / 2} records.
+     * Exceptions raised inside an inserter thread are only printed, not rethrown.
+     * NOTE(review): the expected count presumably relies on the tuple generators producing the same key
+     * sequence, so that every key is offered more than once -- confirm against RecordTupleGenerator.
+     */
+    @Test
+    public void test() throws InterruptedException, HyracksDataException, AlgebricksException {
+        final Runnable firstInserter = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    insertRecords(NUM_INSERT_RECORDS, ctx1, insertOp1);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+        final Runnable secondInserter = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    insertRecords(NUM_INSERT_RECORDS, ctx2, insertOp2);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+
+        final Thread[] inserters = { new Thread(firstInserter), new Thread(secondInserter) };
+        // start both before joining either so that the inserts actually overlap
+        for (Thread inserter : inserters) {
+            inserter.start();
+        }
+        for (Thread inserter : inserters) {
+            inserter.join();
+        }
+
+        nc.getTransactionManager().commitTransaction(txnCtx1.getTxnId());
+        nc.getTransactionManager().commitTransaction(txnCtx2.getTxnId());
+
+        StorageTestUtils.searchAndAssertCount(nc, PARTITION, NUM_INSERT_RECORDS / 2);
+    }
+
+    /**
+     * Feeds {@code numRecords} tuples into the given insert operator. All operations are first allowed on
+     * the primary, primary key and secondary test indexes; tuples are then drawn alternately from two
+     * generators obtained from {@link StorageTestUtils#getTupleGenerator()} and appended to a frame that
+     * is pushed to the operator through a {@link SyncFeedRuntimeInputHandler}.
+     *
+     * @param numRecords number of tuples to generate
+     * @param ctx        task context used to allocate the frame and to build the input handler
+     * @param insertOp   insert operator to drive; it is opened at the start and closed at the end
+     * @return the last tuple generated, or {@code null} if the loop body never ran
+     */
+    private ITupleReference insertRecords(int numRecords, IHyracksTaskContext ctx,
+            LSMPrimaryInsertOperatorNodePushable insertOp) throws Exception {
+        StorageTestUtils.allowAllOps(lsmBtree);
+        StorageTestUtils.allowAllOps(primaryKeyIndex);
+        StorageTestUtils.allowAllOps(secondaryIndex);
+        insertOp.open();
+        FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
+        // even iterations draw from the first generator, odd iterations from the second
+        RecordTupleGenerator[] generators =
+                { StorageTestUtils.getTupleGenerator(), StorageTestUtils.getTupleGenerator() };
+        int fieldCount = StorageTestUtils.RECORD_TYPE.getFieldTypes().length;
+        RecordDescriptor recordDescriptor = new RecordDescriptor(new ISerializerDeserializer[fieldCount]);
+        FrameTupleAccessor accessor = new FrameTupleAccessor(recordDescriptor);
+        SyncFeedRuntimeInputHandler handler = new SyncFeedRuntimeInputHandler(ctx, insertOp, accessor);
+        ITupleReference lastTuple = null;
+        for (int i = 0; i < numRecords; i++) {
+            lastTuple = generators[i % 2].next();
+            DataflowUtils.addTupleToFrame(appender, lastTuple, handler);
+        }
+        // push whatever is still buffered in the partially filled frame
+        if (appender.getTupleCount() > 0) {
+            appender.write(handler, true);
+        }
+        insertOp.close();
+        return lastTuple;
+    }
+
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java
index b618727..c167b0f1 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java
@@ -25,11 +25,11 @@ import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
 import org.apache.asterix.app.data.gen.RecordTupleGenerator;
 import org.apache.asterix.app.nc.NCAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
-import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorNodePushable;
 import org.apache.asterix.test.common.TestHelper;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -81,7 +81,7 @@ public class IoCallbackFailureTest {
         indexDataflowHelper.open();
         TestLsmBtree lsmBtree = (TestLsmBtree) indexDataflowHelper.getIndexInstance();
         indexDataflowHelper.close();
-        LSMInsertDeleteOperatorNodePushable insertOp = StorageTestUtils.getInsertPipeline(nc, ctx);
+        LSMPrimaryInsertOperatorNodePushable insertOp = StorageTestUtils.getInsertPipeline(nc, ctx);
         StorageTestUtils.allowAllOps(lsmBtree);
         ITestOpCallback<ILSMMemoryComponent> failCallback = new ITestOpCallback<ILSMMemoryComponent>() {
             @SuppressWarnings("deprecation")
@@ -114,7 +114,7 @@ public class IoCallbackFailureTest {
     }
 
     private static void insert(TestNodeController nc, TestLsmBtree lsmBtree, IHyracksTaskContext ctx,
-            LSMInsertDeleteOperatorNodePushable insertOp, int totalNumRecords, int recordsPerComponent)
+            LSMPrimaryInsertOperatorNodePushable insertOp, int totalNumRecords, int recordsPerComponent)
             throws Exception {
         NCAppRuntimeContext ncAppCtx = nc.getAppRuntimeContext();
         IDatasetLifecycleManager dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
index 5ff918e..03ca1f0 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
@@ -35,7 +35,6 @@ import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.config.StorageProperties.Option;
 import org.apache.asterix.common.context.DatasetInfo;
-import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
@@ -45,6 +44,7 @@ import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorNodePushable;
 import org.apache.asterix.test.common.TestHelper;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -91,7 +91,7 @@ public class LSMFlushRecoveryTest {
     private static IIndexDataflowHelper[] primaryIndexDataflowHelpers;
     private static IIndexDataflowHelper[] secondaryIndexDataflowHelpers;
     private static ITransactionContext txnCtx;
-    private static LSMInsertDeleteOperatorNodePushable[] insertOps;
+    private static LSMPrimaryInsertOperatorNodePushable[] insertOps;
     private static RecordTupleGenerator tupleGenerator;
 
     private static final int NUM_PARTITIONS = 2;
@@ -231,7 +231,7 @@ public class LSMFlushRecoveryTest {
     }
 
     private void createInsertOps() throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
-        insertOps = new LSMInsertDeleteOperatorNodePushable[NUM_PARTITIONS];
+        insertOps = new LSMPrimaryInsertOperatorNodePushable[NUM_PARTITIONS];
         for (int i = 0; i < NUM_PARTITIONS; i++) {
             insertOps[i] = StorageTestUtils.getInsertPipeline(nc, testCtxs[i], secondaryIndexEntity);
         }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
index 83eba24..0de0539 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -29,7 +29,6 @@ import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
 import org.apache.asterix.app.data.gen.RecordTupleGenerator;
 import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction;
 import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter;
-import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
 import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
@@ -40,6 +39,7 @@ import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorNodePushable;
 import org.apache.asterix.test.common.TestHelper;
 import org.apache.asterix.transaction.management.service.logging.LogReader;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
@@ -116,9 +116,9 @@ public class LogMarkerTest {
                 IHyracksTaskContext ctx = nc.createTestContext(jobId, 0, true);
                 ITransactionContext txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
                         new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
-                LSMInsertDeleteOperatorNodePushable insertOp =
+                LSMPrimaryInsertOperatorNodePushable insertOp =
                         nc.getInsertPipeline(ctx, StorageTestUtils.DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
-                                KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, null).getLeft();
+                                KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, null, null).getLeft();
                 insertOp.open();
                 RecordTupleGenerator tupleGenerator =
                         new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
index a7225a1..a1b251f 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
@@ -35,7 +35,6 @@ import org.apache.asterix.app.nc.NCAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
-import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.TransactionOptions;
@@ -48,6 +47,7 @@ import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningS
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorNodePushable;
 import org.apache.asterix.test.common.TestHelper;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -121,7 +121,7 @@ public class MultiPartitionLSMIndexTest {
     private static IHyracksTaskContext[] taskCtxs;
     private static IIndexDataflowHelper[] primaryIndexDataflowHelpers;
     private static IIndexDataflowHelper[] secondaryIndexDataflowHelpers;
-    private static LSMInsertDeleteOperatorNodePushable[] insertOps;
+    private static LSMPrimaryInsertOperatorNodePushable[] insertOps;
     private static Actor[] actors;
 
     @BeforeClass
@@ -158,7 +158,7 @@ public class MultiPartitionLSMIndexTest {
         secondaryIndexDataflowHelpers = new IIndexDataflowHelper[NUM_PARTITIONS];
         primaryLsmBtrees = new TestLsmBtree[NUM_PARTITIONS];
         secondaryLsmBtrees = new TestLsmBtree[NUM_PARTITIONS];
-        insertOps = new LSMInsertDeleteOperatorNodePushable[NUM_PARTITIONS];
+        insertOps = new LSMPrimaryInsertOperatorNodePushable[NUM_PARTITIONS];
         JobId jobId = nc.newJobId();
         txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(jobId),
                 new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
@@ -184,7 +184,7 @@ public class MultiPartitionLSMIndexTest {
             secondaryIndexDataflowHelpers[i].close();
             primaryIndexDataflowHelpers[i].close();
             insertOps[i] = nc.getInsertPipeline(taskCtxs[i], dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
-                    KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, secondaryIndex).getLeft();
+                    KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, secondaryIndex, null).getLeft();
             actors[i] = new Actor("player-" + i, i);
         }
         // allow all operations
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
index d4672b7..37b40bb 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
@@ -31,7 +31,6 @@ import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter;
 import org.apache.asterix.app.nc.NCAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.TransactionOptions;
@@ -44,6 +43,7 @@ import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningS
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorNodePushable;
 import org.apache.asterix.test.common.TestHelper;
 import org.apache.asterix.test.dataflow.StorageTestUtils.Searcher;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -100,7 +100,7 @@ public class SearchCursorComponentSwitchTest {
     private static IHyracksTaskContext ctx;
     private static IIndexDataflowHelper indexDataflowHelper;
     private static ITransactionContext txnCtx;
-    private static LSMInsertDeleteOperatorNodePushable insertOp;
+    private static LSMPrimaryInsertOperatorNodePushable insertOp;
 
     @BeforeClass
     public static void setUp() throws Exception {
@@ -140,7 +140,7 @@ public class SearchCursorComponentSwitchTest {
         txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
                 new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
         insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
-                KEY_INDICATORS_LIST, storageManager, null).getLeft();
+                KEY_INDICATORS_LIST, storageManager, null, null).getLeft();
     }
 
     @After
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
index 589e8b2..99aee07 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
@@ -50,6 +50,7 @@ import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningS
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorNodePushable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -59,7 +60,6 @@ import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.test.CountAnswer;
 import org.apache.hyracks.api.test.FrameWriterTestUtils;
 import org.apache.hyracks.api.test.FrameWriterTestUtils.FrameWriterOperation;
-import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.btree.impl.AllowTestOpCallback;
 import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback;
 import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
@@ -124,35 +124,41 @@ public class StorageTestUtils {
                 KEY_INDICATORS_LIST, partition);
     }
 
-    public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx)
+    public static LSMPrimaryInsertOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx)
             throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
         return getInsertPipeline(nc, ctx, null);
     }
 
-    public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx,
-            Index secondaryIndex, IndexOperation op)
+    public static LSMPrimaryInsertOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx,
+            Index secondaryIndex) throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
+        return nc.getInsertPipeline(ctx, DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
+                KEY_INDICATORS_LIST, STORAGE_MANAGER, secondaryIndex, null).getLeft();
+    }
+
+    public static LSMPrimaryInsertOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx,
+            Index secondaryIndex, Index primaryKeyIndex)
             throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
         return nc.getInsertPipeline(ctx, DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
-                KEY_INDICATORS_LIST, STORAGE_MANAGER, secondaryIndex, op).getLeft();
+                KEY_INDICATORS_LIST, STORAGE_MANAGER, secondaryIndex, primaryKeyIndex).getLeft();
     }
 
-    public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx,
-            Dataset dataset, Index secondaryIndex, IndexOperation op)
+    public static LSMPrimaryInsertOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx,
+            Dataset dataset, Index secondaryIndex)
             throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
         return nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
-                KEY_INDICATORS_LIST, STORAGE_MANAGER, secondaryIndex, op).getLeft();
+                KEY_INDICATORS_LIST, STORAGE_MANAGER, secondaryIndex, null).getLeft();
     }
 
-    public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx,
+    public static LSMInsertDeleteOperatorNodePushable getDeletePipeline(TestNodeController nc, IHyracksTaskContext ctx,
             Index secondaryIndex) throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
-        return nc.getInsertPipeline(ctx, DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
+        return nc.getDeletePipeline(ctx, DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
                 KEY_INDICATORS_LIST, STORAGE_MANAGER, secondaryIndex).getLeft();
     }
 
-    public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx,
+    public static LSMInsertDeleteOperatorNodePushable getDeletePipeline(TestNodeController nc, IHyracksTaskContext ctx,
             Dataset dataset, Index secondaryIndex)
             throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
-        return nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
+        return nc.getDeletePipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
                 KEY_INDICATORS_LIST, STORAGE_MANAGER, secondaryIndex).getLeft();
     }
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TransactionAbortTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TransactionAbortTest.java
index bee2f8d..bd84d54 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TransactionAbortTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TransactionAbortTest.java
@@ -31,6 +31,7 @@ import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorNodePushable;
 import org.apache.asterix.test.base.TestMethodTracer;
 import org.apache.asterix.test.common.TestHelper;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -41,7 +42,6 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback;
 import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
 import org.junit.After;
@@ -61,7 +61,7 @@ public class TransactionAbortTest {
     private static IHyracksTaskContext ctx;
     private static IIndexDataflowHelper indexDataflowHelper;
     private static final int PARTITION = 0;
-    private static LSMInsertDeleteOperatorNodePushable insertOp;
+    private static LSMPrimaryInsertOperatorNodePushable insertOp;
     private static int NUM_INSERT_RECORDS = 1000;
     private static ITransactionContext txnCtx;
 
@@ -121,7 +121,7 @@ public class TransactionAbortTest {
         Assert.assertEquals(1, lsmBtree.getDiskComponents().size());
         StorageTestUtils.searchAndAssertCount(nc, PARTITION, NUM_INSERT_RECORDS);
 
-        abortOp = StorageTestUtils.getInsertPipeline(nc, abortCtx, null, IndexOperation.DELETE);
+        abortOp = StorageTestUtils.getDeletePipeline(nc, abortCtx, null);
         testAbort(lastTuple);
         StorageTestUtils.searchAndAssertCount(nc, PARTITION, NUM_INSERT_RECORDS);
     }
@@ -132,7 +132,7 @@ public class TransactionAbortTest {
         Assert.assertEquals(0, lsmBtree.getDiskComponents().size());
         StorageTestUtils.searchAndAssertCount(nc, PARTITION, NUM_INSERT_RECORDS);
 
-        abortOp = StorageTestUtils.getInsertPipeline(nc, abortCtx, null, IndexOperation.DELETE);
+        abortOp = StorageTestUtils.getDeletePipeline(nc, abortCtx, null);
         testAbort(lastTuple);
         StorageTestUtils.searchAndAssertCount(nc, PARTITION, NUM_INSERT_RECORDS);
     }
@@ -143,7 +143,7 @@ public class TransactionAbortTest {
         Assert.assertEquals(0, lsmBtree.getDiskComponents().size());
         StorageTestUtils.searchAndAssertCount(nc, PARTITION, NUM_INSERT_RECORDS);
 
-        abortOp = StorageTestUtils.getInsertPipeline(nc, abortCtx, null, IndexOperation.INSERT);
+        abortOp = StorageTestUtils.getDeletePipeline(nc, abortCtx, null);
         testAbort(tupleGenerator.next());
         StorageTestUtils.searchAndAssertCount(nc, PARTITION, NUM_INSERT_RECORDS);
     }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index d5d34b2..2a3e9b8 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -33,7 +33,6 @@ import org.apache.asterix.app.bootstrap.TestNodeController;
 import org.apache.asterix.app.data.gen.RecordTupleGenerator;
 import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction;
 import org.apache.asterix.app.nc.RecoveryManager;
-import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.Checkpoint;
 import org.apache.asterix.common.transactions.ICheckpointManager;
@@ -50,6 +49,7 @@ import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorNodePushable;
 import org.apache.asterix.test.common.TestHelper;
 import org.apache.asterix.test.dataflow.StorageTestUtils;
 import org.apache.asterix.transaction.management.service.logging.LogManager;
@@ -119,9 +119,9 @@ public class CheckpointingTest {
                 ITransactionContext txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
                         new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
                 // Prepare insert operation
-                LSMInsertDeleteOperatorNodePushable insertOp =
+                LSMPrimaryInsertOperatorNodePushable insertOp =
                         nc.getInsertPipeline(ctx, StorageTestUtils.DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
-                                KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null).getLeft();
+                                KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null, null).getLeft();
                 insertOp.open();
                 RecordTupleGenerator tupleGenerator =
                         new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR,
@@ -190,9 +190,9 @@ public class CheckpointingTest {
                 nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx2),
                         new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
                 // Prepare insert operation
-                LSMInsertDeleteOperatorNodePushable insertOp2 =
+                LSMPrimaryInsertOperatorNodePushable insertOp2 =
                         nc.getInsertPipeline(ctx2, StorageTestUtils.DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
-                                KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null).getLeft();
+                                KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null, null).getLeft();
                 insertOp2.open();
                 VSizeFrame frame2 = new VSizeFrame(ctx2);
                 FrameTupleAppender tupleAppender2 = new FrameTupleAppender(frame2);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/primary-key-index/delete-primary-key-index-with-secondary.sqlpp
similarity index 63%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/optimizerts/queries/primary-key-index/delete-primary-key-index-with-secondary.sqlpp
index eef86a3..04c05ff 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/primary-key-index/delete-primary-key-index-with-secondary.sqlpp
@@ -16,25 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+/*
+ * Description     : Testing delete on a dataset with a primary key index and a secondary index
+ * Expected Result : Success
+ * Date            : March 3rd 2018
+ */
 
-drop  dataverse STBench if exists;
-create  dataverse STBench;
-
-use STBench;
+drop  dataverse test if exists;
+create  dataverse test;
 
+use test;
 
-create type STBench.SimpleGeoPlaceType as
+create type test.DBLPType as
  closed {
-  coordinates : point,
-  id : bigint,
-  name : string,
-  tags : string,
-  categories : string,
-  phone : string
+  id : int64,
+  dblpid : string,
+  title : string,
+  authors : string,
+  misc : string
 };
 
-create  dataset SimpleGeoPlace(SimpleGeoPlaceType) primary key id;
-
-create  index btreeName  on SimpleGeoPlace (name) type btree;
+create  dataset DBLP(DBLPType) primary key id;
+create primary index pkIndex on DBLP;
+create index titleIndex on DBLP(title) type btree;
 
-create  primary index sec_primary_idx  on SimpleGeoPlace;
\ No newline at end of file
+delete from DBLP
+where id>10;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/primary-key-index/insert-and-scan-primary-key-index-with-secondary.sqlpp
similarity index 59%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/optimizerts/queries/primary-key-index/insert-and-scan-primary-key-index-with-secondary.sqlpp
index eef86a3..e526de0 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/primary-key-index/insert-and-scan-primary-key-index-with-secondary.sqlpp
@@ -16,25 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+/*
+ * Test case Name  : insert-and-scan-primary-key-index-with-secondary.sqlpp
+ * Description     : This test is intended to test inserting into a dataset with primary key index.
+ * Expected Result : Success
+ * Date            : March 3 2018
+ */
 
-drop  dataverse STBench if exists;
-create  dataverse STBench;
-
-use STBench;
+drop  dataverse test if exists;
+create  dataverse test;
 
+use test;
 
-create type STBench.SimpleGeoPlaceType as
- closed {
-  coordinates : point,
-  id : bigint,
-  name : string,
-  tags : string,
-  categories : string,
-  phone : string
+create type test.myDataType as
+{
+  id : integer,
+  name: string
 };
 
-create  dataset SimpleGeoPlace(SimpleGeoPlaceType) primary key id;
-
-create  index btreeName  on SimpleGeoPlace (name) type btree;
+create dataset myData(myDataType) primary key id;
+create primary index pkIndex on myData;
+create index nameIndex on myData(name) type btree;
 
-create  primary index sec_primary_idx  on SimpleGeoPlace;
\ No newline at end of file
+insert into myData
+select element {'id':(x.id + 1)}
+from  myData as x
+;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/primary-key-index/insert-primary-key-index-with-auto-gen-pk.sqlpp
similarity index 55%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/optimizerts/queries/primary-key-index/insert-primary-key-index-with-auto-gen-pk.sqlpp
index eef86a3..3c1fc0c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/primary-key-index/insert-primary-key-index-with-auto-gen-pk.sqlpp
@@ -16,25 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+/*
+ * Description     : Testing insert primary key index
+ * Expected Result : Success
+ * Date            : March 3rd 2018
+ */
 
-drop  dataverse STBench if exists;
-create  dataverse STBench;
-
-use STBench;
+drop  dataverse test if exists;
+create  dataverse test;
 
+use test;
 
-create type STBench.SimpleGeoPlaceType as
+create type test.DBLPType as
  closed {
-  coordinates : point,
-  id : bigint,
-  name : string,
-  tags : string,
-  categories : string,
-  phone : string
+  id : uuid,
+  dblpid : string,
+  title : string,
+  authors : string,
+  misc : string
 };
 
-create  dataset SimpleGeoPlace(SimpleGeoPlaceType) primary key id;
+create  dataset DBLP(DBLPType) primary key id autogenerated;
 
-create  index btreeName  on SimpleGeoPlace (name) type btree;
+create primary index pkIndex on DBLP;
 
-create  primary index sec_primary_idx  on SimpleGeoPlace;
\ No newline at end of file
+insert into DBLP
+select element {'dblpid':'books/acm/kim95/Blakeley95','title':'OQL[C++]  Extending C++ with an Object Query Capability.','authors':'José A. Blakeley','misc':'2002-01-03 69-88 Modern Database Systems db/books/collections/kim95.html#Blakeley95 1995'};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/primary-key-index/insert-primary-key-index.sqlpp
similarity index 55%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/optimizerts/queries/primary-key-index/insert-primary-key-index.sqlpp
index eef86a3..197dedc 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/primary-key-index/insert-primary-key-index.sqlpp
@@ -16,25 +16,28 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+/*
+ * Description     : Testing insert primary key index
+ * Expected Result : Success
+ * Date            : March 3rd 2018
+ */
 
-drop  dataverse STBench if exists;
-create  dataverse STBench;
-
-use STBench;
+drop  dataverse test if exists;
+create  dataverse test;
 
+use test;
 
-create type STBench.SimpleGeoPlaceType as
+create type test.DBLPType as
  closed {
-  coordinates : point,
-  id : bigint,
-  name : string,
-  tags : string,
-  categories : string,
-  phone : string
+  id : int64,
+  dblpid : string,
+  title : string,
+  authors : string,
+  misc : string
 };
 
-create  dataset SimpleGeoPlace(SimpleGeoPlaceType) primary key id;
-
-create  index btreeName  on SimpleGeoPlace (name) type btree;
+create  dataset DBLP(DBLPType) primary key id;
+create primary index pkIndex on DBLP;
 
-create  primary index sec_primary_idx  on SimpleGeoPlace;
\ No newline at end of file
+insert into DBLP
+select element {'id':1, 'dblpid':'books/acm/kim95/Blakeley95','title':'OQL[C++]  Extending C++ with an Object Query Capability.','authors':'José A. Blakeley','misc':'2002-01-03 69-88 Modern Database Systems db/books/collections/kim95.html#Blakeley95 1995'};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/primary-key-index/load-primary-key-index-with-secondary.sqlpp
similarity index 54%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/optimizerts/queries/primary-key-index/load-primary-key-index-with-secondary.sqlpp
index eef86a3..3fc468e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/primary-key-index/load-primary-key-index-with-secondary.sqlpp
@@ -16,25 +16,34 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+/*
+ * Description  : Load dataset with primary key index
+ * Expected Res : Success
+ * Date         : March 3rd 2018
+ */
 
-drop  dataverse STBench if exists;
-create  dataverse STBench;
-
-use STBench;
-
+drop dataverse test if exists;
+create dataverse test;
+use test;
 
-create type STBench.SimpleGeoPlaceType as
- closed {
-  coordinates : point,
-  id : bigint,
-  name : string,
-  tags : string,
-  categories : string,
-  phone : string
+create type MyRecord as closed {
+  id: int32,
+  point: point,
+  kwds: string,
+  line1: line,
+  line2: line,
+  poly1: polygon,
+  poly2: polygon,
+  rec: rectangle,
+  circle: circle
 };
 
-create  dataset SimpleGeoPlace(SimpleGeoPlaceType) primary key id;
 
-create  index btreeName  on SimpleGeoPlace (name) type btree;
+create dataset MyData(MyRecord) primary key id;
+create primary index pkIndex on MyData;
+create index rtree_index_point on MyData(point) type rtree;
+create index kwds_index on MyData(kwds) type btree;
 
-create  primary index sec_primary_idx  on SimpleGeoPlace;
\ No newline at end of file
+load dataset MyData
+using localfs
+(("path"="asterix_nc1://data/spatial/spatialData.json"),("format"="adm"));
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/primary-key-index/load-primary-key-index.sqlpp
similarity index 59%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/optimizerts/queries/primary-key-index/load-primary-key-index.sqlpp
index eef86a3..d710003 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/primary-key-index/load-primary-key-index.sqlpp
@@ -16,25 +16,32 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+/*
+ * Description  : Load dataset with primary key index
+ * Expected Res : Success
+ * Date         : March 3rd 2018
+ */
 
-drop  dataverse STBench if exists;
-create  dataverse STBench;
-
-use STBench;
-
+drop dataverse test if exists;
+create dataverse test;
+use test;
 
-create type STBench.SimpleGeoPlaceType as
- closed {
-  coordinates : point,
-  id : bigint,
-  name : string,
-  tags : string,
-  categories : string,
-  phone : string
+create type MyRecord as closed {
+  id: int32,
+  point: point,
+  kwds: string,
+  line1: line,
+  line2: line,
+  poly1: polygon,
+  poly2: polygon,
+  rec: rectangle,
+  circle: circle
 };
 
-create  dataset SimpleGeoPlace(SimpleGeoPlaceType) primary key id;
 
-create  index btreeName  on SimpleGeoPlace (name) type btree;
+create dataset MyData(MyRecord) primary key id;
+create primary index pkIndex on MyData;
 
-create  primary index sec_primary_idx  on SimpleGeoPlace;
\ No newline at end of file
+load dataset MyData
+using localfs
+(("path"="asterix_nc1://data/spatial/spatialData.json"),("format"="adm"));
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/primary-key-index/upsert-primary-key-index-with-secondary.sqlpp
similarity index 53%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/optimizerts/queries/primary-key-index/upsert-primary-key-index-with-secondary.sqlpp
index eef86a3..9b4cdb7 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/primary-key-index/upsert-primary-key-index-with-secondary.sqlpp
@@ -16,25 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+/*
+ * Description     : Testing upsert primary key index
+ * Expected Result : Success
+ * Date            : March 3rd 2018
+ */
 
-drop  dataverse STBench if exists;
-create  dataverse STBench;
-
-use STBench;
+drop  dataverse test if exists;
+create  dataverse test;
 
+use test;
 
-create type STBench.SimpleGeoPlaceType as
+create type test.DBLPType as
  closed {
-  coordinates : point,
-  id : bigint,
-  name : string,
-  tags : string,
-  categories : string,
-  phone : string
+  id : int64,
+  dblpid : string,
+  title : string,
+  authors : string,
+  misc : string
 };
 
-create  dataset SimpleGeoPlace(SimpleGeoPlaceType) primary key id;
-
-create  index btreeName  on SimpleGeoPlace (name) type btree;
+create  dataset DBLP(DBLPType) primary key id;
+create primary index pkIndex on DBLP;
+create index titleIndex on DBLP(title) type btree;
 
-create  primary index sec_primary_idx  on SimpleGeoPlace;
\ No newline at end of file
+upsert into DBLP
+select element {'id':1, 'dblpid':'books/acm/kim95/Blakeley95','title':'OQL[C++]  Extending C++ with an Object Query Capability.','authors':'José A. Blakeley','misc':'2002-01-03 69-88 Modern Database Systems db/books/collections/kim95.html#Blakeley95 1995'};
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/primary-key-index/delete-primary-key-index-with-secondary.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/primary-key-index/delete-primary-key-index-with-secondary.plan
new file mode 100644
index 0000000..acd2ad8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/primary-key-index/delete-primary-key-index-with-secondary.plan
@@ -0,0 +1,20 @@
+-- COMMIT  |PARTITIONED|
+  -- STREAM_PROJECT  |PARTITIONED|
+    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+      -- INDEX_INSERT_DELETE  |PARTITIONED|
+        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+          -- INDEX_INSERT_DELETE  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- STREAM_PROJECT  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- INSERT_DELETE  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- MATERIALIZE  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- ASSIGN  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- BTREE_SEARCH  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- ASSIGN  |PARTITIONED|
+                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/primary-key-index/insert-and-scan-primary-key-index-with-secondary.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/primary-key-index/insert-and-scan-primary-key-index-with-secondary.plan
new file mode 100644
index 0000000..bdc8377
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/primary-key-index/insert-and-scan-primary-key-index-with-secondary.plan
@@ -0,0 +1,19 @@
+-- COMMIT  |PARTITIONED|
+  -- STREAM_PROJECT  |PARTITIONED|
+    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+      -- INDEX_INSERT_DELETE  |PARTITIONED|
+        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+          -- STREAM_PROJECT  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- INSERT_DELETE  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- MATERIALIZE  |PARTITIONED|
+                    -- HASH_PARTITION_EXCHANGE [$$15]  |PARTITIONED|
+                      -- ASSIGN  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ASSIGN  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/primary-key-index/insert-primary-key-index-with-auto-gen-pk.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/primary-key-index/insert-primary-key-index-with-auto-gen-pk.plan
new file mode 100644
index 0000000..792b79d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/primary-key-index/insert-primary-key-index-with-auto-gen-pk.plan
@@ -0,0 +1,12 @@
+-- COMMIT  |PARTITIONED|
+  -- STREAM_PROJECT  |PARTITIONED|
+    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+      -- INSERT_DELETE  |PARTITIONED|
+        -- HASH_PARTITION_EXCHANGE [$$3]  |PARTITIONED|
+          -- ASSIGN  |UNPARTITIONED|
+            -- STREAM_PROJECT  |UNPARTITIONED|
+              -- ASSIGN  |UNPARTITIONED|
+                -- STREAM_PROJECT  |UNPARTITIONED|
+                  -- ASSIGN  |UNPARTITIONED|
+                    -- ASSIGN  |UNPARTITIONED|
+                      -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/primary-key-index/insert-primary-key-index.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/primary-key-index/insert-primary-key-index.plan
new file mode 100644
index 0000000..2c493d3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/primary-key-index/insert-primary-key-index.plan
@@ -0,0 +1,8 @@
+-- COMMIT  |PARTITIONED|
+  -- STREAM_PROJECT  |PARTITIONED|
+    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+      -- INSERT_DELETE  |PARTITIONED|
+        -- HASH_PARTITION_EXCHANGE [$$3]  |PARTITIONED|
+          -- ASSIGN  |UNPARTITIONED|
+            -- ASSIGN  |UNPARTITIONED|
+              -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/primary-key-index/load-primary-key-index-with-secondary.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/primary-key-index/load-primary-key-index-with-secondary.plan
new file mode 100644
index 0000000..abb5959
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/primary-key-index/load-primary-key-index-with-secondary.plan
@@ -0,0 +1,67 @@
+-- SINK  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+        -- INDEX_BULKLOAD  |PARTITIONED|
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            -- STABLE_SORT [$$3(ASC), $$2(ASC)]  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- REPLICATE  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- BULKLOAD  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STABLE_SORT [$$2(ASC)]  |PARTITIONED|
+                                  -- HASH_PARTITION_EXCHANGE [$$2]  |PARTITIONED|
+                                    -- ASSIGN  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- DATASOURCE_SCAN  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+        -- INDEX_BULKLOAD  |PARTITIONED|
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- REPLICATE  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- BULKLOAD  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- STABLE_SORT [$$2(ASC)]  |PARTITIONED|
+                              -- HASH_PARTITION_EXCHANGE [$$2]  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+        -- INDEX_BULKLOAD  |PARTITIONED|
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            -- STABLE_SORT [$$5(ASC), $$6(ASC), $$2(ASC)]  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- ASSIGN  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- REPLICATE  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- BULKLOAD  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- STABLE_SORT [$$2(ASC)]  |PARTITIONED|
+                                      -- HASH_PARTITION_EXCHANGE [$$2]  |PARTITIONED|
+                                        -- ASSIGN  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- DATASOURCE_SCAN  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/primary-key-index/load-primary-key-index.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/primary-key-index/load-primary-key-index.plan
new file mode 100644
index 0000000..0da5464
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/primary-key-index/load-primary-key-index.plan
@@ -0,0 +1,17 @@
+-- SINK  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+        -- INDEX_BULKLOAD  |PARTITIONED|
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- BULKLOAD  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- STABLE_SORT [$$2(ASC)]  |PARTITIONED|
+                      -- HASH_PARTITION_EXCHANGE [$$2]  |PARTITIONED|
+                        -- ASSIGN  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- DATASOURCE_SCAN  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/primary-key-index/upsert-primary-key-index-with-secondary.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/primary-key-index/upsert-primary-key-index-with-secondary.plan
new file mode 100644
index 0000000..9ad7e30
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/primary-key-index/upsert-primary-key-index-with-secondary.plan
@@ -0,0 +1,16 @@
+-- COMMIT  |PARTITIONED|
+  -- STREAM_PROJECT  |PARTITIONED|
+    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+      -- INDEX_INSERT_DELETE  |PARTITIONED|
+        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+          -- INDEX_INSERT_DELETE  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- STREAM_PROJECT  |PARTITIONED|
+                -- ASSIGN  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- INSERT_DELETE  |PARTITIONED|
+                        -- HASH_PARTITION_EXCHANGE [$$3]  |PARTITIONED|
+                          -- ASSIGN  |UNPARTITIONED|
+                            -- ASSIGN  |UNPARTITIONED|
+                              -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys-with-pk-index/insert-duplicated-keys.1.ddl.sqlpp
similarity index 100%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys-with-pk-index/insert-duplicated-keys.1.ddl.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys-with-pk-index/insert-duplicated-keys.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys-with-pk-index/insert-duplicated-keys.2.update.sqlpp
new file mode 100644
index 0000000..a2bc508
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys-with-pk-index/insert-duplicated-keys.2.update.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use STBench;
+
+
+insert into SimpleGeoPlace
+select element {'coordinates':STBench.point('-2.423658,53.0842802'),'id':5,'name':'20:20 Mobile','tags':'mobile','categories':'Professional Services Computer Services','phone':''};
+insert into SimpleGeoPlace
+select element {'coordinates':STBench.point('-2.423658,53.0842802'),'id':5,'name':'20:20 Mobile','tags':'mobile','categories':'Professional Services Computer Services','phone':''};
+insert into SimpleGeoPlace
+select element {'coordinates':STBench.point('-2.423658,53.0842802'),'id':5,'name':'20:20 Mobile','tags':'mobile','categories':'Professional Services Computer Services','phone':''};
+insert into SimpleGeoPlace
+select element {'coordinates':STBench.point('-2.423658,53.0842802'),'id':5,'name':'20:20 Mobile','tags':'mobile','categories':'Professional Services Computer Services','phone':''};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys-with-pk-index/insert-duplicated-keys.3.ddl.sqlpp
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys-with-pk-index/insert-duplicated-keys.3.ddl.sqlpp
index eef86a3..9f0b36a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys-with-pk-index/insert-duplicated-keys.3.ddl.sqlpp
@@ -16,25 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+/* this should successfully drop STBench dataverse */
 
-drop  dataverse STBench if exists;
-create  dataverse STBench;
-
-use STBench;
-
-
-create type STBench.SimpleGeoPlaceType as
- closed {
-  coordinates : point,
-  id : bigint,
-  name : string,
-  tags : string,
-  categories : string,
-  phone : string
-};
-
-create  dataset SimpleGeoPlace(SimpleGeoPlaceType) primary key id;
-
-create  index btreeName  on SimpleGeoPlace (name) type btree;
-
-create  primary index sec_primary_idx  on SimpleGeoPlace;
\ No newline at end of file
+drop  dataverse STBench;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
index eef86a3..91ff568 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
@@ -36,5 +36,3 @@ create type STBench.SimpleGeoPlaceType as
 create  dataset SimpleGeoPlace(SimpleGeoPlaceType) primary key id;
 
 create  index btreeName  on SimpleGeoPlace (name) type btree;
-
-create  primary index sec_primary_idx  on SimpleGeoPlace;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/insert-feed-with-pk-index/insert-feed-with-pk-index.1.ddl.sqlpp
similarity index 50%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/insert-feed-with-pk-index/insert-feed-with-pk-index.1.ddl.sqlpp
index eef86a3..5b6d0fb 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/insert-feed-with-pk-index/insert-feed-with-pk-index.1.ddl.sqlpp
@@ -16,25 +16,33 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+/*
+ * Description  : Create a feed with insert option. Push record twice
+ * with minor changes. The updated record will not cause duplicate
+ * key exception and the data will be updated.
+ * Expected Res : Success
+ * Date         : 13th Aug 2016
+ */
+drop dataverse experiments if exists;
+create dataverse experiments;
+use experiments;
 
-drop  dataverse STBench if exists;
-create  dataverse STBench;
-
-use STBench;
-
-
-create type STBench.SimpleGeoPlaceType as
- closed {
-  coordinates : point,
-  id : bigint,
-  name : string,
-  tags : string,
-  categories : string,
-  phone : string
+create type TwitterUser if not exists as open{
+    `screen-name`: string,
+    friends_count: int32,
+    name: string,
+    followers_count: int32
 };
 
-create  dataset SimpleGeoPlace(SimpleGeoPlaceType) primary key id;
+create dataset TwitterUsers(TwitterUser) primary key `screen-name`;
 
-create  index btreeName  on SimpleGeoPlace (name) type btree;
+create feed UserFeed with {
+ "adapter-name" : "socket_adapter",
+  "sockets" : "127.0.0.1:10001",
+  "address-type" : "IP",
+  "type-name" : "TwitterUser",
+  "format" : "adm",
+  "insert-feed" : "true"
+};
 
-create  primary index sec_primary_idx  on SimpleGeoPlace;
\ No newline at end of file
+create primary index pk_index on TwitterUsers;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/insert-feed-with-pk-index/insert-feed-with-pk-index.2.update.sqlpp
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/insert-feed-with-pk-index/insert-feed-with-pk-index.2.update.sqlpp
index eef86a3..3bca1a2 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/insert-feed-with-pk-index/insert-feed-with-pk-index.2.update.sqlpp
@@ -16,25 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+/*
+ * Description  : Create a feed with insert option on a dataset with a
+ * primary key index. Push record twice with minor changes. The result
+ * must match the expected output of the insert-feed test.
+ * Expected Res : Success
+ * Date         : 13th Aug 2016
+ */
+use experiments;
+set `wait-for-completion-feed` "false";
 
-drop  dataverse STBench if exists;
-create  dataverse STBench;
-
-use STBench;
-
-
-create type STBench.SimpleGeoPlaceType as
- closed {
-  coordinates : point,
-  id : bigint,
-  name : string,
-  tags : string,
-  categories : string,
-  phone : string
-};
-
-create  dataset SimpleGeoPlace(SimpleGeoPlaceType) primary key id;
-
-create  index btreeName  on SimpleGeoPlace (name) type btree;
-
-create  primary index sec_primary_idx  on SimpleGeoPlace;
\ No newline at end of file
+connect feed UserFeed to dataset TwitterUsers;
+start feed UserFeed;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/insert-feed-with-pk-index/insert-feed-with-pk-index.3.server.sqlpp
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/insert-feed-with-pk-index/insert-feed-with-pk-index.3.server.sqlpp
index eef86a3..3da77f7 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/insert-feed-with-pk-index/insert-feed-with-pk-index.3.server.sqlpp
@@ -16,25 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
-drop  dataverse STBench if exists;
-create  dataverse STBench;
-
-use STBench;
-
-
-create type STBench.SimpleGeoPlaceType as
- closed {
-  coordinates : point,
-  id : bigint,
-  name : string,
-  tags : string,
-  categories : string,
-  phone : string
-};
-
-create  dataset SimpleGeoPlace(SimpleGeoPlaceType) primary key id;
-
-create  index btreeName  on SimpleGeoPlace (name) type btree;
-
-create  primary index sec_primary_idx  on SimpleGeoPlace;
\ No newline at end of file
+/*
+ * Description  : Create a feed with insert option on a dataset with a
+ * primary key index. Push record twice with minor changes. The result
+ * must match the expected output of the insert-feed test.
+ * Expected Res : Success
+ * Date         : 13th Aug 2016
+ */
+start client 10001 file-client localhost ../asterix-app/data/tinysocial/twu_update.adm 500 50 1000
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/insert-feed-with-pk-index/insert-feed-with-pk-index.4.sleep.sqlpp
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/insert-feed-with-pk-index/insert-feed-with-pk-index.4.sleep.sqlpp
index eef86a3..99ad0d7 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/insert-feed-with-pk-index/insert-feed-with-pk-index.4.sleep.sqlpp
@@ -16,25 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
-drop  dataverse STBench if exists;
-create  dataverse STBench;
-
-use STBench;
-
-
-create type STBench.SimpleGeoPlaceType as
- closed {
-  coordinates : point,
-  id : bigint,
-  name : string,
-  tags : string,
-  categories : string,
-  phone : string
-};
-
-create  dataset SimpleGeoPlace(SimpleGeoPlaceType) primary key id;
-
-create  index btreeName  on SimpleGeoPlace (name) type btree;
-
-create  primary index sec_primary_idx  on SimpleGeoPlace;
\ No newline at end of file
+/*
+ * Description  : Create a feed with insert option on a dataset with a
+ * primary key index. Push record twice with minor changes. The result
+ * must match the expected output of the insert-feed test.
+ * Expected Res : Success
+ * Date         : 13th Aug 2016
+ */
+10000
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/insert-feed-with-pk-index/insert-feed-with-pk-index.5.update.sqlpp
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/insert-feed-with-pk-index/insert-feed-with-pk-index.5.update.sqlpp
index eef86a3..c19b2f7 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/insert-feed-with-pk-index/insert-feed-with-pk-index.5.update.sqlpp
@@ -16,25 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+/*
+ * Description  : Create a feed with insert option on a dataset with a
+ * primary key index. Push record twice with minor changes. The result
+ * must match the expected output of the insert-feed test.
+ * Expected Res : Success
+ * Date         : 13th Aug 2016
+ */
 
-drop  dataverse STBench if exists;
-create  dataverse STBench;
-
-use STBench;
-
-
-create type STBench.SimpleGeoPlaceType as
- closed {
-  coordinates : point,
-  id : bigint,
-  name : string,
-  tags : string,
-  categories : string,
-  phone : string
-};
-
-create  dataset SimpleGeoPlace(SimpleGeoPlaceType) primary key id;
-
-create  index btreeName  on SimpleGeoPlace (name) type btree;
-
-create  primary index sec_primary_idx  on SimpleGeoPlace;
\ No newline at end of file
+use experiments;
+stop feed UserFeed;
+disconnect feed UserFeed from dataset TwitterUsers;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/insert-feed-with-pk-index/insert-feed-with-pk-index.6.query.sqlpp
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/insert-feed-with-pk-index/insert-feed-with-pk-index.6.query.sqlpp
index eef86a3..77ee294 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/insert-feed-with-pk-index/insert-feed-with-pk-index.6.query.sqlpp
@@ -16,25 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+/*
+ * Description  : Create a feed with insert option on a dataset with a
+ * primary key index. Push record twice with minor changes. The result
+ * must match the expected output of the insert-feed test.
+ * Expected Res : Success
+ * Date         : 13th Aug 2016
+ */
 
-drop  dataverse STBench if exists;
-create  dataverse STBench;
-
-use STBench;
-
-
-create type STBench.SimpleGeoPlaceType as
- closed {
-  coordinates : point,
-  id : bigint,
-  name : string,
-  tags : string,
-  categories : string,
-  phone : string
-};
-
-create  dataset SimpleGeoPlace(SimpleGeoPlaceType) primary key id;
-
-create  index btreeName  on SimpleGeoPlace (name) type btree;
+use experiments;
 
-create  primary index sec_primary_idx  on SimpleGeoPlace;
\ No newline at end of file
+select value x
+from TwitterUsers x
+order by x.`screen-name`;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/insert-feed-with-pk-index/insert-feed-with-pk-index.7.server.sqlpp
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/insert-feed-with-pk-index/insert-feed-with-pk-index.7.server.sqlpp
index eef86a3..d331474 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/insert-feed-with-pk-index/insert-feed-with-pk-index.7.server.sqlpp
@@ -16,25 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+/*
+ * Description  : Create a feed with insert option on a dataset with a
+ * primary key index. Push record twice with minor changes. The result
+ * must match the expected output of the insert-feed test.
+ * Expected Res : Success
+ * Date         : 13th Aug 2016
+ */
 
-drop  dataverse STBench if exists;
-create  dataverse STBench;
-
-use STBench;
-
-
-create type STBench.SimpleGeoPlaceType as
- closed {
-  coordinates : point,
-  id : bigint,
-  name : string,
-  tags : string,
-  categories : string,
-  phone : string
-};
-
-create  dataset SimpleGeoPlace(SimpleGeoPlaceType) primary key id;
-
-create  index btreeName  on SimpleGeoPlace (name) type btree;
-
-create  primary index sec_primary_idx  on SimpleGeoPlace;
\ No newline at end of file
+stop 10001
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/insert-feed-with-pk-index/insert-feed-with-pk-index.8.ddl.sqlpp
similarity index 65%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/insert-feed-with-pk-index/insert-feed-with-pk-index.8.ddl.sqlpp
index eef86a3..2d29f2f 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/dml/insert-duplicated-keys/insert-duplicated-keys.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/insert-feed-with-pk-index/insert-feed-with-pk-index.8.ddl.sqlpp
@@ -16,25 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
-drop  dataverse STBench if exists;
-create  dataverse STBench;
-
-use STBench;
-
-
-create type STBench.SimpleGeoPlaceType as
- closed {
-  coordinates : point,
-  id : bigint,
-  name : string,
-  tags : string,
-  categories : string,
-  phone : string
-};
-
-create  dataset SimpleGeoPlace(SimpleGeoPlaceType) primary key id;
-
-create  index btreeName  on SimpleGeoPlace (name) type btree;
-
-create  primary index sec_primary_idx  on SimpleGeoPlace;
\ No newline at end of file
+/*
+ * Description  : Create a feed with insert option on a dataset with a
+ * primary key index. Push record twice with minor changes. The result
+ * must match the expected output of the insert-feed test.
+ * Expected Res : Success
+ * Date         : 13th Aug 2016
+ */
+use experiments;
+drop dataverse experiments;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 50793df..ef94997 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -3664,6 +3664,13 @@
         <source-location>false</source-location>
       </compilation-unit>
     </test-case>
+     <test-case FilePath="dml">
+      <compilation-unit name="insert-duplicated-keys-with-pk-index">
+        <output-dir compare="Text">insert-duplicated-keys-with-pk-index</output-dir>
+        <expected-error>Inserting duplicate keys into the primary storage</expected-error>
+        <source-location>false</source-location>
+      </compilation-unit>
+    </test-case>
     <test-case FilePath="dml">
       <compilation-unit name="insert_less_nc">
         <output-dir compare="Text">insert_less_nc</output-dir>
@@ -10789,6 +10796,11 @@
         <output-dir compare="Text">insert-feed</output-dir>
       </compilation-unit>
     </test-case>
+     <test-case FilePath="feeds">
+      <compilation-unit name="insert-feed-with-pk-index">
+        <output-dir compare="Text">insert-feed</output-dir>
+      </compilation-unit>
+    </test-case>
     <test-case FilePath="feeds">
       <compilation-unit name="connect-feed-with-function">
         <output-dir compare="Text">connect-feed-with-function</output-dir>
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
index 2a1d551..883da84 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
@@ -185,7 +185,8 @@ public class BTreeResourceFactoryProvider implements IResourceFactoryProvider {
     }
 
     private static int[] getBloomFilterFields(Dataset dataset, Index index) throws AlgebricksException {
-        if (index.isPrimaryIndex()) {
+        // both the Primary index and the Primary Key index have bloom filters
+        if (index.isPrimaryIndex() || index.isPrimaryKeyIndex()) {
             return dataset.getPrimaryBloomFilterFields();
         } else if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
             if (index.getIndexName().equals(IndexingConstants.getFilesIndexName(dataset.getDatasetName()))) {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 7071783..72c031f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.asterix.common.cluster.IClusterStateManager;
@@ -93,6 +94,7 @@ import org.apache.asterix.runtime.base.AsterixTupleFilterFactory;
 import org.apache.asterix.runtime.formats.FormatUtils;
 import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor;
 import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage;
+import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorDescriptor;
 import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -1011,9 +1013,11 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             i++;
         }
         fieldPermutation[i++] = propagatedSchema.findVariable(payload);
+        int[] filterFields = new int[numFilterFields];
         if (numFilterFields > 0) {
             int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
             fieldPermutation[i++] = idx;
+            filterFields[0] = idx;
         }
         if (additionalNonFilteringFields != null) {
             for (LogicalVariable variable : additionalNonFilteringFields) {
@@ -1043,8 +1047,27 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                     GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh, null, BulkLoadUsage.LOAD,
                     dataset.getDatasetId(), null);
         } else {
-            op = new LSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh, null,
-                    true, modificationCallbackFactory);
+            if (indexOp == IndexOperation.INSERT) {
+                ISearchOperationCallbackFactory searchCallbackFactory = dataset
+                        .getSearchCallbackFactory(storageComponentProvider, primaryIndex, indexOp, primaryKeyFields);
+
+                Optional<Index> primaryKeyIndex = MetadataManager.INSTANCE
+                        .getDatasetIndexes(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName()).stream()
+                        .filter(index -> index.isPrimaryKeyIndex()).findFirst();
+                IIndexDataflowHelperFactory pkidfh = null;
+                if (primaryKeyIndex.isPresent()) {
+                    Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primaryKeySplitsAndConstraint =
+                            getSplitProviderAndConstraints(dataset, primaryKeyIndex.get().getIndexName());
+                    pkidfh = new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(),
+                            primaryKeySplitsAndConstraint.first);
+                }
+                op = new LSMPrimaryInsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, pkidfh,
+                        modificationCallbackFactory, searchCallbackFactory, numKeys, filterFields);
+
+            } else {
+                op = new LSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh,
+                        null, true, modificationCallbackFactory);
+            }
         }
         return new Pair<>(op, splitsAndConstraint.second);
     }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index d207f7f..f7ac890 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -543,7 +543,7 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
              * Due to the read-committed isolation level,
              * we may acquire very short duration lock(i.e., instant lock) for readers.
              */
-            return (op == IndexOperation.UPSERT)
+            return (op == IndexOperation.UPSERT || op == IndexOperation.INSERT)
                     ? new LockThenSearchOperationCallbackFactory(getDatasetId(), primaryKeyFields,
                             storageComponentProvider.getTransactionSubsystemProvider(), ResourceType.LSM_BTREE)
                     : new PrimaryIndexInstantSearchOperationCallbackFactory(getDatasetId(), primaryKeyFields,
@@ -602,13 +602,13 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
             IStorageComponentProvider componentProvider, Index index, IndexOperation op, int[] primaryKeyFields)
             throws AlgebricksException {
         if (index.isPrimaryIndex()) {
-            return op == IndexOperation.UPSERT ? new UpsertOperationCallbackFactory(getDatasetId(), primaryKeyFields,
-                    componentProvider.getTransactionSubsystemProvider(), Operation.get(op), index.resourceType())
-                    : op == IndexOperation.DELETE || op == IndexOperation.INSERT
-                            ? new PrimaryIndexModificationOperationCallbackFactory(getDatasetId(), primaryKeyFields,
-                                    componentProvider.getTransactionSubsystemProvider(), Operation.get(op),
-                                    index.resourceType())
-                            : NoOpOperationCallbackFactory.INSTANCE;
+            return op == IndexOperation.UPSERT || op == IndexOperation.INSERT
+                    ? new UpsertOperationCallbackFactory(getDatasetId(),
+                            primaryKeyFields, componentProvider.getTransactionSubsystemProvider(), Operation.get(op),
+                            index.resourceType())
+                    : op == IndexOperation.DELETE ? new PrimaryIndexModificationOperationCallbackFactory(getDatasetId(),
+                            primaryKeyFields, componentProvider.getTransactionSubsystemProvider(), Operation.get(op),
+                            index.resourceType()) : NoOpOperationCallbackFactory.INSTANCE;
         } else {
             return op == IndexOperation.DELETE || op == IndexOperation.INSERT || op == IndexOperation.UPSERT
                     ? new SecondaryIndexModificationOperationCallbackFactory(getDatasetId(), primaryKeyFields,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
index 3159030..4084824 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
@@ -140,6 +140,11 @@ public class Index implements IMetadataEntity<Index>, Comparable<Index> {
         return !isPrimaryIndex();
     }
 
+    public boolean isPrimaryKeyIndex() { // true iff this index declares no key fields
+        // a primary key index (e.g. "create primary index pk_index on TwitterUsers") has no key field names
+        return keyFieldNames.isEmpty();
+    }
+
     public static Pair<IAType, Boolean> getNonNullableType(IAType keyType) {
         boolean nullable = false;
         IAType actualKeyType = keyType;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java
new file mode 100644
index 0000000..42b8f29
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators;
+
+import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+
+public class LSMPrimaryInsertOperatorDescriptor extends LSMTreeInsertDeleteOperatorDescriptor { // creates LSMPrimaryInsertOperatorNodePushable runtimes
+
+    private static final long serialVersionUID = 1L;
+
+    private final IIndexDataflowHelperFactory keyIndexHelperFactory; // primary key index helper; NOTE(review): MetadataProvider passes null when the dataset has no such index
+    private final ISearchOperationCallbackFactory searchOpCallbackFactory; // forwarded unchanged to the node pushable
+    private final int numOfPrimaryKeys; // forwarded unchanged to the node pushable
+    private final int[] filterFields; // forwarded unchanged to the node pushable; stored without a defensive copy
+
+    public LSMPrimaryInsertOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
+            int[] fieldPermutation, IIndexDataflowHelperFactory indexHelperFactory,
+            IIndexDataflowHelperFactory keyIndexHelperFactory,
+            IModificationOperationCallbackFactory modificationOpCallbackFactory,
+            ISearchOperationCallbackFactory searchOpCallbackFactory, int numOfPrimaryKeys, int[] filterFields) {
+        super(spec, outRecDesc, fieldPermutation, IndexOperation.UPSERT, indexHelperFactory, null, true, // NOTE(review): parent is given UPSERT, not INSERT - presumably deliberate; confirm
+                modificationOpCallbackFactory);
+        this.keyIndexHelperFactory = keyIndexHelperFactory;
+        this.searchOpCallbackFactory = searchOpCallbackFactory;
+        this.numOfPrimaryKeys = numOfPrimaryKeys;
+        this.filterFields = filterFields;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        RecordDescriptor intputRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0); // record descriptor of this activity's input 0
+        return new LSMPrimaryInsertOperatorNodePushable(ctx, partition, indexHelperFactory, keyIndexHelperFactory, // one pushable per call, built from the stored factories
+                fieldPermutation, intputRecDesc, modCallbackFactory, searchOpCallbackFactory, numOfPrimaryKeys,
+                filterFields, sourceLoc);
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
new file mode 100644
index 0000000..92538b5
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.dataflow.LSMIndexUtil;
+import org.apache.asterix.common.dataflow.NoOpFrameOperationCallbackFactory;
+import org.apache.asterix.common.transactions.ILogMarkerCallback;
+import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
+import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallback;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.util.CleanupUtils;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.common.utils.TaskUtil;
+import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
+import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
+import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.IIndexAccessParameters;
+import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+public class LSMPrimaryInsertOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable {
+
+    private final IIndexDataflowHelper keyIndexHelper;
+    private MultiComparator keySearchCmp;
+    private RangePredicate searchPred;
+    private IIndexCursor cursor;
+    private LockThenSearchOperationCallback searchCallback;
+    private final ISearchOperationCallbackFactory searchCallbackFactory;
+    private final IFrameTupleProcessor processor;
+    private LSMTreeIndexAccessor lsmAccessor;
+    private LSMTreeIndexAccessor lsmAccessorForKeyIndex;
+    private LSMTreeIndexAccessor lsmAccessorForUniqunessCheck;
+
+    private final IFrameOperationCallback frameOpCallback;
+    private boolean flushedPartialTuples;
+    private int currentTupleIdx;
+    private int lastFlushedTupleIdx;
+
+    private final PermutingFrameTupleReference keyTuple;
+
+    /**
+     * Builds the search-key permutation (the first numOfPrimaryKeys entries of fieldPermutation followed
+     * by the optional filter fields) and the per-tuple processor that rejects duplicate keys before
+     * upserting. {@code keyIndexHelperFactory} and {@code filterFields} may both be null.
+     */
+    public LSMPrimaryInsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
+            IIndexDataflowHelperFactory indexHelperFactory, IIndexDataflowHelperFactory keyIndexHelperFactory,
+            int[] fieldPermutation, RecordDescriptor inputRecDesc,
+            IModificationOperationCallbackFactory modCallbackFactory,
+            ISearchOperationCallbackFactory searchCallbackFactory, int numOfPrimaryKeys, int[] filterFields,
+            SourceLocation sourceLoc) throws HyracksDataException {
+        super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, IndexOperation.UPSERT,
+                modCallbackFactory, null);
+        // without a key index factory, open() runs the uniqueness check against the primary index
+        this.keyIndexHelper = keyIndexHelperFactory == null ? null
+                : keyIndexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
+        this.searchCallbackFactory = searchCallbackFactory;
+        int filterFieldCount = filterFields == null ? 0 : filterFields.length;
+        int[] searchKeyPermutations = new int[numOfPrimaryKeys + filterFieldCount];
+        System.arraycopy(fieldPermutation, 0, searchKeyPermutations, 0, numOfPrimaryKeys);
+        if (filterFields != null) {
+            System.arraycopy(filterFields, 0, searchKeyPermutations, numOfPrimaryKeys, filterFieldCount);
+        }
+        keyTuple = new PermutingFrameTupleReference(searchKeyPermutations);
+        processor = new IFrameTupleProcessor() {
+            @Override
+            public void process(ITupleReference tuple, int index) throws HyracksDataException {
+                if (index < currentTupleIdx) {
+                    // index is below currentTupleIdx, i.e. this tuple was already processed
+                    return;
+                }
+                keyTuple.reset(accessor, index);
+                searchPred.reset(keyTuple, keyTuple, true, true, keySearchCmp, keySearchCmp);
+                // point lookup of the key; searchCallback is the search callback of this accessor (see open())
+                lsmAccessorForUniqunessCheck.search(cursor, searchPred);
+                final boolean duplicate;
+                try {
+                    duplicate = cursor.hasNext();
+                    if (duplicate) {
+                        // the key already exists: give back the lock held through searchCallback
+                        searchCallback.release();
+                    }
+                } finally {
+                    cursor.close();
+                }
+                if (duplicate) {
+                    // forward the tuples inserted so far so that their transactions can commit
+                    flushPartialFrame();
+                    // feed requires this nested exception to remove duplicated tuples
+                    // TODO: a better way to treat duplicates?
+                    throw HyracksDataException.create(ErrorCode.ERROR_PROCESSING_TUPLE,
+                            HyracksDataException.create(ErrorCode.DUPLICATE_KEY), sourceLoc, index);
+                }
+                lsmAccessor.forceUpsert(tuple);
+                if (lsmAccessorForKeyIndex != null) {
+                    // the key index receives the same key (plus filter fields) as the primary index
+                    lsmAccessorForKeyIndex.forceUpsert(keyTuple);
+                }
+                currentTupleIdx = index + 1;
+            }
+
+            @Override
+            public void start() throws HyracksDataException {
+                lsmAccessor.getCtx().setOperation(IndexOperation.UPSERT);
+            }
+
+            @Override
+            public void finish() throws HyracksDataException {
+                lsmAccessor.getCtx().setOperation(IndexOperation.UPSERT);
+            }
+
+            @Override
+            public void fail(Throwable th) {
+                // no op
+            }
+        };
+
+        // lsmAccessor is still unassigned at this point; it is only created in open()
+        frameOpCallback = NoOpFrameOperationCallbackFactory.INSTANCE.createFrameOperationCallback(ctx, lsmAccessor);
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        currentTupleIdx = 0; // per-frame bookkeeping used by process() and flushPartialFrame()
+        lastFlushedTupleIdx = 0;
+        flushedPartialTuples = false;
+        accessor = new FrameTupleAccessor(inputRecDesc);
+        writeBuffer = new VSizeFrame(ctx);
+        writer.open();
+        indexHelper.open();
+        index = indexHelper.getIndexInstance();
+        IIndex indexForUniquessCheck;
+        if (keyIndexHelper != null) { // a key index helper exists: duplicates are looked up in that index
+            keyIndexHelper.open();
+            indexForUniquessCheck = keyIndexHelper.getIndexInstance();
+        } else {
+            indexForUniquessCheck = index; // otherwise duplicates are looked up in the primary index itself
+        }
+        try {
+            if (ctx.getSharedObject() != null) {
+                PrimaryIndexLogMarkerCallback callback = new PrimaryIndexLogMarkerCallback((AbstractLSMIndex) index);
+                TaskUtil.put(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx);
+            }
+            keySearchCmp =
+                    BTreeUtils.getSearchMultiComparator(((ITreeIndex) index).getComparatorFactories(), frameTuple);
+            searchPred = new RangePredicate(frameTuple, frameTuple, true, true, keySearchCmp, keySearchCmp, null, null);
+            appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
+            modCallback =
+                    modOpCallbackFactory.createModificationOperationCallback(indexHelper.getResource(), ctx, this);
+            searchCallback = (LockThenSearchOperationCallback) searchCallbackFactory
+                    .createSearchOperationCallback(indexHelper.getResource().getId(), ctx, this);
+            IIndexAccessParameters iap = new IndexAccessParameters(modCallback, NoOpOperationCallback.INSTANCE);
+            indexAccessor = index.createAccessor(iap);
+            lsmAccessor = (LSMTreeIndexAccessor) indexAccessor;
+            if (keyIndexHelper != null) { // writer for the key index; it reuses iap and hence modCallback
+                lsmAccessorForKeyIndex = (LSMTreeIndexAccessor) indexForUniquessCheck.createAccessor(iap);
+            }
+
+            IIndexAccessParameters iapForUniquenessCheck =
+                    new IndexAccessParameters(NoOpOperationCallback.INSTANCE, searchCallback); // search side only
+            lsmAccessorForUniqunessCheck =
+                    (LSMTreeIndexAccessor) indexForUniquessCheck.createAccessor(iapForUniquenessCheck);
+
+            cursor = lsmAccessorForUniqunessCheck.createSearchCursor(false); // reused by every process() call
+            frameTuple = new FrameTupleReference(); // NOTE(review): set after keySearchCmp/searchPred used it
+            INcApplicationContext appCtx =
+                    (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
+            LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
+                    appCtx.getTransactionSubsystem().getLogManager());
+        } catch (Throwable e) { // NOSONAR: Re-thrown
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor.reset(buffer);
+        lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback); // presumably runs processor per tuple - TODO confirm
+
+        writeBuffer.ensureFrameSize(buffer.capacity());
+        if (flushedPartialTuples) { // a prefix of this frame was already forwarded by flushPartialFrame()
+            flushPartialFrame(); // forward only the not-yet-flushed tuples
+        } else { // nothing forwarded yet: copy the whole input frame and flush it to the writer
+            FrameUtils.copyAndFlip(buffer, writeBuffer.getBuffer());
+            FrameUtils.flushFrame(writeBuffer.getBuffer(), writer);
+        }
+        currentTupleIdx = 0; // reset the bookkeeping for the next frame
+        lastFlushedTupleIdx = 0;
+        flushedPartialTuples = false;
+    }
+
+    /**
+     * Appends the tuples of the current frame from lastFlushedTupleIdx (inclusive) to currentTupleIdx
+     * (exclusive) to the writer and writes them out; the call is a no-op when both indexes are equal.
+     * Afterwards lastFlushedTupleIdx equals currentTupleIdx and flushedPartialTuples is set.
+     */
+    @Override
+    public void flushPartialFrame() throws HyracksDataException {
+        if (lastFlushedTupleIdx != currentTupleIdx) {
+            for (int t = lastFlushedTupleIdx; t < currentTupleIdx; t++) {
+                FrameUtils.appendToWriter(writer, appender, accessor, t);
+            }
+            appender.write(writer, true);
+            lastFlushedTupleIdx = currentTupleIdx;
+            flushedPartialTuples = true; // nextFrame() then skips forwarding the whole frame again
+        }
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        // cleanup order: cursor, writer, primary index helper, key index helper; a Throwable left over is rethrown
+        Throwable th = CleanupUtils.close(writer, CleanupUtils.destroy(null, cursor));
+        th = CleanupUtils.close(indexHelper, th);
+        th = CleanupUtils.close(keyIndexHelper, th);
+        if (th != null) {
+            throw HyracksDataException.create(th);
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        writer.fail(); // only the output writer is notified here; resources are released in close()
+    }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        // No-op: nextFrame() already pushes every frame to the writer before it returns
+    }
+}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
index fe17b39..f663787 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
@@ -21,18 +21,13 @@ package org.apache.asterix.transaction.management.opcallbacks;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.AbstractOperationCallback;
 import org.apache.asterix.common.transactions.DatasetId;
-import org.apache.asterix.common.transactions.ILogManager;
-import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.LogRecord;
-import org.apache.asterix.common.transactions.LogSource;
-import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexFrameWriter;
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
 
 public class LockThenSearchOperationCallback extends AbstractOperationCallback implements ISearchOperationCallback {
@@ -40,23 +35,14 @@ public class LockThenSearchOperationCallback extends AbstractOperationCallback i
     /**
      * variables used for deadlock-free locking protocol
      */
-    private final LSMIndexInsertUpdateDeleteOperatorNodePushable operatorNodePushable;
-    private final ILogManager logManager;
-    private final ILogRecord logRecord;
+    private final ILSMIndexFrameWriter operatorNodePushable;
     private int pkHash;
 
     public LockThenSearchOperationCallback(DatasetId datasetId, long resourceId, int[] entityIdFields,
             ITransactionSubsystem txnSubsystem, ITransactionContext txnCtx,
             IOperatorNodePushable operatorNodePushable) {
         super(datasetId, resourceId, entityIdFields, txnCtx, txnSubsystem.getLockManager());
-        this.operatorNodePushable = (LSMIndexInsertUpdateDeleteOperatorNodePushable) operatorNodePushable;
-        this.logManager = txnSubsystem.getLogManager();
-        this.logRecord = new LogRecord();
-        logRecord.setTxnCtx(txnCtx);
-        logRecord.setLogSource(LogSource.LOCAL);
-        logRecord.setLogType(LogType.WAIT);
-        logRecord.setTxnId(txnCtx.getTxnId().getId());
-        logRecord.computeAndSetLogSize();
+        this.operatorNodePushable = (ILSMIndexFrameWriter) operatorNodePushable;
     }
 
     @Override
@@ -83,19 +69,19 @@ public class LockThenSearchOperationCallback extends AbstractOperationCallback i
             if (operatorNodePushable != null) {
 
                 /**********************************************************************************
-                 * In order to achieve deadlock-free locking protocol during any write (insert/delete/upsert) operations,
-                 * the following logic is implemented.
-                 * See https://cwiki.apache.org/confluence/display/ASTERIXDB/Deadlock-Free+Locking+Protocol for more details.
+                 * In order to achieve deadlock-free locking protocol during any write (insert/delete/upsert)
+                 * operations, the following logic is implemented.
+                 * See https://cwiki.apache.org/confluence/display/ASTERIXDB/Deadlock-Free+Locking+Protocol for
+                 * : more details.
                  * 1. for each entry in a frame
                  * 2. returnValue = tryLock() for an entry
                  * 3. if returnValue == false
                  * 3-1. flush all entries (which already acquired locks) to the next operator
-                 * : this will make all those entries reach commit operator so that corresponding commit logs will be created.
-                 * 3-2. create a WAIT log and wait until logFlusher thread will flush the WAIT log and gives notification
-                 * : this notification guarantees that all locks acquired by this transactor (or all locks acquired for the entries)
-                 * were released.
-                 * 3-3. acquire lock using lock() instead of tryLock() for the failed entry
-                 * : we know for sure this lock call will not cause deadlock since the transactor doesn't hold any other locks.
+                 * : this will make all those entries reach commit operator so that corresponding commit logs will
+                 * : be created.
+                 * 3-2. acquire lock using lock() instead of tryLock() for the failed entry
+                 * : we know for sure this lock call will not cause deadlock since the transactor doesn't hold any
+                 * : other locks.
                  * 4. create an update log and insert the entry
                  * From the above logic, step 2 and 3 are implemented in this before() method.
                  **********************/
@@ -106,9 +92,6 @@ public class LockThenSearchOperationCallback extends AbstractOperationCallback i
                     //flush entries which have been inserted already to release locks hold by them
                     operatorNodePushable.flushPartialFrame();
 
-                    //create WAIT log and wait until the WAIT log is flushed and notified by LogFlusher thread
-                    logWait();
-
                     //acquire lock
                     lockManager.lock(datasetId, pkHash, LockMode.X, txnCtx);
                 }
@@ -122,10 +105,6 @@ public class LockThenSearchOperationCallback extends AbstractOperationCallback i
         }
     }
 
-    private void logWait() throws ACIDException {
-        logManager.log(logRecord);
-    }
-
     public void release() throws ACIDException {
         lockManager.unlock(datasetId, pkHash, LockMode.X, txnCtx);
     }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
index 82da495..88d054b 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
@@ -19,7 +19,6 @@
 
 package org.apache.asterix.transaction.management.opcallbacks;
 
-import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ILockManager;
@@ -29,6 +28,7 @@ import org.apache.asterix.transaction.management.service.transaction.Transaction
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexFrameWriter;
 
 /**
  * Assumes LSM-BTrees as primary indexes.
@@ -36,14 +36,14 @@ import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
  */
 public class PrimaryIndexModificationOperationCallback extends AbstractIndexModificationOperationCallback {
 
-    private final LSMInsertDeleteOperatorNodePushable operatorNodePushable;
+    private final ILSMIndexFrameWriter operatorNodePushable;
 
     public PrimaryIndexModificationOperationCallback(DatasetId datasetId, int[] primaryKeyFields,
             ITransactionContext txnCtx, ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId,
             int resourcePartition, byte resourceType, Operation indexOp, IOperatorNodePushable operatorNodePushable) {
         super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId, resourcePartition,
                 resourceType, indexOp);
-        this.operatorNodePushable = (LSMInsertDeleteOperatorNodePushable) operatorNodePushable;
+        this.operatorNodePushable = (ILSMIndexFrameWriter) operatorNodePushable;
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 5332b1e..061c0ef 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -83,7 +83,7 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex {
 
     private final boolean needKeyDupCheck;
 
-    // Primary LSMBTree has a Bloomfilter, but Secondary one doesn't have.
+    // Primary and primary key LSMBTrees have a Bloom filter; secondary ones do not.
     private final boolean hasBloomFilter;
 
     public LSMBTree(IIOManager ioManager, List<IVirtualBufferCache> virtualBufferCaches,
@@ -94,8 +94,9 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex {
             ILSMComponentFilterFrameFactory filterFrameFactory, LSMComponentFilterManager filterManager,
             double bloomFilterFalsePositiveRate, int fieldCount, IBinaryComparatorFactory[] cmpFactories,
             ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
-            ILSMIOOperationCallbackFactory ioOpCallbackFactory, boolean needKeyDupCheck, int[] btreeFields,
-            int[] filterFields, boolean durable, boolean updateAware, ITracer tracer) throws HyracksDataException {
+            ILSMIOOperationCallbackFactory ioOpCallbackFactory, boolean needKeyDupCheck, boolean hasBloomFilter,
+            int[] btreeFields, int[] filterFields, boolean durable, boolean updateAware, ITracer tracer)
+            throws HyracksDataException {
         super(ioManager, virtualBufferCaches, diskBufferCache, fileManager, bloomFilterFalsePositiveRate, mergePolicy,
                 opTracker, ioScheduler, ioOpCallbackFactory, componentFactory, bulkLoadComponentFactory,
                 filterFrameFactory, filterManager, filterFields, durable, filterHelper, btreeFields, tracer);
@@ -114,7 +115,7 @@ public class LSMBTree extends AbstractLSMIndex implements ITreeIndex {
             ++i;
         }
         this.needKeyDupCheck = needKeyDupCheck;
-        this.hasBloomFilter = needKeyDupCheck;
+        this.hasBloomFilter = hasBloomFilter;
     }
 
     // Without memory components
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
index 70ac3f6..5aaf0e2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
@@ -107,13 +107,13 @@ public class LSMBTreeUtil {
             filterManager = new LSMComponentFilterManager(filterFrameFactory);
         }
 
+        boolean hasBloomFilter = bloomFilterKeyFields != null;
         ILSMIndexFileManager fileNameManager = new LSMBTreeFileManager(ioManager, file, diskBTreeFactory,
-                needKeyDupCheck, compressorDecompressorFactory);
+                hasBloomFilter, compressorDecompressorFactory);
 
-        //Primary LSMBTree index has a BloomFilter.
         ILSMDiskComponentFactory componentFactory;
         ILSMDiskComponentFactory bulkLoadComponentFactory;
-        if (needKeyDupCheck) {
+        if (hasBloomFilter) {
             BloomFilterFactory bloomFilterFactory = new BloomFilterFactory(diskBufferCache, bloomFilterKeyFields);
             componentFactory =
                     new LSMBTreeWithBloomFilterDiskComponentFactory(diskBTreeFactory, bloomFilterFactory, filterHelper);
@@ -127,8 +127,8 @@ public class LSMBTreeUtil {
         return new LSMBTree(ioManager, virtualBufferCaches, interiorFrameFactory, insertLeafFrameFactory,
                 deleteLeafFrameFactory, diskBufferCache, fileNameManager, componentFactory, bulkLoadComponentFactory,
                 filterHelper, filterFrameFactory, filterManager, bloomFilterFalsePositiveRate, typeTraits.length,
-                cmpFactories, mergePolicy, opTracker, ioScheduler, ioOpCallbackFactory, needKeyDupCheck, btreeFields,
-                filterFields, durable, updateAware, tracer);
+                cmpFactories, mergePolicy, opTracker, ioScheduler, ioOpCallbackFactory, needKeyDupCheck, hasBloomFilter,
+                btreeFields, filterFields, durable, updateAware, tracer);
     }
 
     public static ExternalBTree createExternalBTree(IIOManager ioManager, FileReference file,
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index dc54e20..65d2fcf 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -427,9 +427,13 @@ public class LSMHarness implements ILSMHarness {
             throws HyracksDataException {
         LSMOperationType opType = LSMOperationType.SEARCH;
         ctx.setSearchPredicate(pred);
+        // The lock must be acquired before entering the LSM components.
+        // Otherwise, if a writer activates a new memory component, its updates may be missed by a subsequent reader
+        // that had already entered the components. Given the order in which the locks were acquired,
+        // however, that reader is supposed to see the writer's updates.
+        ctx.getSearchOperationCallback().before(pred.getLowKey());
         getAndEnterComponents(ctx, opType, false);
         try {
-            ctx.getSearchOperationCallback().before(pred.getLowKey());
             lsmIndex.search(ctx, cursor, pred);
         } catch (Exception e) {
             exitComponents(ctx, opType, null, true);
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
index 4867c71..519f895 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
@@ -86,13 +86,14 @@ public class TestLsmBtree extends LSMBTree {
             ILSMComponentFilterFrameFactory filterFrameFactory, LSMComponentFilterManager filterManager,
             double bloomFilterFalsePositiveRate, int fieldCount, IBinaryComparatorFactory[] cmpFactories,
             ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
-            ILSMIOOperationCallbackFactory ioOperationCallbackFactory, boolean needKeyDupCheck, int[] btreeFields,
-            int[] filterFields, boolean durable, boolean updateAware, ITracer tracer) throws HyracksDataException {
+            ILSMIOOperationCallbackFactory ioOperationCallbackFactory, boolean needKeyDupCheck, boolean hasBloomFilter,
+            int[] btreeFields, int[] filterFields, boolean durable, boolean updateAware, ITracer tracer)
+            throws HyracksDataException {
         super(ioManager, virtualBufferCaches, interiorFrameFactory, insertLeafFrameFactory, deleteLeafFrameFactory,
                 diskBufferCache, fileManager, componentFactory, bulkLoadComponentFactory, filterHelper,
                 filterFrameFactory, filterManager, bloomFilterFalsePositiveRate, fieldCount, cmpFactories, mergePolicy,
-                opTracker, ioScheduler, ioOperationCallbackFactory, needKeyDupCheck, btreeFields, filterFields, durable,
-                updateAware, tracer);
+                opTracker, ioScheduler, ioOperationCallbackFactory, needKeyDupCheck, hasBloomFilter, btreeFields,
+                filterFields, durable, updateAware, tracer);
 
         addModifyCallback(AllowTestOpCallback.INSTANCE);
         addSearchCallback(AllowTestOpCallback.INSTANCE);
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeUtil.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeUtil.java
index d614aff..82f7959 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeUtil.java
@@ -93,13 +93,15 @@ public class TestLsmBtreeUtil {
             filterManager = new LSMComponentFilterManager(filterFrameFactory);
         }
 
-        //Primary LSMBTree index has a BloomFilter.
+        boolean hasBloomFilter = bloomFilterKeyFields != null;
+        // Primary and primary key LSMBTree indexes have a Bloom filter.
         ILSMIndexFileManager fileNameManager =
-                new LSMBTreeFileManager(ioManager, file, diskBTreeFactory, needKeyDupCheck);
+                new LSMBTreeFileManager(ioManager, file, diskBTreeFactory, hasBloomFilter);
 
         ILSMDiskComponentFactory componentFactory;
         ILSMDiskComponentFactory bulkLoadComponentFactory;
-        if (needKeyDupCheck) {
+
+        if (hasBloomFilter) {
             BloomFilterFactory bloomFilterFactory = new BloomFilterFactory(diskBufferCache, bloomFilterKeyFields);
             componentFactory =
                     new LSMBTreeWithBloomFilterDiskComponentFactory(diskBTreeFactory, bloomFilterFactory, filterHelper);
@@ -113,7 +115,7 @@ public class TestLsmBtreeUtil {
         return new TestLsmBtree(ioManager, virtualBufferCaches, interiorFrameFactory, insertLeafFrameFactory,
                 deleteLeafFrameFactory, diskBufferCache, fileNameManager, componentFactory, bulkLoadComponentFactory,
                 filterHelper, filterFrameFactory, filterManager, bloomFilterFalsePositiveRate, typeTraits.length,
-                cmpFactories, mergePolicy, opTracker, ioScheduler, ioOpCallbackFactory, needKeyDupCheck, btreeFields,
-                filterFields, durable, updateAware, tracer);
+                cmpFactories, mergePolicy, opTracker, ioScheduler, ioOpCallbackFactory, needKeyDupCheck, hasBloomFilter,
+                btreeFields, filterFields, durable, updateAware, tracer);
     }
 }