You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by al...@apache.org on 2023/04/24 20:31:33 UTC

[asterixdb] branch master updated: [ASTERIXDB-3144][HYR][RT] Make index bulkload support multiple partitions

This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 836209d4e3 [ASTERIXDB-3144][HYR][RT] Make index bulkload support multiple partitions
836209d4e3 is described below

commit 836209d4e3ab63402fe4a4d938f2f2a90a04083a
Author: Ali Alsuliman <al...@gmail.com>
AuthorDate: Mon Apr 24 01:45:42 2023 -0700

    [ASTERIXDB-3144][HYR][RT] Make index bulkload support multiple partitions
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    This patch changes the index bulkload operators to support
    operating on multiple partitions. With this change, an index
    bulkload node pushable will bulkload multiple indexes
    representing multiple partitions. This is a step towards
    achieving compute/storage separation.
    
    Change-Id: I36e242b36ed1ea2472883e8d3c1ec99d1de1c630
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17494
    Reviewed-by: Ali Alsuliman <al...@gmail.com>
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
---
 .../asterix/app/bootstrap/TestNodeController.java  | 13 ++-
 .../org/apache/asterix/common/TestDataUtil.java    |  8 --
 .../storage/IndexDropOperatorNodePushableTest.java | 10 +--
 .../metadata/declared/MetadataProvider.java        | 57 +++++++------
 .../apache/asterix/metadata/utils/DatasetUtil.java |  3 +-
 .../metadata/utils/SampleOperationsHelper.java     | 18 ++++-
 .../SecondaryArrayIndexBTreeOperationsHelper.java  |  6 +-
 .../utils/SecondaryBTreeOperationsHelper.java      | 19 ++---
 .../utils/SecondaryIndexOperationsHelper.java      | 40 ++++-----
 .../SecondaryInvertedIndexOperationsHelper.java    | 20 +++--
 .../utils/SecondaryRTreeOperationsHelper.java      |  3 +-
 .../utils/SecondaryTreeIndexOperationsHelper.java  |  8 ++
 .../LSMIndexBulkLoadOperatorDescriptor.java        | 13 ++-
 .../LSMIndexBulkLoadOperatorNodePushable.java      | 43 +++++-----
 .../operators/physical/IntersectPOperator.java     |  8 --
 .../btree/client/InsertPipelineExample.java        | 10 +--
 .../hyracks/examples/btree/client/JobHelper.java   |  8 ++
 .../btree/client/PrimaryIndexBulkLoadExample.java  | 13 ++-
 .../client/SecondaryIndexBulkLoadExample.java      | 15 +++-
 .../tests/am/btree/AbstractBTreeOperatorTest.java  | 30 ++++---
 .../tests/am/rtree/AbstractRTreeOperatorTest.java  | 35 +++++---
 .../tests/integration/AbstractIntegrationTest.java |  8 --
 .../apache/hyracks/tests/integration/TestUtil.java |  8 --
 .../IndexBulkLoadOperatorNodePushable.java         | 94 +++++++++++++++-------
 .../TreeIndexBulkLoadOperatorDescriptor.java       | 22 ++---
 .../org/apache/hyracks/test/support/TestUtils.java |  8 ++
 26 files changed, 314 insertions(+), 206 deletions(-)

diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 068c0ee18f..3ddbfd97a3 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -213,10 +213,15 @@ public class TestNodeController {
             for (int i = 0; i < fieldPermutation.length; i++) {
                 fieldPermutation[i] = i;
             }
-            LSMIndexBulkLoadOperatorNodePushable op =
-                    new LSMIndexBulkLoadOperatorNodePushable(secondaryIndexHelperFactory, primaryIndexHelperFactory,
-                            ctx, 0, fieldPermutation, 1.0F, false, numElementsHint, true, secondaryIndexInfo.rDesc,
-                            BulkLoadUsage.CREATE_INDEX, dataset.getDatasetId(), null);
+            int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length;
+            int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions);
+            IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories;
+            ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(
+                    primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions);
+            LSMIndexBulkLoadOperatorNodePushable op = new LSMIndexBulkLoadOperatorNodePushable(
+                    secondaryIndexHelperFactory, primaryIndexHelperFactory, ctx, 0, fieldPermutation, 1.0F, false,
+                    numElementsHint, true, secondaryIndexInfo.rDesc, BulkLoadUsage.CREATE_INDEX, dataset.getDatasetId(),
+                    null, tuplePartitionerFactory, partitionsMap);
             op.setOutputFrameWriter(0, new SinkRuntimeFactory().createPushRuntime(ctx)[0], null);
             return Pair.of(secondaryIndexInfo, op);
         } catch (Throwable th) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
index 3d3d1f271e..06380fe2b3 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
@@ -233,12 +233,4 @@ public class TestDataUtil {
         Assert.assertTrue(nodeFileSplit.isPresent());
         return nodeFileSplit.get().getPath();
     }
-
-    public static int[][] getPartitionsMap(int numPartitions) {
-        int[][] map = new int[numPartitions][1];
-        for (int i = 0; i < numPartitions; i++) {
-            map[i] = new int[] { i };
-        }
-        return map;
-    }
 }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
index add708ed31..e1092ddde2 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
@@ -27,7 +27,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.asterix.app.bootstrap.TestNodeController;
-import org.apache.asterix.common.TestDataUtil;
 import org.apache.asterix.common.config.DatasetConfig;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.metadata.DataverseName;
@@ -58,6 +57,7 @@ import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorNodePushable;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.apache.hyracks.test.support.TestUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -176,7 +176,7 @@ public class IndexDropOperatorNodePushableTest {
         dataflowHelper.open();
         // try to drop in-use index (should fail)
         IndexDropOperatorNodePushable dropInUseOp = new IndexDropOperatorNodePushable(helperFactory,
-                EnumSet.noneOf(DropOption.class), ctx, 0, TestDataUtil.getPartitionsMap(1));
+                EnumSet.noneOf(DropOption.class), ctx, 0, TestUtils.getPartitionsMap(1));
         try {
             dropInUseOp.initialize();
         } catch (HyracksDataException e) {
@@ -192,7 +192,7 @@ public class IndexDropOperatorNodePushableTest {
         dropFailed.set(false);
         // drop with option wait for in-use should be successful once the index is closed
         final IndexDropOperatorNodePushable dropWithWaitOp = new IndexDropOperatorNodePushable(helperFactory,
-                EnumSet.of(DropOption.IF_EXISTS, DropOption.WAIT_ON_IN_USE), ctx, 0, TestDataUtil.getPartitionsMap(1));
+                EnumSet.of(DropOption.IF_EXISTS, DropOption.WAIT_ON_IN_USE), ctx, 0, TestUtils.getPartitionsMap(1));
         Thread dropThread = new Thread(() -> {
             try {
                 dropWithWaitOp.initialize();
@@ -216,7 +216,7 @@ public class IndexDropOperatorNodePushableTest {
         dropFailed.set(false);
         // Dropping non-existing index
         IndexDropOperatorNodePushable dropNonExistingOp = new IndexDropOperatorNodePushable(helperFactory,
-                EnumSet.noneOf(DropOption.class), ctx, 0, TestDataUtil.getPartitionsMap(1));
+                EnumSet.noneOf(DropOption.class), ctx, 0, TestUtils.getPartitionsMap(1));
         try {
             dropNonExistingOp.initialize();
         } catch (HyracksDataException e) {
@@ -232,7 +232,7 @@ public class IndexDropOperatorNodePushableTest {
         // Dropping non-existing index with if exists option should be successful
         dropFailed.set(false);
         IndexDropOperatorNodePushable dropNonExistingWithIfExistsOp = new IndexDropOperatorNodePushable(helperFactory,
-                EnumSet.of(DropOption.IF_EXISTS), ctx, 0, TestDataUtil.getPartitionsMap(1));
+                EnumSet.of(DropOption.IF_EXISTS), ctx, 0, TestUtils.getPartitionsMap(1));
         try {
             dropNonExistingWithIfExistsOp.initialize();
         } catch (HyracksDataException e) {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 7383ca3f2a..97d5c39dc9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -26,6 +26,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -740,10 +741,12 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
 
         // move key fields to front
         int[] fieldPermutation = new int[numKeys + 1 + numFilterFields];
+        int[] pkFields = new int[numKeys];
         int i = 0;
         for (LogicalVariable varKey : keys) {
             int idx = propagatedSchema.findVariable(varKey);
             fieldPermutation[i] = idx;
+            pkFields[i] = idx;
             i++;
         }
         fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
@@ -760,11 +763,17 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         // right callback
         // (ex. what's the expected behavior when there is an error during
         // bulkload?)
+        int[][] partitionsMap = getPartitionsMap(dataset);
+        int numPartitions = (int) Arrays.stream(partitionsMap).map(partitions -> partitions.length).count();
+        IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
+        ITuplePartitionerFactory partitionerFactory =
+                new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
         IIndexDataflowHelperFactory indexHelperFactory =
                 new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
-        LSMIndexBulkLoadOperatorDescriptor btreeBulkLoad = new LSMIndexBulkLoadOperatorDescriptor(spec, null,
-                fieldPermutation, StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true,
-                indexHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId(), null);
+        LSMIndexBulkLoadOperatorDescriptor btreeBulkLoad =
+                new LSMIndexBulkLoadOperatorDescriptor(spec, null, fieldPermutation,
+                        StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true, indexHelperFactory,
+                        null, BulkLoadUsage.LOAD, dataset.getDatasetId(), null, partitionerFactory, partitionsMap);
         return new Pair<>(btreeBulkLoad, splitsAndConstraint.second);
     }
 
@@ -1086,7 +1095,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         int numPartitions = getNumPartitions(splitsAndConstraint.second);
         int[][] partitionsMap = getPartitionsMap(numPartitions);
         IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
-        ITuplePartitionerFactory tuplePartitionerFactory =
+        ITuplePartitionerFactory partitionerFactory =
                 new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
 
         IOperatorDescriptor op;
@@ -1094,7 +1103,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             long numElementsHint = getCardinalityPerPartitionHint(dataset);
             op = new LSMIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
                     StorageConstants.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh, null,
-                    BulkLoadUsage.LOAD, dataset.getDatasetId(), null);
+                    BulkLoadUsage.LOAD, dataset.getDatasetId(), null, partitionerFactory, partitionsMap);
         } else {
             if (indexOp == IndexOperation.INSERT) {
                 ISearchOperationCallbackFactory searchCallbackFactory = dataset
@@ -1102,7 +1111,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
 
                 Optional<Index> primaryKeyIndex = MetadataManager.INSTANCE
                         .getDatasetIndexes(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName()).stream()
-                        .filter(index -> index.isPrimaryKeyIndex()).findFirst();
+                        .filter(Index::isPrimaryKeyIndex).findFirst();
                 IIndexDataflowHelperFactory pkidfh = null;
                 if (primaryKeyIndex.isPresent()) {
                     Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primaryKeySplitsAndConstraint =
@@ -1111,12 +1120,12 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                             primaryKeySplitsAndConstraint.first);
                 }
                 op = createLSMPrimaryInsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, pkidfh,
-                        modificationCallbackFactory, searchCallbackFactory, numKeys, filterFields,
-                        tuplePartitionerFactory, partitionsMap);
+                        modificationCallbackFactory, searchCallbackFactory, numKeys, filterFields, partitionerFactory,
+                        partitionsMap);
 
             } else {
                 op = createLSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh,
-                        null, true, modificationCallbackFactory, tuplePartitionerFactory, partitionsMap);
+                        null, true, modificationCallbackFactory, partitionerFactory, partitionsMap);
             }
         }
         return new Pair<>(op, splitsAndConstraint.second);
@@ -1288,7 +1297,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             int numPartitions = getNumPartitions(splitsAndConstraint.second);
             int[][] partitionsMap = getPartitionsMap(numPartitions);
             IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
-            ITuplePartitionerFactory tuplePartitionerFactory =
+            ITuplePartitionerFactory partitionerFactory =
                     new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
 
             IOperatorDescriptor op;
@@ -1296,15 +1305,15 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                 long numElementsHint = getCardinalityPerPartitionHint(dataset);
                 op = new LSMIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
                         StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh, null,
-                        BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory);
+                        BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory, partitionerFactory, partitionsMap);
             } else if (indexOp == IndexOperation.UPSERT) {
                 int operationFieldIndex = propagatedSchema.findVariable(operationVar);
                 op = new LSMSecondaryUpsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh,
                         filterFactory, prevFilterFactory, modificationCallbackFactory, operationFieldIndex,
-                        BinaryIntegerInspector.FACTORY, prevFieldPermutation, tuplePartitionerFactory, partitionsMap);
+                        BinaryIntegerInspector.FACTORY, prevFieldPermutation, partitionerFactory, partitionsMap);
             } else {
                 op = new LSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh,
-                        filterFactory, false, modificationCallbackFactory, tuplePartitionerFactory, partitionsMap);
+                        filterFactory, false, modificationCallbackFactory, partitionerFactory, partitionsMap);
             }
             return new Pair<>(op, splitsAndConstraint.second);
         } catch (Exception e) {
@@ -1465,7 +1474,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         int numPartitions = getNumPartitions(splitsAndConstraint.second);
         int[][] partitionsMap = getPartitionsMap(numPartitions);
         IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
-        ITuplePartitionerFactory tuplePartitionerFactory =
+        ITuplePartitionerFactory partitionerFactory =
                 new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
 
         IOperatorDescriptor op;
@@ -1473,17 +1482,18 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             long numElementsHint = getCardinalityPerPartitionHint(dataset);
             op = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation,
                     StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false,
-                    indexDataflowHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory);
+                    indexDataflowHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory,
+                    partitionerFactory, partitionsMap);
         } else if (indexOp == IndexOperation.UPSERT) {
             int operationFieldIndex = propagatedSchema.findVariable(operationVar);
             op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation,
                     indexDataflowHelperFactory, filterFactory, prevFilterFactory, modificationCallbackFactory,
-                    operationFieldIndex, BinaryIntegerInspector.FACTORY, prevFieldPermutation, tuplePartitionerFactory,
+                    operationFieldIndex, BinaryIntegerInspector.FACTORY, prevFieldPermutation, partitionerFactory,
                     partitionsMap);
         } else {
             op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, fieldPermutation, indexOp,
-                    indexDataflowHelperFactory, filterFactory, false, modificationCallbackFactory,
-                    tuplePartitionerFactory, partitionsMap);
+                    indexDataflowHelperFactory, filterFactory, false, modificationCallbackFactory, partitionerFactory,
+                    partitionsMap);
         }
         return new Pair<>(op, splitsAndConstraint.second);
     }
@@ -1585,7 +1595,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             int numPartitions = getNumPartitions(splitsAndConstraint.second);
             int[][] partitionsMap = getPartitionsMap(numPartitions);
             IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
-            ITuplePartitionerFactory tuplePartitionerFactory =
+            ITuplePartitionerFactory partitionerFactory =
                     new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
 
             IOperatorDescriptor op;
@@ -1593,16 +1603,17 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                 long numElementsHint = getCardinalityPerPartitionHint(dataset);
                 op = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation,
                         StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, indexDataFlowFactory,
-                        null, BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory);
+                        null, BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory, partitionerFactory,
+                        partitionsMap);
             } else if (indexOp == IndexOperation.UPSERT) {
                 int upsertOperationFieldIndex = propagatedSchema.findVariable(operationVar);
                 op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation, indexDataFlowFactory,
                         filterFactory, prevFilterFactory, modificationCallbackFactory, upsertOperationFieldIndex,
-                        BinaryIntegerInspector.FACTORY, prevFieldPermutation, tuplePartitionerFactory, partitionsMap);
+                        BinaryIntegerInspector.FACTORY, prevFieldPermutation, partitionerFactory, partitionsMap);
             } else {
                 op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, fieldPermutation, indexOp,
-                        indexDataFlowFactory, filterFactory, false, modificationCallbackFactory,
-                        tuplePartitionerFactory, partitionsMap);
+                        indexDataFlowFactory, filterFactory, false, modificationCallbackFactory, partitionerFactory,
+                        partitionsMap);
             }
             return new Pair<>(op, splitsAndConstraint.second);
         } catch (Exception e) {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 6daceb7c59..89dea40264 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -412,10 +412,11 @@ public class DatasetUtil {
                 IRecoveryManager.ResourceType.LSM_BTREE);
         IndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
                 metadataProvider.getStorageComponentProvider().getStorageManager(), primaryFileSplitProvider);
+        int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
         BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec,
                 dataset.getPrimaryRecordDescriptor(metadataProvider), lowKeyFields, highKeyFields, true, true,
                 indexHelperFactory, false, false, null, searchCallbackFactory, null, null, false, null, null, -1, false,
-                null, null, projectorFactory, null, null);
+                null, null, projectorFactory, null, partitionsMap);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
                 primaryPartitionConstraint);
         return primarySearchOp;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
index 161e86e552..43b1fb9986 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
@@ -19,6 +19,7 @@
 
 package org.apache.asterix.metadata.utils;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -65,12 +66,15 @@ import org.apache.hyracks.algebricks.runtime.operators.std.StreamProjectRuntimeF
 import org.apache.hyracks.algebricks.runtime.operators.std.StreamSelectRuntimeFactory;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.dataflow.std.group.AbstractAggregatorDescriptorFactory;
@@ -313,10 +317,20 @@ public class SampleOperationsHelper implements ISecondaryIndexOperationsHelper {
 
     protected LSMIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec,
             int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor,
-            long numElementHint) {
+            long numElementHint) throws AlgebricksException {
+        int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
+        int[] pkFields = new int[dataset.getPrimaryKeys().size()];
+        for (int i = 0; i < pkFields.length; i++) {
+            pkFields[i] = fieldPermutation[i];
+        }
+        int numPartitions = (int) Arrays.stream(partitionsMap).map(partitions -> partitions.length).count();
+        IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(metadataProvider);
+        ITuplePartitionerFactory partitionerFactory =
+                new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
         LSMIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new LSMIndexBulkLoadOperatorDescriptor(spec,
                 recordDesc, fieldPermutation, fillFactor, false, numElementHint, true, dataflowHelperFactory, null,
-                LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage.LOAD, dataset.getDatasetId(), null);
+                LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage.LOAD, dataset.getDatasetId(), null, partitionerFactory,
+                partitionsMap);
         treeIndexBulkLoadOp.setSourceLocation(sourceLoc);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
                 partitionConstraint);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryArrayIndexBTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryArrayIndexBTreeOperationsHelper.java
index a6dc65d019..5b4fac84f6 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryArrayIndexBTreeOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryArrayIndexBTreeOperationsHelper.java
@@ -330,8 +330,10 @@ public class SecondaryArrayIndexBTreeOperationsHelper extends SecondaryTreeIndex
             // Apply the bulk loading operator.
             IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory(
                     metadataProvider.getStorageComponentProvider().getStorageManager(), secondaryFileSplitProvider);
-            targetOp = createTreeIndexBulkLoadOp(spec, createFieldPermutationForBulkLoadOp(numTotalSecondaryKeys),
-                    dataflowHelperFactory, StorageConstants.DEFAULT_TREE_FILL_FACTOR);
+            int[] fieldPermutations = createFieldPermutationForBulkLoadOp(numTotalSecondaryKeys);
+            int[] pkFields = createPkFieldPermutationForBulkLoadOp(fieldPermutations, numTotalSecondaryKeys);
+            targetOp = createTreeIndexBulkLoadOp(spec, fieldPermutations, dataflowHelperFactory,
+                    StorageConstants.DEFAULT_TREE_FILL_FACTOR, pkFields);
             spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, targetOp, 0);
 
             // Apply the sink.
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
index 01d1c383ed..1889aa14e4 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
@@ -62,14 +62,16 @@ public class SecondaryBTreeOperationsHelper extends SecondaryTreeIndexOperations
     @Override
     public JobSpecification buildLoadingJobSpec() throws AlgebricksException {
         JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
-        Index.ValueIndexDetails indexDetails = (Index.ValueIndexDetails) index.getIndexDetails();
-        int[] fieldPermutation = createFieldPermutationForBulkLoadOp(indexDetails.getKeyFieldNames().size());
-        IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory(
-                metadataProvider.getStorageComponentProvider().getStorageManager(), secondaryFileSplitProvider);
-        boolean excludeUnknown = excludeUnknownKeys(index, indexDetails, anySecondaryKeyIsNullable);
         if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
             return spec;
         } else {
+            Index.ValueIndexDetails indexDetails = (Index.ValueIndexDetails) index.getIndexDetails();
+            int numSecondaryKeys = getNumSecondaryKeys();
+            int[] fieldPermutation = createFieldPermutationForBulkLoadOp(numSecondaryKeys);
+            int[] pkFields = createPkFieldPermutationForBulkLoadOp(fieldPermutation, numSecondaryKeys);
+            IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory(
+                    metadataProvider.getStorageComponentProvider().getStorageManager(), secondaryFileSplitProvider);
+            boolean excludeUnknown = excludeUnknownKeys(index, indexDetails, anySecondaryKeyIsNullable);
             // job spec:
             // key provider -> primary idx scan -> cast assign -> (select)? -> (sort)? -> bulk load -> sink
             IndexUtil.bindJobEventListener(spec, metadataProvider);
@@ -86,15 +88,14 @@ public class SecondaryBTreeOperationsHelper extends SecondaryTreeIndexOperations
 
             sourceOp = targetOp;
             // primary index ----> cast assign op (produces the secondary index entry)
-            targetOp = createAssignOp(spec, indexDetails.getKeyFieldNames().size(), secondaryRecDesc);
+            targetOp = createAssignOp(spec, numSecondaryKeys, secondaryRecDesc);
             spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, targetOp, 0);
 
             sourceOp = targetOp;
             if (excludeUnknown) {
                 // if any of the secondary fields are nullable, then add a select op that filters nulls.
                 // assign op ----> select op
-                targetOp =
-                        createFilterAllUnknownsSelectOp(spec, indexDetails.getKeyFieldNames().size(), secondaryRecDesc);
+                targetOp = createFilterAllUnknownsSelectOp(spec, numSecondaryKeys, secondaryRecDesc);
                 spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, targetOp, 0);
                 sourceOp = targetOp;
             }
@@ -108,7 +109,7 @@ public class SecondaryBTreeOperationsHelper extends SecondaryTreeIndexOperations
 
             // cast assign op OR select op OR sort op ----> bulk load op
             targetOp = createTreeIndexBulkLoadOp(spec, fieldPermutation, dataflowHelperFactory,
-                    StorageConstants.DEFAULT_TREE_FILL_FACTOR);
+                    StorageConstants.DEFAULT_TREE_FILL_FACTOR, pkFields);
             spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, targetOp, 0);
 
             // bulk load op ----> sink op
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index 76c2a73c93..02c05a912c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -19,6 +19,7 @@
 
 package org.apache.asterix.metadata.utils;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -67,11 +68,14 @@ import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperat
 import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.std.StreamSelectRuntimeFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
@@ -439,13 +443,20 @@ public abstract class SecondaryIndexOperationsHelper implements ISecondaryIndexO
     }
 
     protected LSMIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec,
-            int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor) {
+            int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor, int[] pkFields)
+            throws AlgebricksException {
         IndexDataflowHelperFactory primaryIndexDataflowHelperFactory = new IndexDataflowHelperFactory(
                 metadataProvider.getStorageComponentProvider().getStorageManager(), primaryFileSplitProvider);
+        int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
+        int numPartitions = (int) Arrays.stream(partitionsMap).map(partitions -> partitions.length).count();
+        IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(metadataProvider);
+        ITuplePartitionerFactory partitionerFactory =
+                new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
         // when an index is being created (not loaded) the filtration is introduced in the pipeline -> no tuple filter
-        LSMIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new LSMIndexBulkLoadOperatorDescriptor(spec,
-                secondaryRecDesc, fieldPermutation, fillFactor, false, numElementsHint, false, dataflowHelperFactory,
-                primaryIndexDataflowHelperFactory, BulkLoadUsage.CREATE_INDEX, dataset.getDatasetId(), null);
+        LSMIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp =
+                new LSMIndexBulkLoadOperatorDescriptor(spec, secondaryRecDesc, fieldPermutation, fillFactor, false,
+                        numElementsHint, false, dataflowHelperFactory, primaryIndexDataflowHelperFactory,
+                        BulkLoadUsage.CREATE_INDEX, dataset.getDatasetId(), null, partitionerFactory, partitionsMap);
         treeIndexBulkLoadOp.setSourceLocation(sourceLoc);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
                 secondaryPartitionConstraint);
@@ -498,27 +509,6 @@ public abstract class SecondaryIndexOperationsHelper implements ISecondaryIndexO
         return asterixSelectOp;
     }
 
-    protected AlgebricksMetaOperatorDescriptor createExternalAssignOp(JobSpecification spec, int numSecondaryKeys,
-            RecordDescriptor secondaryRecDesc) {
-        int[] outColumns = new int[numSecondaryKeys];
-        int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
-        for (int i = 0; i < numSecondaryKeys; i++) {
-            outColumns[i] = i + numPrimaryKeys + 1;
-            projectionList[i] = i + numPrimaryKeys + 1;
-        }
-
-        IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[secondaryFieldAccessEvalFactories.length];
-        System.arraycopy(secondaryFieldAccessEvalFactories, 0, sefs, 0, secondaryFieldAccessEvalFactories.length);
-        //add External RIDs to the projection list
-        for (int i = 0; i < numPrimaryKeys; i++) {
-            projectionList[numSecondaryKeys + i] = i + 1;
-        }
-
-        AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, sefs, projectionList);
-        return new AlgebricksMetaOperatorDescriptor(spec, 1, 1, new IPushRuntimeFactory[] { assign },
-                new RecordDescriptor[] { secondaryRecDesc });
-    }
-
     @Override
     public RecordDescriptor getSecondaryRecDesc() {
         return secondaryRecDesc;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
index a84454f3ec..fa5510524d 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
@@ -169,7 +169,8 @@ public class SecondaryInvertedIndexOperationsHelper extends SecondaryTreeIndexOp
         enforcedRecDesc = new RecordDescriptor(enforcedRecFields, enforcedTypeTraits);
         // For tokenization, sorting and loading.
         // One token (+ optional partitioning field) + primary keys.
-        numTokenKeyPairFields = (!isPartitioned) ? 1 + numPrimaryKeys : 2 + numPrimaryKeys;
+        int pkOff = getNumTokens();
+        numTokenKeyPairFields = pkOff + numPrimaryKeys;
         ISerializerDeserializer[] tokenKeyPairFields =
                 new ISerializerDeserializer[numTokenKeyPairFields + numFilterFields];
         ITypeTraits[] tokenKeyPairTypeTraits = new ITypeTraits[numTokenKeyPairFields];
@@ -177,12 +178,10 @@ public class SecondaryInvertedIndexOperationsHelper extends SecondaryTreeIndexOp
         tokenKeyPairFields[0] = serdeProvider.getSerializerDeserializer(secondaryKeyType);
         tokenKeyPairTypeTraits[0] = tokenTypeTraits[0];
         tokenKeyPairComparatorFactories[0] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType);
-        int pkOff = 1;
         if (isPartitioned) {
             tokenKeyPairFields[1] = ShortSerializerDeserializer.INSTANCE;
             tokenKeyPairTypeTraits[1] = tokenTypeTraits[1];
             tokenKeyPairComparatorFactories[1] = ShortBinaryComparatorFactory.INSTANCE;
-            pkOff = 2;
         }
         if (numPrimaryKeys > 0) {
             tokenKeyPairFields[pkOff] = primaryRecDesc.getFields()[0];
@@ -303,14 +302,25 @@ public class SecondaryInvertedIndexOperationsHelper extends SecondaryTreeIndexOp
         return sortOp;
     }
 
-    private AbstractSingleActivityOperatorDescriptor createInvertedIndexBulkLoadOp(JobSpecification spec) {
+    private AbstractSingleActivityOperatorDescriptor createInvertedIndexBulkLoadOp(JobSpecification spec)
+            throws AlgebricksException {
         int[] fieldPermutation = new int[numTokenKeyPairFields + numFilterFields];
         for (int i = 0; i < fieldPermutation.length; i++) {
             fieldPermutation[i] = i;
         }
+        // how can numPrimaryKeys be 0?
+        int[] pkFields = new int[numPrimaryKeys];
+        int pkOffset = getNumTokens();
+        for (int i = 0; i < pkFields.length; i++) {
+            pkFields[i] = fieldPermutation[pkOffset + i];
+        }
         IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory(
                 metadataProvider.getStorageComponentProvider().getStorageManager(), secondaryFileSplitProvider);
         return createTreeIndexBulkLoadOp(spec, fieldPermutation, dataflowHelperFactory,
-                StorageConstants.DEFAULT_TREE_FILL_FACTOR);
+                StorageConstants.DEFAULT_TREE_FILL_FACTOR, pkFields);
+    }
+
+    private int getNumTokens() {
+        return isPartitioned ? 2 : 1;
     }
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
index 792d495924..1c20eff23c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
@@ -192,6 +192,7 @@ public class SecondaryRTreeOperationsHelper extends SecondaryTreeIndexOperations
          ***************************************************/
         JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
         int[] fieldPermutation = createFieldPermutationForBulkLoadOp(numNestedSecondaryKeyFields);
+        int[] pkFields = createPkFieldPermutationForBulkLoadOp(fieldPermutation, numNestedSecondaryKeyFields);
         int numNestedSecondaryKeFieldsConsideringPointMBR =
                 isPointMBR ? numNestedSecondaryKeyFields / 2 : numNestedSecondaryKeyFields;
         RecordDescriptor secondaryRecDescConsideringPointMBR =
@@ -230,7 +231,7 @@ public class SecondaryRTreeOperationsHelper extends SecondaryTreeIndexOperations
                     isPointMBR ? secondaryRecDescForPointMBR : secondaryRecDesc);
             // Create secondary RTree bulk load op.
             TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec, fieldPermutation,
-                    indexDataflowHelperFactory, StorageConstants.DEFAULT_TREE_FILL_FACTOR);
+                    indexDataflowHelperFactory, StorageConstants.DEFAULT_TREE_FILL_FACTOR, pkFields);
             SinkRuntimeFactory sinkRuntimeFactory = new SinkRuntimeFactory();
             sinkRuntimeFactory.setSourceLocation(sourceLoc);
             AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
index 14c9dda3d1..4fda6e449c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
@@ -110,4 +110,12 @@ public abstract class SecondaryTreeIndexOperationsHelper extends SecondaryIndexO
         spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
         return spec;
     }
+
+    protected int[] createPkFieldPermutationForBulkLoadOp(int[] fieldsPermutation, int numSecondaryKeyFields) {
+        int[] pkFieldPermutation = new int[numPrimaryKeys];
+        for (int i = 0; i < pkFieldPermutation.length; i++) {
+            pkFieldPermutation[i] = fieldsPermutation[numSecondaryKeyFields + i];
+        }
+        return pkFieldPermutation;
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java
index ea84ca361e..6edb949122 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java
@@ -22,6 +22,7 @@ package org.apache.asterix.runtime.operators;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -31,7 +32,7 @@ import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDe
 
 public class LSMIndexBulkLoadOperatorDescriptor extends TreeIndexBulkLoadOperatorDescriptor {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
     public enum BulkLoadUsage {
         LOAD,
@@ -44,19 +45,17 @@ public class LSMIndexBulkLoadOperatorDescriptor extends TreeIndexBulkLoadOperato
 
     protected final int datasetId;
 
-    protected final ITupleFilterFactory tupleFilterFactory;
-
     public LSMIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
             int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint,
             boolean checkIfEmptyIndex, IIndexDataflowHelperFactory indexHelperFactory,
             IIndexDataflowHelperFactory primaryIndexHelperFactory, BulkLoadUsage usage, int datasetId,
-            ITupleFilterFactory tupleFilterFactory) {
+            ITupleFilterFactory tupleFilterFactory, ITuplePartitionerFactory partitionerFactory,
+            int[][] partitionsMap) {
         super(spec, outRecDesc, fieldPermutation, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
-                indexHelperFactory);
+                indexHelperFactory, tupleFilterFactory, partitionerFactory, partitionsMap);
         this.primaryIndexHelperFactory = primaryIndexHelperFactory;
         this.usage = usage;
         this.datasetId = datasetId;
-        this.tupleFilterFactory = tupleFilterFactory;
     }
 
     @Override
@@ -65,6 +64,6 @@ public class LSMIndexBulkLoadOperatorDescriptor extends TreeIndexBulkLoadOperato
         return new LSMIndexBulkLoadOperatorNodePushable(indexHelperFactory, primaryIndexHelperFactory, ctx, partition,
                 fieldPermutation, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
                 recordDescProvider.getInputRecordDescriptor(this.getActivityId(), 0), usage, datasetId,
-                tupleFilterFactory);
+                tupleFilterFactory, partitionerFactory, partitionsMap);
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
index 52e3b2fc08..367f670e7a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
@@ -27,6 +27,7 @@ import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
 import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
@@ -38,29 +39,35 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
+import org.apache.hyracks.storage.common.IIndex;
 
 public class LSMIndexBulkLoadOperatorNodePushable extends IndexBulkLoadOperatorNodePushable {
-    protected final BulkLoadUsage usage;
 
-    protected final IIndexDataflowHelper primaryIndexHelper;
+    protected final BulkLoadUsage usage;
+    protected final IIndexDataflowHelper[] primaryIndexHelpers;
     protected final IDatasetLifecycleManager datasetManager;
     protected final int datasetId;
     protected final int partition;
-    protected ILSMIndex primaryIndex;
+    protected ILSMIndex[] primaryIndexes;
 
     public LSMIndexBulkLoadOperatorNodePushable(IIndexDataflowHelperFactory indexDataflowHelperFactory,
             IIndexDataflowHelperFactory priamryIndexDataflowHelperFactory, IHyracksTaskContext ctx, int partition,
             int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint,
             boolean checkIfEmptyIndex, RecordDescriptor recDesc, BulkLoadUsage usage, int datasetId,
-            ITupleFilterFactory tupleFilterFactory) throws HyracksDataException {
+            ITupleFilterFactory tupleFilterFactory, ITuplePartitionerFactory partitionerFactory, int[][] partitionsMap)
+            throws HyracksDataException {
         super(indexDataflowHelperFactory, ctx, partition, fieldPermutation, fillFactor, verifyInput, numElementsHint,
-                checkIfEmptyIndex, recDesc, tupleFilterFactory);
+                checkIfEmptyIndex, recDesc, tupleFilterFactory, partitionerFactory, partitionsMap);
 
         if (priamryIndexDataflowHelperFactory != null) {
-            this.primaryIndexHelper =
-                    priamryIndexDataflowHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
+            primaryIndexHelpers = new IIndexDataflowHelper[partitions.length];
+            primaryIndexes = new ILSMIndex[partitions.length];
+            for (int i = 0; i < partitions.length; i++) {
+                primaryIndexHelpers[i] = priamryIndexDataflowHelperFactory
+                        .create(ctx.getJobletContext().getServiceContext(), partitions[i]);
+            }
         } else {
-            this.primaryIndexHelper = null;
+            primaryIndexHelpers = null;
         }
         this.usage = usage;
         this.datasetId = datasetId;
@@ -71,17 +78,17 @@ public class LSMIndexBulkLoadOperatorNodePushable extends IndexBulkLoadOperatorN
     }
 
     @Override
-    protected void initializeBulkLoader() throws HyracksDataException {
+    protected void initializeBulkLoader(IIndex index, int indexId) throws HyracksDataException {
         ILSMIndex targetIndex = (ILSMIndex) index;
         Map<String, Object> parameters = new HashMap<>();
         parameters.put(LSMIOOperationCallback.KEY_FLUSHED_COMPONENT_ID, LSMComponentId.DEFAULT_COMPONENT_ID);
         if (usage.equals(BulkLoadUsage.LOAD)) {
-            bulkLoader = targetIndex.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
-                    parameters);
+            bulkLoaders[indexId] = targetIndex.createBulkLoader(fillFactor, verifyInput, numElementsHint,
+                    checkIfEmptyIndex, parameters);
         } else {
-            primaryIndexHelper.open();
-            primaryIndex = (ILSMIndex) primaryIndexHelper.getIndexInstance();
-            List<ILSMDiskComponent> primaryComponents = primaryIndex.getDiskComponents();
+            primaryIndexHelpers[indexId].open();
+            primaryIndexes[indexId] = (ILSMIndex) primaryIndexHelpers[indexId].getIndexInstance();
+            List<ILSMDiskComponent> primaryComponents = primaryIndexes[indexId].getDiskComponents();
             if (!primaryComponents.isEmpty()) {
                 ILSMComponentId bulkloadId = LSMComponentIdUtils.union(primaryComponents.get(0).getId(),
                         primaryComponents.get(primaryComponents.size() - 1).getId());
@@ -90,8 +97,8 @@ public class LSMIndexBulkLoadOperatorNodePushable extends IndexBulkLoadOperatorN
                 parameters.put(LSMIOOperationCallback.KEY_FLUSHED_COMPONENT_ID,
                         LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID);
             }
-            bulkLoader = targetIndex.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
-                    parameters);
+            bulkLoaders[indexId] = targetIndex.createBulkLoader(fillFactor, verifyInput, numElementsHint,
+                    checkIfEmptyIndex, parameters);
 
         }
     }
@@ -101,8 +108,8 @@ public class LSMIndexBulkLoadOperatorNodePushable extends IndexBulkLoadOperatorN
         try {
             super.close();
         } finally {
-            if (primaryIndex != null) {
-                primaryIndexHelper.close();
+            if (primaryIndexHelpers != null) {
+                closeIndexes(primaryIndexes, primaryIndexHelpers);
             }
         }
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
index 2b838d3142..f5ec1eb177 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
@@ -178,12 +178,4 @@ public class IntersectPOperator extends AbstractPhysicalOperator {
     public boolean expensiveThanMaterialization() {
         return false;
     }
-
-    public static int[][] getPartitionsMap(int numPartitions) {
-        int[][] map = new int[numPartitions][1];
-        for (int i = 0; i < numPartitions; i++) {
-            map[i] = new int[] { i };
-        }
-        return map;
-    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java
index cc922ffc30..ccd563a664 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java
@@ -150,7 +150,7 @@ public class InsertPipelineExample {
         IIndexDataflowHelperFactory primaryHelperFactory =
                 new IndexDataflowHelperFactory(storageManager, primarySplitProvider);
 
-        int[][] partitionsMap = getPartitionsMap(splitNCs.length);
+        int[][] partitionsMap = JobHelper.getPartitionsMap(splitNCs.length);
         int[] pkFields = new int[] { primaryFieldPermutation[0] };
         IBinaryHashFunctionFactory[] pkHashFunFactories =
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) };
@@ -216,12 +216,4 @@ public class InsertPipelineExample {
 
         return spec;
     }
-
-    public static int[][] getPartitionsMap(int numPartitions) {
-        int[][] map = new int[numPartitions][1];
-        for (int i = 0; i < numPartitions; i++) {
-            map[i] = new int[] { i };
-        }
-        return map;
-    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/JobHelper.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/JobHelper.java
index 4b219e8f04..7b4a885cf0 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/JobHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/JobHelper.java
@@ -40,4 +40,12 @@ public class JobHelper {
     public static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, String[] splitNCs) {
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, op, splitNCs);
     }
+
+    public static int[][] getPartitionsMap(int numPartitions) {
+        int[][] map = new int[numPartitions][1];
+        for (int i = 0; i < numPartitions; i++) {
+            map[i] = new int[] { i };
+        }
+        return map;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
index 0eaa083eca..1495e602aa 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
@@ -24,6 +24,7 @@ import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.job.JobId;
@@ -35,6 +36,7 @@ import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
 import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
 import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
@@ -147,8 +149,15 @@ public class PrimaryIndexBulkLoadExample {
         IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.btreeName);
         IIndexDataflowHelperFactory dataflowHelperFactory =
                 new IndexDataflowHelperFactory(storageManager, btreeSplitProvider);
-        TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec, recDesc,
-                fieldPermutation, 0.7f, false, 1000L, true, dataflowHelperFactory);
+        int[][] partitionsMap = JobHelper.getPartitionsMap(splitNCs.length);
+        int[] pkFields = new int[] { fieldPermutation[0] };
+        IBinaryHashFunctionFactory[] pkHashFunFactories =
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) };
+        ITuplePartitionerFactory tuplePartitionerFactory =
+                new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, splitNCs.length);
+        TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad =
+                new TreeIndexBulkLoadOperatorDescriptor(spec, recDesc, fieldPermutation, 0.7f, false, 1000L, true,
+                        dataflowHelperFactory, null, tuplePartitionerFactory, partitionsMap);
 
         JobHelper.createPartitionConstraint(spec, btreeBulkLoad, splitNCs);
 
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
index cdb82305a1..94018b12c2 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
@@ -20,17 +20,21 @@ package org.apache.hyracks.examples.btree.client;
 
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.data.std.accessors.IntegerBinaryComparatorFactory;
+import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
 import org.apache.hyracks.data.std.accessors.UTF8StringBinaryComparatorFactory;
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
 import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
@@ -143,8 +147,15 @@ public class SecondaryIndexBulkLoadExample {
         IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.secondaryBTreeName);
         IIndexDataflowHelperFactory secondaryHelperFactory =
                 new IndexDataflowHelperFactory(storageManager, btreeSplitProvider);
-        TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec, null,
-                fieldPermutation, 0.7f, false, 1000L, true, secondaryHelperFactory);
+        int[][] partitionsMap = JobHelper.getPartitionsMap(splitNCs.length);
+        int[] pkFields = new int[] { fieldPermutation[1] };
+        IBinaryHashFunctionFactory[] pkHashFunFactories =
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) };
+        ITuplePartitionerFactory tuplePartitionerFactory =
+                new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, splitNCs.length);
+        TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad =
+                new TreeIndexBulkLoadOperatorDescriptor(spec, null, fieldPermutation, 0.7f, false, 1000L, true,
+                        secondaryHelperFactory, null, tuplePartitionerFactory, partitionsMap);
         JobHelper.createPartitionConstraint(spec, btreeBulkLoad, splitNCs);
         NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
         JobHelper.createPartitionConstraint(spec, nsOpDesc, splitNCs);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
index 86525f1dfa..1776b0970c 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
@@ -33,6 +33,7 @@ import static org.apache.hyracks.tests.am.btree.DataSetConstants.secondaryRecDes
 
 import java.io.DataOutput;
 import java.io.File;
+import java.util.Arrays;
 
 import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -71,10 +72,10 @@ import org.apache.hyracks.storage.common.IResourceFactory;
 import org.apache.hyracks.storage.common.IStorageManager;
 import org.apache.hyracks.test.support.TestStorageManager;
 import org.apache.hyracks.test.support.TestStorageManagerComponentHolder;
+import org.apache.hyracks.test.support.TestUtils;
 import org.apache.hyracks.tests.am.common.ITreeIndexOperatorTestHelper;
 import org.apache.hyracks.tests.am.common.TreeOperatorTestHelper;
 import org.apache.hyracks.tests.integration.AbstractIntegrationTest;
-import org.apache.hyracks.tests.integration.TestUtil;
 import org.junit.After;
 import org.junit.Before;
 
@@ -120,7 +121,7 @@ public abstract class AbstractBTreeOperatorTest extends AbstractIntegrationTest
         IIndexBuilderFactory indexBuilderFactory =
                 new IndexBuilderFactory(storageManager, primarySplitProvider, primaryResourceFactory, false);
         IndexCreateOperatorDescriptor primaryCreateOp =
-                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, getPartitionsMap(1));
+                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, TestUtils.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID);
         spec.addRoot(primaryCreateOp);
         runTest(spec);
@@ -143,8 +144,12 @@ public abstract class AbstractBTreeOperatorTest extends AbstractIntegrationTest
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID);
 
         int[] fieldPermutation = { 0, 1, 2, 4, 5, 7 };
-        TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                primaryRecDesc, fieldPermutation, 0.7f, true, 1000L, true, primaryHelperFactory);
+        int[][] partitionsMap = TestUtils.getPartitionsMap(1);
+        ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(primaryKeyFieldPermutation,
+                primaryHashFunFactories, ordersSplits.length);
+        TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad =
+                new TreeIndexBulkLoadOperatorDescriptor(spec, primaryRecDesc, fieldPermutation, 0.7f, true, 1000L, true,
+                        primaryHelperFactory, null, tuplePartitionerFactory, partitionsMap);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeBulkLoad, NC1_ID);
 
         NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
@@ -164,7 +169,7 @@ public abstract class AbstractBTreeOperatorTest extends AbstractIntegrationTest
         IIndexBuilderFactory indexBuilderFactory =
                 new IndexBuilderFactory(storageManager, secondarySplitProvider, secondaryResourceFactory, false);
         IndexCreateOperatorDescriptor secondaryCreateOp =
-                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, getPartitionsMap(1));
+                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, TestUtils.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryCreateOp, NC1_ID);
         spec.addRoot(secondaryCreateOp);
         runTest(spec);
@@ -205,8 +210,13 @@ public abstract class AbstractBTreeOperatorTest extends AbstractIntegrationTest
 
         // load secondary index
         int[] fieldPermutation = { 3, 0 };
-        TreeIndexBulkLoadOperatorDescriptor secondaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                secondaryRecDesc, fieldPermutation, 0.7f, true, 1000L, true, secondaryHelperFactory);
+        int[][] partitionsMap = TestUtils.getPartitionsMap(1);
+        int numPartitions = (int) Arrays.stream(partitionsMap).map(partitions -> partitions.length).count();
+        ITuplePartitionerFactory tuplePartitionerFactory2 =
+                new FieldHashPartitionerFactory(secondaryPKFieldPermutationB, primaryHashFunFactories, numPartitions);
+        TreeIndexBulkLoadOperatorDescriptor secondaryBtreeBulkLoad =
+                new TreeIndexBulkLoadOperatorDescriptor(spec, secondaryRecDesc, fieldPermutation, 0.7f, true, 1000L,
+                        true, secondaryHelperFactory, null, tuplePartitionerFactory2, partitionsMap);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBtreeBulkLoad, NC1_ID);
 
         NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
@@ -234,7 +244,7 @@ public abstract class AbstractBTreeOperatorTest extends AbstractIntegrationTest
                 new DelimitedDataTupleParserFactory(inputParserFactories, '|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
 
-        int[][] partitionsMap = TestUtil.getPartitionsMap(ordersSplits.length);
+        int[][] partitionsMap = TestUtils.getPartitionsMap(ordersSplits.length);
         ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(primaryKeyFieldPermutation,
                 primaryHashFunFactories, ordersSplits.length);
 
@@ -272,7 +282,7 @@ public abstract class AbstractBTreeOperatorTest extends AbstractIntegrationTest
     protected void destroyPrimaryIndex() throws Exception {
         JobSpecification spec = new JobSpecification();
         IndexDropOperatorDescriptor primaryDropOp =
-                new IndexDropOperatorDescriptor(spec, primaryHelperFactory, getPartitionsMap(1));
+                new IndexDropOperatorDescriptor(spec, primaryHelperFactory, TestUtils.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryDropOp, NC1_ID);
         spec.addRoot(primaryDropOp);
         runTest(spec);
@@ -281,7 +291,7 @@ public abstract class AbstractBTreeOperatorTest extends AbstractIntegrationTest
     protected void destroySecondaryIndex() throws Exception {
         JobSpecification spec = new JobSpecification();
         IndexDropOperatorDescriptor secondaryDropOp =
-                new IndexDropOperatorDescriptor(spec, secondaryHelperFactory, getPartitionsMap(1));
+                new IndexDropOperatorDescriptor(spec, secondaryHelperFactory, TestUtils.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryDropOp, NC1_ID);
         spec.addRoot(secondaryDropOp);
         runTest(spec);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
index 3b16bb8fe5..d5ad7da59c 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
@@ -24,9 +24,11 @@ import static org.apache.hyracks.tests.am.btree.DataSetConstants.primaryKeyField
 
 import java.io.DataOutput;
 import java.io.File;
+import java.util.Arrays;
 
 import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
@@ -77,9 +79,9 @@ import org.apache.hyracks.storage.common.IResourceFactory;
 import org.apache.hyracks.storage.common.IStorageManager;
 import org.apache.hyracks.test.support.TestStorageManager;
 import org.apache.hyracks.test.support.TestStorageManagerComponentHolder;
+import org.apache.hyracks.test.support.TestUtils;
 import org.apache.hyracks.tests.am.common.ITreeIndexOperatorTestHelper;
 import org.apache.hyracks.tests.integration.AbstractIntegrationTest;
-import org.apache.hyracks.tests.integration.TestUtil;
 import org.junit.After;
 import org.junit.Before;
 
@@ -107,6 +109,8 @@ public abstract class AbstractRTreeOperatorTest extends AbstractIntegrationTest
     protected final int primaryKeyFieldCount = 1;
     protected final IBinaryComparatorFactory[] primaryComparatorFactories =
             new IBinaryComparatorFactory[primaryKeyFieldCount];
+    protected final IBinaryHashFunctionFactory[] primaryHashFactories =
+            new IBinaryHashFunctionFactory[primaryKeyFieldCount];
 
     protected final RecordDescriptor primaryRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
             new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
@@ -178,6 +182,7 @@ public abstract class AbstractRTreeOperatorTest extends AbstractIntegrationTest
         primaryTypeTraits[8] = UTF8StringPointable.TYPE_TRAITS;
         primaryTypeTraits[9] = UTF8StringPointable.TYPE_TRAITS;
         primaryComparatorFactories[0] = UTF8StringBinaryComparatorFactory.INSTANCE;
+        primaryHashFactories[0] = primaryHashFunFactories[0];
 
         // field, type and key declarations for secondary indexes
         secondaryTypeTraits[0] = DoublePointable.TYPE_TRAITS;
@@ -238,7 +243,7 @@ public abstract class AbstractRTreeOperatorTest extends AbstractIntegrationTest
         IIndexBuilderFactory indexBuilderFactory =
                 new IndexBuilderFactory(storageManager, primarySplitProvider, btreeFactory, false);
         IndexCreateOperatorDescriptor primaryCreateOp =
-                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, getPartitionsMap(1));
+                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, TestUtils.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID);
         spec.addRoot(primaryCreateOp);
         runTest(spec);
@@ -275,8 +280,12 @@ public abstract class AbstractRTreeOperatorTest extends AbstractIntegrationTest
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID);
 
         int[] fieldPermutation = { 0, 1, 2, 4, 5, 7, 9, 10, 11, 12 };
-        TreeIndexBulkLoadOperatorDescriptor primaryBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                primaryRecDesc, fieldPermutation, 0.7f, false, 1000L, true, primaryHelperFactory);
+        int[][] partitionsMap = TestUtils.getPartitionsMap(1);
+        ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(primaryKeyFieldPermutation,
+                primaryHashFunFactories, ordersSplits.length);
+        TreeIndexBulkLoadOperatorDescriptor primaryBulkLoad =
+                new TreeIndexBulkLoadOperatorDescriptor(spec, primaryRecDesc, fieldPermutation, 0.7f, false, 1000L,
+                        true, primaryHelperFactory, null, tuplePartitionerFactory, partitionsMap);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBulkLoad, NC1_ID);
 
         NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
@@ -297,7 +306,7 @@ public abstract class AbstractRTreeOperatorTest extends AbstractIntegrationTest
         IndexBuilderFactory indexBuilderFactory =
                 new IndexBuilderFactory(storageManager, secondarySplitProvider, rtreeFactory, false);
         IndexCreateOperatorDescriptor secondaryCreateOp =
-                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, getPartitionsMap(1));
+                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, TestUtils.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryCreateOp, NC1_ID);
         spec.addRoot(secondaryCreateOp);
         runTest(spec);
@@ -333,8 +342,14 @@ public abstract class AbstractRTreeOperatorTest extends AbstractIntegrationTest
 
         // load secondary index
         int[] fieldPermutation = { 6, 7, 8, 9, 0 };
-        TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                secondaryRecDesc, fieldPermutation, 0.7f, false, 1000L, true, secondaryHelperFactory);
+        int[] pkFields = { 4 };
+        int[][] partitionsMap = TestUtils.getPartitionsMap(1);
+        int numPartitions = (int) Arrays.stream(partitionsMap).map(partitions -> partitions.length).count();
+        ITuplePartitionerFactory partitionerFactory =
+                new FieldHashPartitionerFactory(pkFields, primaryHashFactories, numPartitions);
+        TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoad =
+                new TreeIndexBulkLoadOperatorDescriptor(spec, secondaryRecDesc, fieldPermutation, 0.7f, false, 1000L,
+                        true, secondaryHelperFactory, null, partitionerFactory, partitionsMap);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBulkLoad, NC1_ID);
 
         NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
@@ -375,7 +390,7 @@ public abstract class AbstractRTreeOperatorTest extends AbstractIntegrationTest
                 ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
 
-        int[][] partitionsMap = TestUtil.getPartitionsMap(ordersSplits.length);
+        int[][] partitionsMap = TestUtils.getPartitionsMap(ordersSplits.length);
         // insert into primary index
         int[] primaryFieldPermutation = { 0, 1, 2, 4, 5, 7, 9, 10, 11, 12 };
         ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(primaryKeyFieldPermutation,
@@ -412,7 +427,7 @@ public abstract class AbstractRTreeOperatorTest extends AbstractIntegrationTest
     protected void destroyPrimaryIndex() throws Exception {
         JobSpecification spec = new JobSpecification();
         IndexDropOperatorDescriptor primaryDropOp =
-                new IndexDropOperatorDescriptor(spec, primaryHelperFactory, getPartitionsMap(1));
+                new IndexDropOperatorDescriptor(spec, primaryHelperFactory, TestUtils.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryDropOp, NC1_ID);
         spec.addRoot(primaryDropOp);
         runTest(spec);
@@ -421,7 +436,7 @@ public abstract class AbstractRTreeOperatorTest extends AbstractIntegrationTest
     protected void destroySecondaryIndex() throws Exception {
         JobSpecification spec = new JobSpecification();
         IndexDropOperatorDescriptor secondaryDropOp =
-                new IndexDropOperatorDescriptor(spec, secondaryHelperFactory, getPartitionsMap(1));
+                new IndexDropOperatorDescriptor(spec, secondaryHelperFactory, TestUtils.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryDropOp, NC1_ID);
         spec.addRoot(secondaryDropOp);
         runTest(spec);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
index ee56375c4d..6f59ed8c17 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -236,12 +236,4 @@ public abstract class AbstractIntegrationTest {
         outputFiles.add(fileRef.getFile());
         return new ManagedFileSplit(ncs.getId(), fileName);
     }
-
-    public static int[][] getPartitionsMap(int numPartitions) {
-        int[][] map = new int[numPartitions][1];
-        for (int i = 0; i < numPartitions; i++) {
-            map[i] = new int[] { i };
-        }
-        return map;
-    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestUtil.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestUtil.java
index 8f3181d3b9..73d985d867 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestUtil.java
@@ -68,12 +68,4 @@ public class TestUtil {
     static ObjectNode httpGetAsObject(URI uri) throws URISyntaxException, IOException {
         return getResultAsJson(httpGetAsString(uri));
     }
-
-    public static int[][] getPartitionsMap(int numPartitions) {
-        int[][] map = new int[numPartitions][1];
-        for (int i = 0; i < numPartitions; i++) {
-            map[i] = new int[] { i };
-        }
-        return map;
-    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
index 12b6ae699b..f401e45914 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
@@ -21,6 +21,8 @@ package org.apache.hyracks.storage.am.common.dataflow;
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitioner;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -31,32 +33,49 @@ import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperato
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import org.apache.hyracks.storage.am.common.api.ITupleFilter;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
+import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.IIndexBulkLoader;
 import org.apache.hyracks.storage.common.buffercache.NoOpPageWriteCallback;
 
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+
 public class IndexBulkLoadOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+
     protected final IHyracksTaskContext ctx;
     protected final float fillFactor;
     protected final boolean verifyInput;
     protected final long numElementsHint;
     protected final boolean checkIfEmptyIndex;
-    protected final IIndexDataflowHelper indexHelper;
+    protected final IIndexDataflowHelper[] indexHelpers;
     protected final RecordDescriptor recDesc;
     protected final PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
     protected final ITupleFilterFactory tupleFilterFactory;
+    protected final ITuplePartitioner tuplePartitioner;
+    protected final int[] partitions;
+    protected final Int2IntMap storagePartitionId2Index;
     protected FrameTupleAccessor accessor;
-    protected IIndex index;
-    protected IIndexBulkLoader bulkLoader;
+    protected final IIndex[] indexes;
+    protected final IIndexBulkLoader[] bulkLoaders;
     protected ITupleFilter tupleFilter;
     protected FrameTupleReference frameTuple;
 
-    public IndexBulkLoadOperatorNodePushable(IIndexDataflowHelperFactory indexDataflowHelperFactory,
-            IHyracksTaskContext ctx, int partition, int[] fieldPermutation, float fillFactor, boolean verifyInput,
-            long numElementsHint, boolean checkIfEmptyIndex, RecordDescriptor recDesc,
-            ITupleFilterFactory tupleFilterFactory) throws HyracksDataException {
+    public IndexBulkLoadOperatorNodePushable(IIndexDataflowHelperFactory indexHelperFactory, IHyracksTaskContext ctx,
+            int partition, int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex, RecordDescriptor recDesc, ITupleFilterFactory tupleFilterFactory,
+            ITuplePartitionerFactory partitionerFactory, int[][] partitionsMap) throws HyracksDataException {
         this.ctx = ctx;
-        this.indexHelper = indexDataflowHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
+        this.partitions = partitionsMap[partition];
+        this.tuplePartitioner = partitionerFactory.createPartitioner(ctx);
+        this.storagePartitionId2Index = new Int2IntOpenHashMap();
+        this.indexes = new IIndex[partitions.length];
+        this.indexHelpers = new IIndexDataflowHelper[partitions.length];
+        this.bulkLoaders = new IIndexBulkLoader[partitions.length];
+        for (int i = 0; i < partitions.length; i++) {
+            storagePartitionId2Index.put(partitions[i], i);
+            indexHelpers[i] = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partitions[i]);
+        }
         this.fillFactor = fillFactor;
         this.verifyInput = verifyInput;
         this.numElementsHint = numElementsHint;
@@ -69,15 +88,18 @@ public class IndexBulkLoadOperatorNodePushable extends AbstractUnaryInputUnaryOu
     @Override
     public void open() throws HyracksDataException {
         accessor = new FrameTupleAccessor(recDesc);
-        indexHelper.open();
-        index = indexHelper.getIndexInstance();
+        for (int i = 0; i < indexHelpers.length; i++) {
+            indexHelpers[i].open();
+            indexes[i] = indexHelpers[i].getIndexInstance();
+            initializeBulkLoader(indexes[i], i);
+        }
+
         try {
             writer.open();
             if (tupleFilterFactory != null) {
                 tupleFilter = tupleFilterFactory.createTupleFilter(ctx);
                 frameTuple = new FrameTupleReference();
             }
-            initializeBulkLoader();
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
@@ -94,8 +116,10 @@ public class IndexBulkLoadOperatorNodePushable extends AbstractUnaryInputUnaryOu
                     continue;
                 }
             }
+            int storagePartition = tuplePartitioner.partition(accessor, i);
+            int storageIdx = storagePartitionId2Index.get(storagePartition);
             tuple.reset(accessor, i);
-            bulkLoader.add(tuple);
+            bulkLoaders[storageIdx].add(tuple);
         }
 
         FrameUtils.flushFrame(buffer, writer);
@@ -104,20 +128,14 @@ public class IndexBulkLoadOperatorNodePushable extends AbstractUnaryInputUnaryOu
     @Override
     public void close() throws HyracksDataException {
         try {
-            // bulkloader can be null if an exception is thrown before it is initialized.
-            if (bulkLoader != null) {
-                bulkLoader.end();
-            }
+            closeBulkLoaders();
         } catch (Throwable th) {
             throw HyracksDataException.create(th);
         } finally {
-            if (index != null) {
-                // If index was opened!
-                try {
-                    indexHelper.close();
-                } finally {
-                    writer.close();
-                }
+            try {
+                closeIndexes(indexes, indexHelpers);
+            } finally {
+                writer.close();
             }
         }
     }
@@ -129,13 +147,33 @@ public class IndexBulkLoadOperatorNodePushable extends AbstractUnaryInputUnaryOu
 
     @Override
     public void fail() throws HyracksDataException {
-        if (index != null) {
-            writer.fail();
-        }
+        writer.fail();
     }
 
-    protected void initializeBulkLoader() throws HyracksDataException {
-        bulkLoader = index.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
+    protected void initializeBulkLoader(IIndex index, int indexId) throws HyracksDataException {
+        bulkLoaders[indexId] = index.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
                 NoOpPageWriteCallback.INSTANCE);
     }
+
+    private void closeBulkLoaders() throws HyracksDataException {
+        for (IIndexBulkLoader bulkLoader : bulkLoaders) {
+            // bulkloader can be null if an exception is thrown before it is initialized.
+            if (bulkLoader != null) {
+                bulkLoader.end();
+            }
+        }
+    }
+
+    protected static void closeIndexes(IIndex[] indexes, IIndexDataflowHelper[] indexHelpers)
+            throws HyracksDataException {
+        Throwable failure = null;
+        for (int i = 0; i < indexes.length; i++) {
+            if (indexes[i] != null) {
+                failure = ResourceReleaseUtils.close(indexHelpers[i], failure);
+            }
+        }
+        if (failure != null) {
+            throw HyracksDataException.create(failure);
+        }
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
index 8346f62ef4..351fbf6b06 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
@@ -22,6 +22,7 @@ package org.apache.hyracks.storage.am.common.dataflow;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -30,27 +31,23 @@ import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 
 public class TreeIndexBulkLoadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
     protected final int[] fieldPermutation;
     protected final float fillFactor;
     protected final boolean verifyInput;
     protected final long numElementsHint;
     protected final boolean checkIfEmptyIndex;
+    protected final int[][] partitionsMap;
+    protected final ITuplePartitionerFactory partitionerFactory;
     protected final IIndexDataflowHelperFactory indexHelperFactory;
-    private final ITupleFilterFactory tupleFilterFactory;
-
-    public TreeIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
-            int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint,
-            boolean checkIfEmptyIndex, IIndexDataflowHelperFactory indexHelperFactory) {
-        this(spec, outRecDesc, fieldPermutation, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
-                indexHelperFactory, null);
-    }
+    protected final ITupleFilterFactory tupleFilterFactory;
 
     public TreeIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
             int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint,
             boolean checkIfEmptyIndex, IIndexDataflowHelperFactory indexHelperFactory,
-            ITupleFilterFactory tupleFilterFactory) {
+            ITupleFilterFactory tupleFilterFactory, ITuplePartitionerFactory partitionerFactory,
+            int[][] partitionsMap) {
         super(spec, 1, 1);
         this.indexHelperFactory = indexHelperFactory;
         this.fieldPermutation = fieldPermutation;
@@ -60,6 +57,8 @@ public class TreeIndexBulkLoadOperatorDescriptor extends AbstractSingleActivityO
         this.checkIfEmptyIndex = checkIfEmptyIndex;
         this.outRecDescs[0] = outRecDesc;
         this.tupleFilterFactory = tupleFilterFactory;
+        this.partitionerFactory = partitionerFactory;
+        this.partitionsMap = partitionsMap;
     }
 
     @Override
@@ -67,6 +66,7 @@ public class TreeIndexBulkLoadOperatorDescriptor extends AbstractSingleActivityO
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
         return new IndexBulkLoadOperatorNodePushable(indexHelperFactory, ctx, partition, fieldPermutation, fillFactor,
                 verifyInput, numElementsHint, checkIfEmptyIndex,
-                recordDescProvider.getInputRecordDescriptor(this.getActivityId(), 0), tupleFilterFactory);
+                recordDescProvider.getInputRecordDescriptor(this.getActivityId(), 0), tupleFilterFactory,
+                partitionerFactory, partitionsMap);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
index 2348a1aca3..8ac5dfc5b0 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
@@ -190,4 +190,12 @@ public class TestUtils {
             context.updateLoggers();
         }
     }
+
+    public static int[][] getPartitionsMap(int numPartitions) {
+        int[][] map = new int[numPartitions][1];
+        for (int i = 0; i < numPartitions; i++) {
+            map[i] = new int[] { i };
+        }
+        return map;
+    }
 }