Posted to commits@asterixdb.apache.org by al...@apache.org on 2023/04/06 01:27:52 UTC

[asterixdb] branch master updated: [ASTERIXDB-3144][RT] Pass partitions map to inverted index

This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ba2d8bd2fc [ASTERIXDB-3144][RT] Pass partitions map to inverted index
ba2d8bd2fc is described below

commit ba2d8bd2fcaf1992177c78dfb326c9ccebd23531
Author: Ali Alsuliman <al...@gmail.com>
AuthorDate: Wed Apr 5 04:42:09 2023 -0700

    [ASTERIXDB-3144][RT] Pass partitions map to inverted index
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    Pass partitions map to the inverted index runtime.
    - rename a few methods.
    
    Change-Id: I6ad1b0cd79f0f5e8e15da83330b8a52f9ac0108d
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17463
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Ali Alsuliman <al...@gmail.com>
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
---
 .../operators/physical/BTreeSearchPOperator.java   |   2 +-
 .../operators/physical/InvertedIndexPOperator.java |  19 +-
 .../operators/physical/RTreeSearchPOperator.java   |   2 +-
 .../asterix/app/function/QueryIndexDatasource.java |   6 +-
 .../org/apache/asterix/utils/FeedOperations.java   |   2 +-
 .../metadata/declared/DatasetDataSource.java       |   4 +-
 .../metadata/declared/FunctionDataSource.java      |   4 +-
 .../metadata/declared/LoadableDataSource.java      |   2 +-
 .../metadata/declared/MetadataProvider.java        | 237 ++++++++++-----------
 .../metadata/declared/SampleDataSource.java        |   2 +-
 .../core/algebra/metadata/IMetadataProvider.java   |  31 +--
 .../dataflow/BTreeSearchOperatorDescriptor.java    |   8 +-
 .../dataflow/BTreeSearchOperatorNodePushable.java  |   4 +-
 .../dataflow/IndexSearchOperatorNodePushable.java  |   4 +-
 ...LSMBTreeBatchPointSearchOperatorDescriptor.java |   6 +-
 ...MBTreeBatchPointSearchOperatorNodePushable.java |   4 +-
 .../LSMInvertedIndexSearchOperatorDescriptor.java  |  10 +-
 ...LSMInvertedIndexSearchOperatorNodePushable.java |   6 +-
 18 files changed, 178 insertions(+), 175 deletions(-)
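
For context: the partitions map handed to the inverted-index search runtime in this change is the identity mapping built by the two static helpers relocated within MetadataProvider (see the hunks below). Quoting the relevant helper from the patch, with a usage note added:

    // Quoted from MetadataProvider (relocated by this patch): operator
    // partition i is assigned the single storage partition i.
    public static int[][] getPartitionsMap(int numPartitions) {
        int[][] map = new int[numPartitions][1];
        for (int i = 0; i < numPartitions; i++) {
            map[i] = new int[] { i };
        }
        return map;
    }

    // Example: getPartitionsMap(3) yields {{0}, {1}, {2}}, i.e. three
    // operator partitions, each reading exactly one storage partition.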

diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
index a7a383821b..b8eba74b00 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
@@ -162,7 +162,7 @@ public class BTreeSearchPOperator extends IndexSearchPOperator {
                         String.valueOf(unnestMap.getOperatorTag()));
         }
 
-        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> btreeSearch = metadataProvider.buildBtreeRuntime(
+        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> btreeSearch = metadataProvider.getBtreeSearchRuntime(
                 builder.getJobSpec(), opSchema, typeEnv, context, jobGenParams.getRetainInput(), retainMissing,
                 nonMatchWriterFactory, dataset, jobGenParams.getIndexName(), lowKeyIndexes, highKeyIndexes,
                 jobGenParams.isLowKeyInclusive(), jobGenParams.isHighKeyInclusive(), propagateFilter,
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index 00eef690e1..5bdb2dba5a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -186,14 +186,17 @@ public class InvertedIndexPOperator extends IndexSearchPOperator {
         IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory(
                 metadataProvider.getStorageComponentProvider().getStorageManager(), secondarySplitsAndConstraint.first);
 
-        LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp = new LSMInvertedIndexSearchOperatorDescriptor(
-                jobSpec, outputRecDesc, queryField, dataflowHelperFactory, queryTokenizerFactory,
-                fullTextConfigEvaluatorFactory, searchModifierFactory, retainInput, retainMissing,
-                nonMatchWriterFactory,
-                dataset.getSearchCallbackFactory(metadataProvider.getStorageComponentProvider(), secondaryIndex,
-                        IndexOperation.SEARCH, null),
-                minFilterFieldIndexes, maxFilterFieldIndexes, isFullTextSearchQuery, numPrimaryKeys,
-                propagateIndexFilter, nonFilterWriterFactory, frameLimit);
+        int numPartitions = MetadataProvider.getNumPartitions(secondarySplitsAndConstraint.second);
+        int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions);
+
+        LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp =
+                new LSMInvertedIndexSearchOperatorDescriptor(jobSpec, outputRecDesc, queryField, dataflowHelperFactory,
+                        queryTokenizerFactory, fullTextConfigEvaluatorFactory, searchModifierFactory, retainInput,
+                        retainMissing, nonMatchWriterFactory,
+                        dataset.getSearchCallbackFactory(metadataProvider.getStorageComponentProvider(), secondaryIndex,
+                                IndexOperation.SEARCH, null),
+                        minFilterFieldIndexes, maxFilterFieldIndexes, isFullTextSearchQuery, numPrimaryKeys,
+                        propagateIndexFilter, nonFilterWriterFactory, frameLimit, partitionsMap);
         return new Pair<>(invIndexSearchOp, secondarySplitsAndConstraint.second);
     }
 }
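
The numPartitions value above is derived from the secondary index's partition constraint. A minimal self-contained sketch of MetadataProvider.getNumPartitions, using stand-in types for the Algebricks constraint classes (the real ones are AlgebricksCountPartitionConstraint and AlgebricksAbsolutePartitionConstraint):

    // Stand-in types for illustration only; not the Hyracks API.
    final class ConstraintSketch {
        sealed interface Constraint permits Count, Absolute {}
        record Count(int count) implements Constraint {}
        record Absolute(String[] locations) implements Constraint {}

        static int getNumPartitions(Constraint constraint) {
            // A COUNT constraint carries an explicit partition count; an
            // absolute constraint pins one partition to each node location.
            if (constraint instanceof Count c) {
                return c.count();
            }
            return ((Absolute) constraint).locations().length;
        }
    }
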
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
index 6534ebe89b..6b5adea536 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
@@ -106,7 +106,7 @@ public class RTreeSearchPOperator extends IndexSearchPOperator {
         }
 
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> rtreeSearch =
-                mp.buildRtreeRuntime(builder.getJobSpec(), outputVars, opSchema, typeEnv, context,
+                mp.getRtreeSearchRuntime(builder.getJobSpec(), outputVars, opSchema, typeEnv, context,
                         jobGenParams.getRetainInput(), retainMissing, nonMatchWriterFactory, dataset,
                         jobGenParams.getIndexName(), keyIndexes, propagateIndexFilter, nonFilterWriterFactory,
                         minFilterFieldIndexes, maxFilterFieldIndexes, unnestMap.getGenerateCallBackProceedResultVar());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
index f962142758..fda3845b7b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
@@ -105,9 +105,9 @@ public class QueryIndexDatasource extends FunctionDataSource {
             IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig,
             IProjectionFiltrationInfo<?> projectionInfo, IProjectionFiltrationInfo<?> metaProjectionInfo)
             throws AlgebricksException {
-        return metadataProvider.buildBtreeRuntime(jobSpec, opSchema, typeEnv, context, true, false, null, ds, indexName,
-                null, null, true, true, false, null, null, null, tupleFilterFactory, outputLimit, false, false,
-                DefaultTupleProjectorFactory.INSTANCE, false);
+        return metadataProvider.getBtreeSearchRuntime(jobSpec, opSchema, typeEnv, context, true, false, null, ds,
+                indexName, null, null, true, true, false, null, null, null, tupleFilterFactory, outputLimit, false,
+                false, DefaultTupleProjectorFactory.INSTANCE, false);
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index dcd52a0732..07500f710f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -142,7 +142,7 @@ public class FeedOperations {
         IOperatorDescriptor feedIngestor;
         AlgebricksPartitionConstraint ingesterPc;
         Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, ITypedAdapterFactory> t =
-                metadataProvider.buildFeedIntakeRuntime(spec, feed, policyAccessor);
+                metadataProvider.getFeedIntakeRuntime(spec, feed, policyAccessor);
         feedIngestor = t.first;
         ingesterPc = t.second;
         adapterFactory = t.third;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index c77e032f87..89e99fda08 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -139,7 +139,7 @@ public class DatasetDataSource extends DataSource {
                 properties.put(KEY_EXTERNAL_SCAN_BUFFER_SIZE, String.valueOf(externalScanBufferSize));
                 ITypedAdapterFactory adapterFactory = metadataProvider.getConfiguredAdapterFactory(externalDataset,
                         edd.getAdapter(), properties, (ARecordType) itemType, null, context.getWarningCollector());
-                return metadataProvider.buildExternalDatasetDataScannerRuntime(jobSpec, itemType, adapterFactory,
+                return metadataProvider.getExternalDatasetScanRuntime(jobSpec, itemType, adapterFactory,
                         tupleFilterFactory, outputLimit);
             case INTERNAL:
                 DataSourceId id = getId();
@@ -163,7 +163,7 @@ public class DatasetDataSource extends DataSource {
 
                 int[] minFilterFieldIndexes = createFilterIndexes(minFilterVars, opSchema);
                 int[] maxFilterFieldIndexes = createFilterIndexes(maxFilterVars, opSchema);
-                return metadataProvider.buildBtreeRuntime(jobSpec, opSchema, typeEnv, context, true, false, null,
+                return metadataProvider.getBtreeSearchRuntime(jobSpec, opSchema, typeEnv, context, true, false, null,
                         ((DatasetDataSource) dataSource).getDataset(), primaryIndex.getIndexName(), null, null, true,
                         true, false, null, minFilterFieldIndexes, maxFilterFieldIndexes, tupleFilterFactory,
                         outputLimit, false, false, tupleProjectorFactory, false);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
index 58377f449c..9f7d567410 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
@@ -115,8 +115,8 @@ public abstract class FunctionDataSource extends DataSource {
         dataParserFactory.setRecordType(RecordUtil.FULLY_OPEN_RECORD_TYPE);
         dataParserFactory.configure(Collections.emptyMap());
         adapterFactory.configure(factory, dataParserFactory);
-        return metadataProvider.buildExternalDatasetDataScannerRuntime(jobSpec, itemType, adapterFactory,
-                tupleFilterFactory, outputLimit);
+        return metadataProvider.getExternalDatasetScanRuntime(jobSpec, itemType, adapterFactory, tupleFilterFactory,
+                outputLimit);
     }
 
     protected abstract IDatasourceFunction createFunction(MetadataProvider metadataProvider,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
index c7ccc531d9..fc65c4b7af 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
@@ -147,7 +147,7 @@ public class LoadableDataSource extends DataSource {
         ITypedAdapterFactory adapterFactory = metadataProvider.getConfiguredAdapterFactory(alds.getTargetDataset(),
                 alds.getAdapter(), alds.getAdapterProperties(), itemType, null, context.getWarningCollector());
         RecordDescriptor rDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
-        return metadataProvider.buildLoadableDatasetScan(jobSpec, adapterFactory, rDesc);
+        return metadataProvider.getLoadableDatasetScanRuntime(jobSpec, adapterFactory, rDesc);
     }
 
     @Override
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index debf60efb4..486238959f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -188,7 +188,6 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
     private ResultSetId resultSetId;
     private Counter resultSetIdCounter;
     private TxnId txnId;
-    private Map<String, Integer> externalDataLocks;
     private boolean blockingOperatorDisabled = false;
 
     public static MetadataProvider create(ICcApplicationContext appCtx, Dataverse defaultDataverse) {
@@ -326,14 +325,6 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         return storageProperties;
     }
 
-    public Map<String, Integer> getExternalDataLocks() {
-        return externalDataLocks;
-    }
-
-    public void setExternalDataLocks(Map<String, Integer> locks) {
-        this.externalDataLocks = locks;
-    }
-
     private DataverseName getActiveDataverseName(DataverseName dataverseName) {
         return dataverseName != null ? dataverseName
                 : defaultDataverse != null ? defaultDataverse.getDataverseName() : null;
@@ -496,7 +487,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                 context, jobSpec, implConfig, projectionInfo, metaProjectionInfo);
     }
 
-    protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildLoadableDatasetScan(
+    protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getLoadableDatasetScanRuntime(
             JobSpecification jobSpec, ITypedAdapterFactory adapterFactory, RecordDescriptor rDesc)
             throws AlgebricksException {
         ExternalScanOperatorDescriptor dataScanner = new ExternalScanOperatorDescriptor(jobSpec, rDesc, adapterFactory);
@@ -511,7 +502,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         return MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
     }
 
-    public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, ITypedAdapterFactory> buildFeedIntakeRuntime(
+    public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, ITypedAdapterFactory> getFeedIntakeRuntime(
             JobSpecification jobSpec, Feed feed, FeedPolicyAccessor policyAccessor) throws Exception {
         Triple<ITypedAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> factoryOutput;
         factoryOutput =
@@ -539,7 +530,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         return new Triple<>(feedIngestor, partitionConstraint, adapterFactory);
     }
 
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(JobSpecification jobSpec,
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBtreeSearchRuntime(JobSpecification jobSpec,
             IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, boolean retainInput,
             boolean retainMissing, IMissingWriterFactory nonMatchWriterFactory, Dataset dataset, String indexName,
             int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
@@ -633,23 +624,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         return new Pair<>(btreeSearchOp, spPc.second);
     }
 
-    public static int getNumPartitions(AlgebricksPartitionConstraint constraint) {
-        if (constraint.getPartitionConstraintType() == AlgebricksPartitionConstraint.PartitionConstraintType.COUNT) {
-            return ((AlgebricksCountPartitionConstraint) constraint).getCount();
-        } else {
-            return ((AlgebricksAbsolutePartitionConstraint) constraint).getLocations().length;
-        }
-    }
-
-    public static int[][] getPartitionsMap(int numPartitions) {
-        int[][] map = new int[numPartitions][1];
-        for (int i = 0; i < numPartitions; i++) {
-            map[i] = new int[] { i };
-        }
-        return map;
-    }
-
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(JobSpecification jobSpec,
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRtreeSearchRuntime(JobSpecification jobSpec,
             List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
             JobGenContext context, boolean retainInput, boolean retainMissing,
             IMissingWriterFactory nonMatchWriterFactory, Dataset dataset, String indexName, int[] keyFields,
@@ -808,6 +783,53 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                 additionalNonKeyFields, inputRecordDesc, context, spec, false, additionalNonFilteringFields);
     }
 
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(
+            IDataSource<DataSourceId> dataSource, IOperatorSchema inputSchema, IVariableTypeEnvironment typeEnv,
+            List<LogicalVariable> primaryKeys, LogicalVariable payload, List<LogicalVariable> filterKeys,
+            List<LogicalVariable> additionalNonFilterFields, RecordDescriptor recordDesc, JobGenContext context,
+            JobSpecification spec) throws AlgebricksException {
+        DataverseName dataverseName = dataSource.getId().getDataverseName();
+        String datasetName = dataSource.getId().getDatasourceName();
+        Dataset dataset = findDataset(dataverseName, datasetName);
+        if (dataset == null) {
+            throw new AsterixException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, datasetName, dataverseName);
+        }
+        int numKeys = primaryKeys.size();
+        int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1;
+        int numOfAdditionalFields = additionalNonFilterFields == null ? 0 : additionalNonFilterFields.size();
+        // Move key fields to front. [keys, record, filters]
+        int[] fieldPermutation = new int[numKeys + 1 + numFilterFields + numOfAdditionalFields];
+        int[] bloomFilterKeyFields = new int[numKeys];
+        int i = 0;
+        // set the keys' permutations
+        for (LogicalVariable varKey : primaryKeys) {
+            int idx = inputSchema.findVariable(varKey);
+            fieldPermutation[i] = idx;
+            bloomFilterKeyFields[i] = i;
+            i++;
+        }
+        // set the record permutation
+        fieldPermutation[i++] = inputSchema.findVariable(payload);
+
+        // set the meta record permutation
+        if (additionalNonFilterFields != null) {
+            for (LogicalVariable var : additionalNonFilterFields) {
+                int idx = inputSchema.findVariable(var);
+                fieldPermutation[i++] = idx;
+            }
+        }
+
+        // set the filters' permutations.
+        if (numFilterFields > 0) {
+            int idx = inputSchema.findVariable(filterKeys.get(0));
+            fieldPermutation[i++] = idx;
+        }
+
+        return createPrimaryIndexUpsertOp(spec, this, dataset, recordDesc, fieldPermutation,
+                context.getMissingWriterFactory());
+    }
+
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
             IDataSourceIndex<String, DataSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
@@ -816,9 +838,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
             boolean bulkload, List<List<AlgebricksPipeline>> secondaryKeysPipelines, IOperatorSchema pipelineTopSchema)
             throws AlgebricksException {
-        return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.INSERT, dataSourceIndex, propagatedSchema,
-                inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, null, recordDesc,
-                context, spec, bulkload, null, null, null, secondaryKeysPipelines, pipelineTopSchema);
+        return getIndexModificationRuntime(IndexOperation.INSERT, dataSourceIndex, propagatedSchema, inputSchemas,
+                typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, null, recordDesc, context,
+                spec, bulkload, null, null, null, secondaryKeysPipelines, pipelineTopSchema);
     }
 
     @Override
@@ -829,9 +851,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
             List<List<AlgebricksPipeline>> secondaryKeysPipelines, IOperatorSchema pipelineTopSchema)
             throws AlgebricksException {
-        return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.DELETE, dataSourceIndex, propagatedSchema,
-                inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, null, recordDesc,
-                context, spec, false, null, null, null, secondaryKeysPipelines, pipelineTopSchema);
+        return getIndexModificationRuntime(IndexOperation.DELETE, dataSourceIndex, propagatedSchema, inputSchemas,
+                typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, null, recordDesc, context,
+                spec, false, null, null, null, secondaryKeysPipelines, pipelineTopSchema);
     }
 
     @Override
@@ -843,9 +865,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             List<LogicalVariable> prevSecondaryKeys, LogicalVariable prevAdditionalFilteringKey,
             RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
             List<List<AlgebricksPipeline>> secondaryKeysPipelines) throws AlgebricksException {
-        return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.UPSERT, dataSourceIndex, propagatedSchema,
-                inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, prevFilterExpr,
-                recordDesc, context, spec, false, operationVar, prevSecondaryKeys, prevAdditionalFilteringKey,
+        return getIndexModificationRuntime(IndexOperation.UPSERT, dataSourceIndex, propagatedSchema, inputSchemas,
+                typeEnv, primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, prevFilterExpr, recordDesc,
+                context, spec, false, operationVar, prevSecondaryKeys, prevAdditionalFilteringKey,
                 secondaryKeysPipelines, null);
     }
 
@@ -969,53 +991,6 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         return null;
     }
 
-    @Override
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(
-            IDataSource<DataSourceId> dataSource, IOperatorSchema inputSchema, IVariableTypeEnvironment typeEnv,
-            List<LogicalVariable> primaryKeys, LogicalVariable payload, List<LogicalVariable> filterKeys,
-            List<LogicalVariable> additionalNonFilterFields, RecordDescriptor recordDesc, JobGenContext context,
-            JobSpecification spec) throws AlgebricksException {
-        DataverseName dataverseName = dataSource.getId().getDataverseName();
-        String datasetName = dataSource.getId().getDatasourceName();
-        Dataset dataset = findDataset(dataverseName, datasetName);
-        if (dataset == null) {
-            throw new AsterixException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, datasetName, dataverseName);
-        }
-        int numKeys = primaryKeys.size();
-        int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1;
-        int numOfAdditionalFields = additionalNonFilterFields == null ? 0 : additionalNonFilterFields.size();
-        // Move key fields to front. [keys, record, filters]
-        int[] fieldPermutation = new int[numKeys + 1 + numFilterFields + numOfAdditionalFields];
-        int[] bloomFilterKeyFields = new int[numKeys];
-        int i = 0;
-        // set the keys' permutations
-        for (LogicalVariable varKey : primaryKeys) {
-            int idx = inputSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            bloomFilterKeyFields[i] = i;
-            i++;
-        }
-        // set the record permutation
-        fieldPermutation[i++] = inputSchema.findVariable(payload);
-
-        // set the meta record permutation
-        if (additionalNonFilterFields != null) {
-            for (LogicalVariable var : additionalNonFilterFields) {
-                int idx = inputSchema.findVariable(var);
-                fieldPermutation[i++] = idx;
-            }
-        }
-
-        // set the filters' permutations.
-        if (numFilterFields > 0) {
-            int idx = inputSchema.findVariable(filterKeys.get(0));
-            fieldPermutation[i++] = idx;
-        }
-
-        return createPrimaryIndexUpsertOp(spec, this, dataset, recordDesc, fieldPermutation,
-                context.getMissingWriterFactory());
-    }
-
     protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> createPrimaryIndexUpsertOp(JobSpecification spec,
             MetadataProvider metadataProvider, Dataset dataset, RecordDescriptor inputRecordDesc,
             int[] fieldPermutation, IMissingWriterFactory missingWriterFactory) throws AlgebricksException {
@@ -1024,7 +999,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                 missingWriterFactory);
     }
 
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetDataScannerRuntime(
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getExternalDatasetScanRuntime(
             JobSpecification jobSpec, IAType itemType, ITypedAdapterFactory adapterFactory,
             ITupleFilterFactory tupleFilterFactory, long outputLimit) throws AlgebricksException {
         if (itemType.getTypeTag() != ATypeTag.OBJECT) {
@@ -1162,13 +1137,12 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                 tupleFilterFactory, isPrimary, modCallbackFactory, tuplePartitionerFactory, partitionsMap);
     }
 
-    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertOrDeleteOrUpsertRuntime(
-            IndexOperation indexOp, IDataSourceIndex<String, DataSourceId> dataSourceIndex,
-            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv,
-            List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
-            List<LogicalVariable> additionalNonKeyFields, ILogicalExpression filterExpr,
-            ILogicalExpression prevFilterExpr, RecordDescriptor inputRecordDesc, JobGenContext context,
-            JobSpecification spec, boolean bulkload, LogicalVariable operationVar,
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexModificationRuntime(IndexOperation indexOp,
+            IDataSourceIndex<String, DataSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
+            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
+            ILogicalExpression filterExpr, ILogicalExpression prevFilterExpr, RecordDescriptor inputRecordDesc,
+            JobGenContext context, JobSpecification spec, boolean bulkload, LogicalVariable operationVar,
             List<LogicalVariable> prevSecondaryKeys, LogicalVariable prevAdditionalFilteringKey,
             List<List<AlgebricksPipeline>> secondaryKeysPipelines, IOperatorSchema pipelineTopSchema)
             throws AlgebricksException {
@@ -1202,33 +1176,33 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
 
         switch (secondaryIndex.getIndexType()) {
             case BTREE:
-                return getBTreeRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
+                return getBTreeModificationRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
                         secondaryKeys, additionalNonKeyFields, filterFactory, prevFilterFactory, inputRecordDesc,
                         context, spec, indexOp, bulkload, operationVar, prevSecondaryKeys, prevAdditionalFilteringKeys);
             case ARRAY:
                 if (bulkload) {
                     // In the case of bulk-load, we do not handle any nested plans. We perform the exact same behavior
                     // as a normal B-Tree bulk load.
-                    return getBTreeRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
-                            secondaryKeys, additionalNonKeyFields, filterFactory, prevFilterFactory, inputRecordDesc,
-                            context, spec, indexOp, bulkload, operationVar, prevSecondaryKeys,
+                    return getBTreeModificationRuntime(dataverseName, datasetName, indexName, propagatedSchema,
+                            primaryKeys, secondaryKeys, additionalNonKeyFields, filterFactory, prevFilterFactory,
+                            inputRecordDesc, context, spec, indexOp, bulkload, operationVar, prevSecondaryKeys,
                             prevAdditionalFilteringKeys);
                 } else {
-                    return getArrayIndexRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
-                            additionalNonKeyFields, inputRecordDesc, spec, indexOp, operationVar,
+                    return getArrayIndexModificationRuntime(dataverseName, datasetName, indexName, propagatedSchema,
+                            primaryKeys, additionalNonKeyFields, inputRecordDesc, spec, indexOp, operationVar,
                             secondaryKeysPipelines);
                 }
             case RTREE:
-                return getRTreeRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
+                return getRTreeModificationRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
                         secondaryKeys, additionalNonKeyFields, filterFactory, prevFilterFactory, inputRecordDesc,
                         context, spec, indexOp, bulkload, operationVar, prevSecondaryKeys, prevAdditionalFilteringKeys);
             case SINGLE_PARTITION_WORD_INVIX:
             case SINGLE_PARTITION_NGRAM_INVIX:
             case LENGTH_PARTITIONED_WORD_INVIX:
             case LENGTH_PARTITIONED_NGRAM_INVIX:
-                return getInvertedIndexRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
-                        secondaryKeys, additionalNonKeyFields, filterFactory, prevFilterFactory, inputRecordDesc,
-                        context, spec, indexOp, secondaryIndex.getIndexType(), bulkload, operationVar,
+                return getInvertedIndexModificationRuntime(dataverseName, datasetName, indexName, propagatedSchema,
+                        primaryKeys, secondaryKeys, additionalNonKeyFields, filterFactory, prevFilterFactory,
+                        inputRecordDesc, context, spec, indexOp, secondaryIndex.getIndexType(), bulkload, operationVar,
                         prevSecondaryKeys, prevAdditionalFilteringKeys);
             default:
                 throw new AlgebricksException(
@@ -1236,13 +1210,14 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         }
     }
 
-    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeRuntime(DataverseName dataverseName,
-            String datasetName, String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
-            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
-            AsterixTupleFilterFactory filterFactory, AsterixTupleFilterFactory prevFilterFactory,
-            RecordDescriptor inputRecordDesc, JobGenContext context, JobSpecification spec, IndexOperation indexOp,
-            boolean bulkload, LogicalVariable operationVar, List<LogicalVariable> prevSecondaryKeys,
-            List<LogicalVariable> prevAdditionalFilteringKeys) throws AlgebricksException {
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeModificationRuntime(
+            DataverseName dataverseName, String datasetName, String indexName, IOperatorSchema propagatedSchema,
+            List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+            List<LogicalVariable> additionalNonKeyFields, AsterixTupleFilterFactory filterFactory,
+            AsterixTupleFilterFactory prevFilterFactory, RecordDescriptor inputRecordDesc, JobGenContext context,
+            JobSpecification spec, IndexOperation indexOp, boolean bulkload, LogicalVariable operationVar,
+            List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys)
+            throws AlgebricksException {
         Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
         int numKeys = primaryKeys.size() + secondaryKeys.size();
         int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1;
@@ -1332,10 +1307,11 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         }
     }
 
-    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getArrayIndexRuntime(DataverseName dataverseName,
-            String datasetName, String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
-            List<LogicalVariable> additionalNonKeyFields, RecordDescriptor inputRecordDesc, JobSpecification spec,
-            IndexOperation indexOp, LogicalVariable operationVar, List<List<AlgebricksPipeline>> secondaryKeysPipelines)
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getArrayIndexModificationRuntime(
+            DataverseName dataverseName, String datasetName, String indexName, IOperatorSchema propagatedSchema,
+            List<LogicalVariable> primaryKeys, List<LogicalVariable> additionalNonKeyFields,
+            RecordDescriptor inputRecordDesc, JobSpecification spec, IndexOperation indexOp,
+            LogicalVariable operationVar, List<List<AlgebricksPipeline>> secondaryKeysPipelines)
             throws AlgebricksException {
 
         Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
@@ -1397,13 +1373,14 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         }
     }
 
-    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRTreeRuntime(DataverseName dataverseName,
-            String datasetName, String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
-            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
-            AsterixTupleFilterFactory filterFactory, AsterixTupleFilterFactory prevFilterFactory,
-            RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, IndexOperation indexOp,
-            boolean bulkload, LogicalVariable operationVar, List<LogicalVariable> prevSecondaryKeys,
-            List<LogicalVariable> prevAdditionalFilteringKeys) throws AlgebricksException {
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRTreeModificationRuntime(
+            DataverseName dataverseName, String datasetName, String indexName, IOperatorSchema propagatedSchema,
+            List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+            List<LogicalVariable> additionalNonKeyFields, AsterixTupleFilterFactory filterFactory,
+            AsterixTupleFilterFactory prevFilterFactory, RecordDescriptor recordDesc, JobGenContext context,
+            JobSpecification spec, IndexOperation indexOp, boolean bulkload, LogicalVariable operationVar,
+            List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys)
+            throws AlgebricksException {
         Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
         String itemTypeName = dataset.getItemTypeName();
         IAType itemType = MetadataManager.INSTANCE
@@ -1506,7 +1483,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         return new Pair<>(op, splitsAndConstraint.second);
     }
 
-    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInvertedIndexRuntime(
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInvertedIndexModificationRuntime(
             DataverseName dataverseName, String datasetName, String indexName, IOperatorSchema propagatedSchema,
             List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
             List<LogicalVariable> additionalNonKeyFields, AsterixTupleFilterFactory filterFactory,
@@ -1905,6 +1882,22 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         validateDatabaseObjectNameImpl(objectName, sourceLoc);
     }
 
+    public static int getNumPartitions(AlgebricksPartitionConstraint constraint) {
+        if (constraint.getPartitionConstraintType() == AlgebricksPartitionConstraint.PartitionConstraintType.COUNT) {
+            return ((AlgebricksCountPartitionConstraint) constraint).getCount();
+        } else {
+            return ((AlgebricksAbsolutePartitionConstraint) constraint).getLocations().length;
+        }
+    }
+
+    public static int[][] getPartitionsMap(int numPartitions) {
+        int[][] map = new int[numPartitions][1];
+        for (int i = 0; i < numPartitions; i++) {
+            map[i] = new int[] { i };
+        }
+        return map;
+    }
+
     private void validateDatabaseObjectNameImpl(String name, SourceLocation sourceLoc) throws AlgebricksException {
         if (name == null || name.isEmpty()) {
             throw new AsterixException(ErrorCode.INVALID_DATABASE_OBJECT_NAME, sourceLoc, "");
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
index 708c2c2047..448f5ce522 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
@@ -62,7 +62,7 @@ public class SampleDataSource extends DataSource {
             IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig,
             IProjectionFiltrationInfo<?> projectionInfo, IProjectionFiltrationInfo<?> metaProjectionInfo)
             throws AlgebricksException {
-        return metadataProvider.buildBtreeRuntime(jobSpec, opSchema, typeEnv, context, true, false, null, dataset,
+        return metadataProvider.getBtreeSearchRuntime(jobSpec, opSchema, typeEnv, context, true, false, null, dataset,
                 sampleIndexName, null, null, true, true, false, null, null, null, tupleFilterFactory, outputLimit,
                 false, false, DefaultTupleProjectorFactory.INSTANCE, false);
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 4596393b25..4a6e76e16b 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -43,7 +43,6 @@ import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 
 public interface IMetadataProvider<S, I> {
-    IDataSource<S> findDataSource(S id) throws AlgebricksException;
 
     /**
      * Obs: A scanner may choose to contribute a null
@@ -78,6 +77,12 @@ public interface IMetadataProvider<S, I> {
             List<LogicalVariable> additionalNonFilteringFields, RecordDescriptor inputRecordDesc, JobGenContext context,
             JobSpecification jobSpec, boolean bulkload) throws AlgebricksException;
 
+    Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(IDataSource<S> dataSource,
+            IOperatorSchema inputSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
+            LogicalVariable payLoadVar, List<LogicalVariable> additionalFilterFields,
+            List<LogicalVariable> additionalNonFilteringFields, RecordDescriptor recordDesc, JobGenContext context,
+            JobSpecification jobSpec) throws AlgebricksException;
+
     Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(IDataSource<S> dataSource,
             IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
             LogicalVariable payLoadVar, List<LogicalVariable> additionalNonKeyFields,
@@ -115,6 +120,14 @@ public interface IMetadataProvider<S, I> {
             List<List<AlgebricksPipeline>> secondaryKeysPipelines, IOperatorSchema pipelineTopSchema)
             throws AlgebricksException;
 
+    Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
+            IDataSourceIndex<I, S> dataSourceIndex, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
+            IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+            List<LogicalVariable> additionalFilteringKeys, ILogicalExpression filterExpr,
+            ILogicalExpression prevFilterExpr, LogicalVariable operationVar, List<LogicalVariable> prevSecondaryKeys,
+            LogicalVariable prevAdditionalFilteringKeys, RecordDescriptor inputDesc, JobGenContext context,
+            JobSpecification spec, List<List<AlgebricksPipeline>> secondaryKeysPipelines) throws AlgebricksException;
+
     /**
      * Creates the delete runtime of IndexInsertDeletePOperator, which models
      * insert/delete operations into a secondary index.
@@ -170,24 +183,12 @@ public interface IMetadataProvider<S, I> {
             RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, boolean bulkload)
             throws AlgebricksException;
 
+    IDataSource<S> findDataSource(S id) throws AlgebricksException;
+
     IDataSourceIndex<I, S> findDataSourceIndex(I indexId, S dataSourceId) throws AlgebricksException;
 
     IFunctionInfo lookupFunction(FunctionIdentifier fid);
 
-    Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(IDataSource<S> dataSource,
-            IOperatorSchema inputSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
-            LogicalVariable payLoadVar, List<LogicalVariable> additionalFilterFields,
-            List<LogicalVariable> additionalNonFilteringFields, RecordDescriptor recordDesc, JobGenContext context,
-            JobSpecification jobSpec) throws AlgebricksException;
-
-    Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
-            IDataSourceIndex<I, S> dataSourceIndex, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
-            IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
-            List<LogicalVariable> additionalFilteringKeys, ILogicalExpression filterExpr,
-            ILogicalExpression prevFilterExpr, LogicalVariable operationVar, List<LogicalVariable> prevSecondaryKeys,
-            LogicalVariable prevAdditionalFilteringKeys, RecordDescriptor inputDesc, JobGenContext context,
-            JobSpecification spec, List<List<AlgebricksPipeline>> secondaryKeysPipelines) throws AlgebricksException;
-
     ITupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv,
             ILogicalExpression filterExpr, JobGenContext context) throws AlgebricksException;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
index 1c961c5c68..c46391f5c6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
@@ -57,7 +57,7 @@ public class BTreeSearchOperatorDescriptor extends AbstractSingleActivityOperato
     protected final long outputLimit;
     protected final ITupleProjectorFactory tupleProjectorFactory;
     protected final ITuplePartitionerFactory tuplePartitionerFactory;
-    protected final int[][] map;
+    protected final int[][] partitionsMap;
 
     public BTreeSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
             int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
@@ -79,7 +79,7 @@ public class BTreeSearchOperatorDescriptor extends AbstractSingleActivityOperato
             IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory tupleFilterFactory, long outputLimit,
             boolean appendOpCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue,
             byte[] searchCallbackProceedResultTrueValue, ITupleProjectorFactory tupleProjectorFactory,
-            ITuplePartitionerFactory tuplePartitionerFactory, int[][] map) {
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap) {
         super(spec, 1, 1);
         this.indexHelperFactory = indexHelperFactory;
         this.retainInput = retainInput;
@@ -102,7 +102,7 @@ public class BTreeSearchOperatorDescriptor extends AbstractSingleActivityOperato
         this.searchCallbackProceedResultTrueValue = searchCallbackProceedResultTrueValue;
         this.tupleProjectorFactory = tupleProjectorFactory;
         this.tuplePartitionerFactory = tuplePartitionerFactory;
-        this.map = map;
+        this.partitionsMap = partitionsMap;
     }
 
     @Override
@@ -114,7 +114,7 @@ public class BTreeSearchOperatorDescriptor extends AbstractSingleActivityOperato
                 retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter,
                 nonFilterWriterFactory, tupleFilterFactory, outputLimit, appendOpCallbackProceedResult,
                 searchCallbackProceedResultFalseValue, searchCallbackProceedResultTrueValue, tupleProjectorFactory,
-                tuplePartitionerFactory, map);
+                tuplePartitionerFactory, partitionsMap);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
index 24163ea9d9..3fd0cf96a2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -68,12 +68,12 @@ public class BTreeSearchOperatorNodePushable extends IndexSearchOperatorNodePush
             IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory tupleFilterFactory, long outputLimit,
             boolean appendOpCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue,
             byte[] searchCallbackProceedResultTrueValue, ITupleProjectorFactory projectorFactory,
-            ITuplePartitionerFactory tuplePartitionerFactory, int[][] map) throws HyracksDataException {
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap) throws HyracksDataException {
         super(ctx, inputRecDesc, partition, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
                 retainInput, retainMissing, nonMatchWriterFactory, searchCallbackFactory, appendIndexFilter,
                 nonFilterWriterFactory, tupleFilterFactory, outputLimit, appendOpCallbackProceedResult,
                 searchCallbackProceedResultFalseValue, searchCallbackProceedResultTrueValue, projectorFactory,
-                tuplePartitionerFactory, map);
+                tuplePartitionerFactory, partitionsMap);
         this.lowKeyInclusive = lowKeyInclusive;
         this.highKeyInclusive = highKeyInclusive;
         if (lowKeyFields != null && lowKeyFields.length > 0) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index c6cdd663b1..88c06eb91a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -128,10 +128,10 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput
             IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory tupleFilterFactory, long outputLimit,
             boolean appendSearchCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue,
             byte[] searchCallbackProceedResultTrueValue, ITupleProjectorFactory projectorFactory,
-            ITuplePartitionerFactory tuplePartitionerFactory, int[][] map) throws HyracksDataException {
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap) throws HyracksDataException {
         this.ctx = ctx;
         this.appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
-        this.partitions = map != null ? map[partition] : new int[] { partition };
+        this.partitions = partitionsMap != null ? partitionsMap[partition] : new int[] { partition };
         for (int i = 0; i < partitions.length; i++) {
             storagePartitionId2Index.put(partitions[i], i);
         }
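
This hunk is where the map is consumed: each operator partition looks up the storage partitions it is responsible for, falling back to its own partition number when no map is supplied. The same logic in isolation (illustrative helper name, not the Hyracks API):

    // Mirrors the assignment above: a null partitionsMap preserves the old
    // one-to-one operator-to-storage-partition behavior.
    static int[] resolvePartitions(int[][] partitionsMap, int partition) {
        return partitionsMap != null ? partitionsMap[partition] : new int[] { partition };
    }

    // With the identity map from getPartitionsMap(3), operator partition 1
    // resolves to storage partition {1}; with a null map, the result is the
    // same, so existing call sites that pass null are unaffected.
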
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java
index 9ed0782a89..4d8f1fe7af 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java
@@ -41,11 +41,11 @@ public class LSMBTreeBatchPointSearchOperatorDescriptor extends BTreeSearchOpera
             IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory,
             int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, ITupleFilterFactory tupleFilterFactory,
             long outputLimit, ITupleProjectorFactory tupleProjectorFactory,
-            ITuplePartitionerFactory tuplePartitionerFactory, int[][] map) {
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap) {
         super(spec, outRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory,
                 retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, minFilterFieldIndexes,
                 maxFilterFieldIndexes, false, null, tupleFilterFactory, outputLimit, false, null, null,
-                tupleProjectorFactory, tuplePartitionerFactory, map);
+                tupleProjectorFactory, tuplePartitionerFactory, partitionsMap);
     }
 
     @Override
@@ -55,7 +55,7 @@ public class LSMBTreeBatchPointSearchOperatorDescriptor extends BTreeSearchOpera
                 recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), lowKeyFields, highKeyFields,
                 lowKeyInclusive, highKeyInclusive, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
                 retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, tupleFilterFactory,
-                outputLimit, tupleProjectorFactory, tuplePartitionerFactory, map);
+                outputLimit, tupleProjectorFactory, tuplePartitionerFactory, partitionsMap);
     }
 
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
index 47d515a3e3..8c8a550627 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
@@ -54,11 +54,11 @@ public class LSMBTreeBatchPointSearchOperatorNodePushable extends BTreeSearchOpe
             IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, boolean retainMissing,
             IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory,
             ITupleFilterFactory tupleFilterFactory, long outputLimit, ITupleProjectorFactory tupleProjectorFactory,
-            ITuplePartitionerFactory tuplePartitionerFactory, int[][] map) throws HyracksDataException {
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap) throws HyracksDataException {
         super(ctx, partition, inputRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive,
                 minFilterKeyFields, maxFilterKeyFields, indexHelperFactory, retainInput, retainMissing,
                 missingWriterFactory, searchCallbackFactory, false, null, tupleFilterFactory, outputLimit, false, null,
-                null, tupleProjectorFactory, tuplePartitionerFactory, map);
+                null, tupleProjectorFactory, tuplePartitionerFactory, partitionsMap);
         this.keyFields = lowKeyFields;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorDescriptor.java
index b5b951da0e..fcdf792c23 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorDescriptor.java
@@ -35,7 +35,7 @@ import org.apache.hyracks.storage.am.lsm.invertedindex.fulltext.IFullTextConfigE
 import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
 
 public class LSMInvertedIndexSearchOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
     private final int queryField;
     private final IInvertedIndexSearchModifierFactory searchModifierFactory;
@@ -54,6 +54,7 @@ public class LSMInvertedIndexSearchOperatorDescriptor extends AbstractSingleActi
     private final int numOfFields;
     // the maximum number of frames that this inverted-index-search can use
     private final int frameLimit;
+    private final int[][] partitionsMap;
 
     public LSMInvertedIndexSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
             int queryField, IIndexDataflowHelperFactory indexHelperFactory,
@@ -62,7 +63,8 @@ public class LSMInvertedIndexSearchOperatorDescriptor extends AbstractSingleActi
             IInvertedIndexSearchModifierFactory searchModifierFactory, boolean retainInput, boolean retainMissing,
             IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory,
             int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, boolean isFullTextSearchQuery, int numOfFields,
-            boolean appendIndexFilter, IMissingWriterFactory nonFilterWriterFactory, int frameLimit) {
+            boolean appendIndexFilter, IMissingWriterFactory nonFilterWriterFactory, int frameLimit,
+            int[][] partitionsMap) {
         super(spec, 1, 1);
         this.indexHelperFactory = indexHelperFactory;
         this.queryTokenizerFactory = queryTokenizerFactory;
@@ -79,6 +81,7 @@ public class LSMInvertedIndexSearchOperatorDescriptor extends AbstractSingleActi
         this.appendIndexFilter = appendIndexFilter;
         this.nonFilterWriterFactory = nonFilterWriterFactory;
         this.numOfFields = numOfFields;
+        this.partitionsMap = partitionsMap;
         this.outRecDescs[0] = outRecDesc;
         this.frameLimit = frameLimit;
     }
@@ -91,6 +94,7 @@ public class LSMInvertedIndexSearchOperatorDescriptor extends AbstractSingleActi
                 recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), partition, minFilterFieldIndexes,
                 maxFilterFieldIndexes, indexHelperFactory, retainInput, retainMissing, missingWriterFactory,
                 searchCallbackFactory, searchModifier, queryTokenizerFactory, fullTextConfigEvaluatorFactory,
-                queryField, isFullTextSearchQuery, numOfFields, appendIndexFilter, nonFilterWriterFactory, frameLimit);
+                queryField, isFullTextSearchQuery, numOfFields, appendIndexFilter, nonFilterWriterFactory, frameLimit,
+                partitionsMap);
     }
 }
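
Note the serialVersionUID bump from 1L to 2L above: adding the serializable partitionsMap field changes the descriptor's serialized form, and incrementing the version is the conventional way to make deserialization of the old form fail fast rather than misbehave silently.
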
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java
index 996241daef..742a86c14a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java
@@ -65,10 +65,12 @@ public class LSMInvertedIndexSearchOperatorNodePushable extends IndexSearchOpera
             IInvertedIndexSearchModifier searchModifier, IBinaryTokenizerFactory binaryTokenizerFactory,
             IFullTextConfigEvaluatorFactory fullTextConfigEvaluatorFactory, int queryFieldIndex,
             boolean isFullTextSearchQuery, int numOfFields, boolean appendIndexFilter,
-            IMissingWriterFactory nonFilterWriterFactory, int frameLimit) throws HyracksDataException {
+            IMissingWriterFactory nonFilterWriterFactory, int frameLimit, int[][] partitionsMap)
+            throws HyracksDataException {
         super(ctx, inputRecDesc, partition, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
                 retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter,
-                nonFilterWriterFactory, null, -1, false, null, null, DefaultTupleProjectorFactory.INSTANCE, null, null);
+                nonFilterWriterFactory, null, -1, false, null, null, DefaultTupleProjectorFactory.INSTANCE, null,
+                partitionsMap);
         this.searchModifier = searchModifier;
         this.binaryTokenizerFactory = binaryTokenizerFactory;
         this.fullTextConfigEvaluatorFactory = fullTextConfigEvaluatorFactory;