Posted to commits@asterixdb.apache.org by al...@apache.org on 2023/04/15 12:43:54 UTC

[asterixdb] branch master updated: [ASTERIXDB-3144][RT] Make hash exchanges consider partitions map

This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5bbe412b61 [ASTERIXDB-3144][RT] Make hash exchanges consider partitions map
5bbe412b61 is described below

commit 5bbe412b6173bce97e12b177e4cbcd839c619d4c
Author: Ali Alsuliman <al...@gmail.com>
AuthorDate: Fri Apr 14 01:16:09 2023 -0700

    [ASTERIXDB-3144][RT] Make hash exchanges consider partitions map
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    Some operators, such as INSERT and UPSERT, require their input to be
    hash partitioned according to a partitions map. This patch makes the
    hash exchanges satisfy that requirement: a hash exchange now takes an
    optional partitions map and uses it when hash partitioning (a brief
    sketch of the idea appears below, just before the list of changed
    files).
    
    - Make sure the partitions map is considered when comparing
      partitioning properties.
    
    Change-Id: I71457603048e9be9467943918e21ce5ede658c19
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17489
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Ali Alsuliman <al...@gmail.com>
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
---
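For orientation, the sketch below shows one way a hash partitioner can honor a partitions map: each tuple is hashed to a storage partition, and the map is then used to route it to the compute location that owns that partition, which is what INSERT/UPSERT-style operators need. Everything in the sketch is illustrative: the class and method names are invented, and the assumed map orientation (partitionsMap[location] lists that location's storage partitions) is an assumption rather than a statement of the patch's actual contract. The patch itself wires the capability into the existing FieldHashPartitionComputerFactory and HashPartitioner (see the file list below).

    import java.util.Arrays;

    /**
     * Illustrative sketch only; not the Hyracks/AsterixDB API.
     * Assumption: partitionsMap[location] lists the storage partitions owned by
     * that compute location, and storage partitions are numbered 0..N-1.
     */
    public final class MapAwareHashRouter {

        private final int[] storageToLocation; // inverse of the partitions map
        private final int numStoragePartitions;

        public MapAwareHashRouter(int[][] partitionsMap) {
            this.numStoragePartitions = Arrays.stream(partitionsMap).mapToInt(a -> a.length).sum();
            this.storageToLocation = new int[numStoragePartitions];
            for (int location = 0; location < partitionsMap.length; location++) {
                for (int storagePartition : partitionsMap[location]) {
                    storageToLocation[storagePartition] = location;
                }
            }
        }

        /** Hash to a storage partition first, then return the location that owns it. */
        public int route(int keyHash) {
            int storagePartition = (keyHash & Integer.MAX_VALUE) % numStoragePartitions;
            return storageToLocation[storagePartition];
        }

        public static void main(String[] args) {
            // Location 0 owns storage partitions 0 and 1; location 1 owns 2 and 3.
            MapAwareHashRouter router = new MapAwareHashRouter(new int[][] { { 0, 1 }, { 2, 3 } });
            System.out.println(router.route("customer-42".hashCode()));
        }
    }
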
 .../operators/physical/BTreeSearchPOperator.java   |  8 ++-
 .../physical/ExternalDataLookupPOperator.java      |  4 +-
 .../operators/physical/IndexSearchPOperator.java   |  8 +--
 .../operators/physical/SpatialJoinPOperator.java   |  4 +-
 .../rules/am/IntroduceLSMComponentFilterRule.java  |  2 +-
 .../rules/am/IntroduceSelectAccessMethodRule.java  |  8 +--
 .../asterix/app/function/QueryIndexDatasource.java |  7 ++-
 .../app/resource/PlanStagesGeneratorTest.java      |  4 +-
 .../app/resource/RequiredCapacityVisitorTest.java  |  6 +--
 .../declared/DataSourcePartitioningProvider.java   | 62 +++++++++++++++++-----
 .../metadata/declared/FunctionDataSource.java      |  7 ++-
 .../metadata/declared/MetadataProvider.java        | 12 +++++
 .../metadata/declared/SampleDataSource.java        |  4 ++
 .../metadata/IDataSourcePropertiesProvider.java    | 12 +++--
 .../operators/logical/IntersectOperator.java       | 16 ++++--
 .../visitors/IsomorphismOperatorVisitor.java       | 27 +++++++---
 ...calOperatorDeepCopyWithNewVariablesVisitor.java |  7 ++-
 .../logical/visitors/OperatorDeepCopyVisitor.java  |  7 ++-
 .../physical/AbstractHashJoinPOperator.java        | 26 +++++----
 .../AbstractPreSortedDistinctByPOperator.java      |  2 +-
 .../AbstractPreclusteredGroupByPOperator.java      |  2 +-
 .../operators/physical/AbstractScanPOperator.java  |  3 +-
 .../physical/AbstractWindowPOperator.java          |  2 +-
 .../operators/physical/BulkloadPOperator.java      |  4 +-
 .../physical/DataSourceScanPOperator.java          |  5 +-
 .../physical/ExternalGroupByPOperator.java         |  5 +-
 .../physical/HashPartitionExchangePOperator.java   | 21 ++++++--
 .../HashPartitionMergeExchangePOperator.java       | 17 ++++--
 .../operators/physical/IndexBulkloadPOperator.java |  6 +--
 .../physical/IndexInsertDeleteUpsertPOperator.java |  6 +--
 .../physical/InsertDeleteUpsertPOperator.java      |  5 +-
 .../operators/physical/IntersectPOperator.java     | 15 +++++-
 .../operators/physical/WriteResultPOperator.java   |  5 +-
 .../IPartitioningRequirementsCoordinator.java      | 20 ++++---
 .../core/algebra/properties/PropertiesUtil.java    |  3 ++
 .../properties/UnorderedPartitionedProperty.java   | 32 +++++++++--
 .../rules/EnforceStructuralPropertiesRule.java     | 17 ++----
 .../rules/RemoveUnnecessarySortMergeExchange.java  |  9 ++--
 .../hyracks/hyracks-dataflow-common/pom.xml        |  4 ++
 .../data/partition/FieldHashPartitionComputer.java |  7 ++-
 .../FieldHashPartitionComputerFactory.java         | 27 +++++++++-
 .../data/partition/FieldHashPartitioner.java       |  2 +-
 .../common/data/partition/HashPartitioner.java     | 17 +++++-
 43 files changed, 337 insertions(+), 130 deletions(-)

diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
index d56963e596..8865bb2241 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
@@ -216,7 +216,7 @@ public class BTreeSearchPOperator extends IndexSearchPOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
         if (requiresBroadcast) {
             // For primary indexes optimizing an equality condition we can reduce the broadcast requirement to hash partitioning.
             if (isPrimaryIndex && isEqCondition) {
@@ -239,7 +239,11 @@ public class BTreeSearchPOperator extends IndexSearchPOperator {
                         orderColumns.add(new OrderColumn(orderVar, OrderKind.ASC));
                     }
                     propsLocal.add(new LocalOrderProperty(orderColumns));
-                    pv[0] = new StructuralPropertiesVector(new UnorderedPartitionedProperty(searchKeyVars, domain),
+                    MetadataProvider mp = (MetadataProvider) context.getMetadataProvider();
+                    Dataset dataset = mp.findDataset(searchIndex.getDataverseName(), searchIndex.getDatasetName());
+                    int[][] partitionsMap = mp.getPartitionsMap(dataset);
+                    pv[0] = new StructuralPropertiesVector(
+                            UnorderedPartitionedProperty.ofPartitionsMap(searchKeyVars, domain, partitionsMap),
                             propsLocal);
                     return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
                 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java
index 9917589a4d..90be495168 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java
@@ -112,7 +112,7 @@ public class ExternalDataLookupPOperator extends AbstractScanPOperator {
                 dataset.getDatasetDetails(), context.getComputationNodeDomain());
         IDataSourcePropertiesProvider dspp = ds.getPropertiesProvider();
         AbstractScanOperator as = (AbstractScanOperator) op;
-        deliveredProperties = dspp.computeDeliveredProperties(as.getVariables());
+        deliveredProperties = dspp.computeDeliveredProperties(as.getVariables(), context);
     }
 
     @Override
@@ -156,7 +156,7 @@ public class ExternalDataLookupPOperator extends AbstractScanPOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
         if (requiresBroadcast) {
             StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
             pv[0] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(null), null);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java
index d688ab8d5b..f522d935b0 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
@@ -65,11 +66,12 @@ public abstract class IndexSearchPOperator extends AbstractScanPOperator {
     }
 
     @Override
-    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+            throws AlgebricksException {
         IDataSource<?> ds = idx.getDataSource();
         IDataSourcePropertiesProvider dspp = ds.getPropertiesProvider();
         AbstractScanOperator as = (AbstractScanOperator) op;
-        deliveredProperties = dspp.computeDeliveredProperties(as.getScanVariables());
+        deliveredProperties = dspp.computeDeliveredProperties(as.getScanVariables(), context);
     }
 
     protected int[] getKeyIndexes(List<LogicalVariable> keyVarList, IOperatorSchema[] inputSchemas) {
@@ -85,7 +87,7 @@ public abstract class IndexSearchPOperator extends AbstractScanPOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
         if (requiresBroadcast) {
             StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
             pv[0] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(domain), null);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/SpatialJoinPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/SpatialJoinPOperator.java
index c0d9f4b694..513b5aa80c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/SpatialJoinPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/SpatialJoinPOperator.java
@@ -121,9 +121,9 @@ public class SpatialJoinPOperator extends AbstractJoinPOperator {
         keysLeftBranchTileId.add(keysLeftBranch.get(0));
         List<LogicalVariable> keysRightBranchTileId = new ArrayList<>();
         keysRightBranchTileId.add(keysRightBranch.get(0));
-        IPartitioningProperty pp1 = new UnorderedPartitionedProperty(new ListSet<>(keysLeftBranchTileId),
+        IPartitioningProperty pp1 = UnorderedPartitionedProperty.of(new ListSet<>(keysLeftBranchTileId),
                 context.getComputationNodeDomain());
-        IPartitioningProperty pp2 = new UnorderedPartitionedProperty(new ListSet<>(keysRightBranchTileId),
+        IPartitioningProperty pp2 = UnorderedPartitionedProperty.of(new ListSet<>(keysRightBranchTileId),
                 context.getComputationNodeDomain());
 
         List<ILocalStructuralProperty> localProperties1 = new ArrayList<>();
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java
index fc8c3e929e..bb0bddbb1d 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java
@@ -340,7 +340,7 @@ public class IntroduceLSMComponentFilterRule implements IAlgebraicRewriteRule {
             inputCompareVars.add(new ArrayList<>(intersect.getInputCompareVariables(i)));
         }
         IntersectOperator intersectWithFilter = new IntersectOperator(intersect.getOutputCompareVariables(),
-                outputFilterVars, inputCompareVars, filterVars);
+                outputFilterVars, inputCompareVars, filterVars, intersect.getPartitionsMap());
         intersectWithFilter.setSourceLocation(intersect.getSourceLocation());
         intersectWithFilter.getInputs().addAll(intersect.getInputs());
         return intersectWithFilter;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
index f0218452fc..43e482b7c2 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
@@ -284,7 +284,8 @@ public class IntroduceSelectAccessMethodRule extends AbstractIntroduceAccessMeth
             subRoots.add(subRoot);
         }
         // Connect each secondary index utilization plan to a common intersect operator.
-        ILogicalOperator primaryUnnestOp = connectAll2ndarySearchPlanWithIntersect(subRoots, context);
+        Index idx = chosenIndexes.get(0).getSecond();
+        ILogicalOperator primaryUnnestOp = connectAll2ndarySearchPlanWithIntersect(subRoots, context, idx);
 
         subTree.getDataSourceRef().setValue(primaryUnnestOp);
         return primaryUnnestOp != null;
@@ -312,7 +313,7 @@ public class IntroduceSelectAccessMethodRule extends AbstractIntroduceAccessMeth
      * Connect each secondary index utilization plan to a common INTERSECT operator.
      */
     private ILogicalOperator connectAll2ndarySearchPlanWithIntersect(List<ILogicalOperator> subRoots,
-            IOptimizationContext context) throws AlgebricksException {
+            IOptimizationContext context, Index idx) throws AlgebricksException {
         ILogicalOperator lop = subRoots.get(0);
         List<List<LogicalVariable>> inputVars = new ArrayList<>(subRoots.size());
         for (int i = 0; i < subRoots.size(); i++) {
@@ -360,7 +361,8 @@ public class IntroduceSelectAccessMethodRule extends AbstractIntroduceAccessMeth
             VariableUtilities.substituteVariables(lop, inputVar, outputVar, context);
         }
 
-        IntersectOperator intersect = new IntersectOperator(outputVars, inputVars);
+        int[][] partitionsMap = metadataProvider.getPartitionsMap(idx);
+        IntersectOperator intersect = new IntersectOperator(outputVars, inputVars, partitionsMap);
         intersect.setSourceLocation(lop.getSourceLocation());
         for (ILogicalOperator secondarySearch : subRoots) {
             intersect.getInputs().add(secondarySearch.getInputs().get(0));
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
index fda3845b7b..de493acf1e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
@@ -33,6 +33,7 @@ import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartit
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
@@ -114,12 +115,14 @@ public class QueryIndexDatasource extends FunctionDataSource {
     public IDataSourcePropertiesProvider getPropertiesProvider() {
         return new IDataSourcePropertiesProvider() {
             @Override
-            public IPhysicalPropertiesVector computeRequiredProperties(List<LogicalVariable> scanVariables) {
+            public IPhysicalPropertiesVector computeRequiredProperties(List<LogicalVariable> scanVariables,
+                    IOptimizationContext ctx) {
                 return StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR;
             }
 
             @Override
-            public IPhysicalPropertiesVector computeDeliveredProperties(List<LogicalVariable> scanVariables) {
+            public IPhysicalPropertiesVector computeDeliveredProperties(List<LogicalVariable> scanVariables,
+                    IOptimizationContext ctx) {
                 List<ILocalStructuralProperty> propsLocal = new ArrayList<>(1);
                 //TODO(ali): consider primary keys?
                 List<OrderColumn> secKeys = new ArrayList<>(numSecKeys);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
index b0de85ed5f..bc9e6d313e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
@@ -193,7 +193,7 @@ public class PlanStagesGeneratorTest {
 
         ExchangeOperator exchangeOperator1 = new ExchangeOperator();
         exchangeOperator1.setExecutionMode(PARTITIONED);
-        exchangeOperator1.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null));
+        exchangeOperator1.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null, null));
         exchangeOperator1.getInputs().add(new MutableObject<>(firstJoin));
 
         EmptyTupleSourceOperator ets3 = new EmptyTupleSourceOperator();
@@ -207,7 +207,7 @@ public class PlanStagesGeneratorTest {
 
         ExchangeOperator exchangeOperator2 = new ExchangeOperator();
         exchangeOperator2.setExecutionMode(PARTITIONED);
-        exchangeOperator2.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null));
+        exchangeOperator2.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null, null));
         exchangeOperator2.getInputs().add(new MutableObject<>(groupByOperator));
 
         LeftOuterJoinOperator secondJoin = new LeftOuterJoinOperator(new MutableObject<>(ConstantExpression.TRUE),
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/RequiredCapacityVisitorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/RequiredCapacityVisitorTest.java
index cc18c31956..729a5608ad 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/RequiredCapacityVisitorTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/RequiredCapacityVisitorTest.java
@@ -51,7 +51,7 @@ public class RequiredCapacityVisitorTest {
         // Constructs a parallel group-by query plan.
         GroupByOperator globalGby = makeGroupByOperator(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
         ExchangeOperator exchange = new ExchangeOperator();
-        exchange.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null));
+        exchange.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null, null));
         GroupByOperator localGby = makeGroupByOperator(AbstractLogicalOperator.ExecutionMode.LOCAL);
         globalGby.getInputs().add(new MutableObject<>(exchange));
         exchange.getInputs().add(new MutableObject<>(localGby));
@@ -94,7 +94,7 @@ public class RequiredCapacityVisitorTest {
         // Left child plan of the join.
         ExchangeOperator leftChildExchange = new ExchangeOperator();
         leftChildExchange.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
-        leftChildExchange.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null));
+        leftChildExchange.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null, null));
         InnerJoinOperator leftChild = makeJoinOperator(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
         join.getInputs().add(new MutableObject<>(leftChildExchange));
         leftChildExchange.getInputs().add(new MutableObject<>(leftChild));
@@ -106,7 +106,7 @@ public class RequiredCapacityVisitorTest {
         // Right child plan of the join.
         ExchangeOperator rightChildExchange = new ExchangeOperator();
         rightChildExchange.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
-        rightChildExchange.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null));
+        rightChildExchange.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null, null));
         GroupByOperator rightChild = makeGroupByOperator(AbstractLogicalOperator.ExecutionMode.LOCAL);
         join.getInputs().add(new MutableObject<>(rightChildExchange));
         rightChildExchange.getInputs().add(new MutableObject<>(rightChild));
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourcePartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourcePartitioningProvider.java
index 81873af95c..f51e474466 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourcePartitioningProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourcePartitioningProvider.java
@@ -22,7 +22,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.ListSet;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
@@ -47,7 +50,8 @@ public class DataSourcePartitioningProvider implements IDataSourcePropertiesProv
     }
 
     @Override
-    public IPhysicalPropertiesVector computeRequiredProperties(List<LogicalVariable> scanVariables) {
+    public IPhysicalPropertiesVector computeRequiredProperties(List<LogicalVariable> scanVariables,
+            IOptimizationContext ctx) throws AlgebricksException {
         IPhysicalPropertiesVector propsVector;
         IPartitioningProperty pp;
         List<ILocalStructuralProperty> propsLocal = new ArrayList<>();
@@ -58,12 +62,23 @@ public class DataSourcePartitioningProvider implements IDataSourcePropertiesProv
                 ds.computeLocalStructuralProperties(propsLocal, scanVariables);
                 break;
             case DataSource.Type.FEED:
-                pp = getFeedPartitioningProperty(ds, domain, scanVariables);
+                String dsName = ((FeedDataSource) ds).getTargetDataset();
+                Dataset feedDs = ((MetadataProvider) ctx.getMetadataProvider())
+                        .findDataset(ds.getId().getDataverseName(), dsName);
+                int[][] partitionsMap1 = ((MetadataProvider) ctx.getMetadataProvider()).getPartitionsMap(feedDs);
+                pp = getFeedDatasetPartitioningProperty(ds, domain, scanVariables, partitionsMap1);
                 break;
             case DataSource.Type.INTERNAL_DATASET:
             case DataSource.Type.SAMPLE:
                 Set<LogicalVariable> pvars = new ListSet<>();
-                pp = getInternalDatasetPartitioningProperty(ds, domain, scanVariables, pvars);
+                Dataset dataset;
+                if (ds.getDatasourceType() == DataSource.Type.INTERNAL_DATASET) {
+                    dataset = ((DatasetDataSource) ds).getDataset();
+                } else {
+                    dataset = ((SampleDataSource) ds).getDataset();
+                }
+                int[][] partitionsMap = ((MetadataProvider) ctx.getMetadataProvider()).getPartitionsMap(dataset);
+                pp = getInternalDatasetPartitioningProperty(ds, domain, scanVariables, pvars, partitionsMap);
                 propsLocal.add(new LocalOrderProperty(getOrderColumns(pvars)));
                 break;
             default:
@@ -74,14 +89,22 @@ public class DataSourcePartitioningProvider implements IDataSourcePropertiesProv
     }
 
     @Override
-    public IPhysicalPropertiesVector computeDeliveredProperties(List<LogicalVariable> scanVariables) {
-        if (ds.getDatasourceType() == DataSource.Type.INTERNAL_DATASET) {
-            IPartitioningProperty pp = new RandomPartitioningProperty(domain);
-            List<ILocalStructuralProperty> propsLocal = new ArrayList<>();
-            ds.computeLocalStructuralProperties(propsLocal, scanVariables);
-            return new StructuralPropertiesVector(pp, propsLocal);
+    public IPhysicalPropertiesVector computeDeliveredProperties(List<LogicalVariable> scanVariables,
+            IOptimizationContext ctx) throws AlgebricksException {
+        switch (ds.getDatasourceType()) {
+            case DataSource.Type.INTERNAL_DATASET: {
+                IPartitioningProperty pp = new RandomPartitioningProperty(domain);
+                List<ILocalStructuralProperty> propsLocal = new ArrayList<>();
+                ds.computeLocalStructuralProperties(propsLocal, scanVariables);
+                return new StructuralPropertiesVector(pp, propsLocal);
+            }
+            case DataSource.Type.FEED: {
+                IPartitioningProperty pp = getFeedPartitioningProperty(ds, domain, scanVariables);
+                return new StructuralPropertiesVector(pp, new ArrayList<>());
+            }
+            default:
+                return computeRequiredProperties(scanVariables, ctx);
         }
-        return computeRequiredProperties(scanVariables);
     }
 
     private static List<OrderColumn> getOrderColumns(Set<LogicalVariable> pvars) {
@@ -93,13 +116,26 @@ public class DataSourcePartitioningProvider implements IDataSourcePropertiesProv
     }
 
     private static IPartitioningProperty getInternalDatasetPartitioningProperty(DataSource ds, INodeDomain domain,
-            List<LogicalVariable> scanVariables, Set<LogicalVariable> pvars) {
+            List<LogicalVariable> scanVariables, Set<LogicalVariable> pvars, int[][] partitionsMap) {
         IPartitioningProperty pp;
         if (scanVariables.size() < 2) {
             pp = new RandomPartitioningProperty(domain);
         } else {
             pvars.addAll(ds.getPrimaryKeyVariables(scanVariables));
-            pp = new UnorderedPartitionedProperty(pvars, domain);
+            pp = UnorderedPartitionedProperty.ofPartitionsMap(pvars, domain, partitionsMap);
+        }
+        return pp;
+    }
+
+    public static IPartitioningProperty getFeedDatasetPartitioningProperty(DataSource ds, INodeDomain domain,
+            List<LogicalVariable> scanVariables, int[][] partitionsMap) {
+        IPartitioningProperty pp;
+        if (scanVariables.size() < 2) {
+            pp = new RandomPartitioningProperty(domain);
+        } else {
+            Set<LogicalVariable> pvars = new ListSet<>();
+            pvars.addAll(ds.getPrimaryKeyVariables(scanVariables));
+            pp = UnorderedPartitionedProperty.ofPartitionsMap(pvars, domain, partitionsMap);
         }
         return pp;
     }
@@ -112,7 +148,7 @@ public class DataSourcePartitioningProvider implements IDataSourcePropertiesProv
         } else {
             Set<LogicalVariable> pvars = new ListSet<>();
             pvars.addAll(ds.getPrimaryKeyVariables(scanVariables));
-            pp = new UnorderedPartitionedProperty(pvars, domain);
+            pp = UnorderedPartitionedProperty.of(pvars, domain);
         }
         return pp;
     }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
index 9f7d567410..2c571629a4 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
@@ -36,6 +36,7 @@ import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartit
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
@@ -86,12 +87,14 @@ public abstract class FunctionDataSource extends DataSource {
         // Unordered Random partitioning on all nodes
         return new IDataSourcePropertiesProvider() {
             @Override
-            public IPhysicalPropertiesVector computeRequiredProperties(List<LogicalVariable> scanVariables) {
+            public IPhysicalPropertiesVector computeRequiredProperties(List<LogicalVariable> scanVariables,
+                    IOptimizationContext ctx) {
                 return StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR;
             }
 
             @Override
-            public IPhysicalPropertiesVector computeDeliveredProperties(List<LogicalVariable> scanVariables) {
+            public IPhysicalPropertiesVector computeDeliveredProperties(List<LogicalVariable> scanVariables,
+                    IOptimizationContext ctx) {
                 return new StructuralPropertiesVector(new RandomPartitioningProperty(domain), Collections.emptyList());
             }
         };
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 00fe7ae873..7383ca3f2a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -1887,6 +1887,18 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         validateDatabaseObjectNameImpl(objectName, sourceLoc);
     }
 
+    public int[][] getPartitionsMap(Dataset dataset) throws AlgebricksException {
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = getSplitProviderAndConstraints(dataset);
+        return getPartitionsMap(getNumPartitions(spPc.second));
+    }
+
+    public int[][] getPartitionsMap(Index idx) throws AlgebricksException {
+        Dataset ds = findDataset(idx.getDataverseName(), idx.getDatasetName());
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
+                getSplitProviderAndConstraints(ds, idx.getIndexName());
+        return getPartitionsMap(getNumPartitions(spPc.second));
+    }
+
     public static int getNumPartitions(AlgebricksPartitionConstraint constraint) {
         if (constraint.getPartitionConstraintType() == AlgebricksPartitionConstraint.PartitionConstraintType.COUNT) {
             return ((AlgebricksCountPartitionConstraint) constraint).getCount();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
index 448f5ce522..c1858af8c0 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
@@ -72,6 +72,10 @@ public class SampleDataSource extends DataSource {
         return false;
     }
 
+    public Dataset getDataset() {
+        return dataset;
+    }
+
     private static DataSourceId createSampleDataSourceId(Dataset dataset, String sampleIndexName) {
         return new DataSourceId(dataset.getDataverseName(), dataset.getDatasetName(), new String[] { sampleIndexName });
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IDataSourcePropertiesProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IDataSourcePropertiesProvider.java
index 5086e32af5..3b3bc5db16 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IDataSourcePropertiesProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IDataSourcePropertiesProvider.java
@@ -20,21 +20,25 @@ package org.apache.hyracks.algebricks.core.algebra.metadata;
 
 import java.util.List;
 
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 
 public interface IDataSourcePropertiesProvider {
     /**
-     *
      * @param scanVariables
+     * @param ctx
      * @return
      */
-    IPhysicalPropertiesVector computeRequiredProperties(List<LogicalVariable> scanVariables);
+    IPhysicalPropertiesVector computeRequiredProperties(List<LogicalVariable> scanVariables, IOptimizationContext ctx)
+            throws AlgebricksException;
 
     /**
-     *
      * @param scanVariables
+     * @param ctx
      * @return
      */
-    IPhysicalPropertiesVector computeDeliveredProperties(List<LogicalVariable> scanVariables);
+    IPhysicalPropertiesVector computeDeliveredProperties(List<LogicalVariable> scanVariables, IOptimizationContext ctx)
+            throws AlgebricksException;
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java
index f77d9db6e4..82bf431f2a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java
@@ -44,15 +44,16 @@ public class IntersectOperator extends AbstractLogicalOperator {
 
     private final List<LogicalVariable> outputExtraVars;
     private final List<List<LogicalVariable>> inputExtraVars;
+    private final int[][] partitionsMap;
 
-    public IntersectOperator(List<LogicalVariable> outputCompareVars, List<List<LogicalVariable>> inputCompareVars)
-            throws AlgebricksException {
-        this(outputCompareVars, Collections.emptyList(), inputCompareVars, Collections.emptyList());
+    public IntersectOperator(List<LogicalVariable> outputCompareVars, List<List<LogicalVariable>> inputCompareVars,
+            int[][] partitionsMap) throws AlgebricksException {
+        this(outputCompareVars, Collections.emptyList(), inputCompareVars, Collections.emptyList(), partitionsMap);
     }
 
     public IntersectOperator(List<LogicalVariable> outputCompareVars, List<LogicalVariable> outputExtraVars,
-            List<List<LogicalVariable>> inputCompareVars, List<List<LogicalVariable>> inputExtraVars)
-            throws AlgebricksException {
+            List<List<LogicalVariable>> inputCompareVars, List<List<LogicalVariable>> inputExtraVars,
+            int[][] partitionsMap) throws AlgebricksException {
         int numCompareVars = outputCompareVars.size();
         for (List<LogicalVariable> vars : inputCompareVars) {
             if (vars.size() != numCompareVars) {
@@ -75,6 +76,7 @@ public class IntersectOperator extends AbstractLogicalOperator {
             }
         }
 
+        this.partitionsMap = partitionsMap;
         this.outputCompareVars = new ArrayList<>(outputCompareVars);
         this.inputCompareVars = new ArrayList<>(inputCompareVars);
         this.outputExtraVars = new ArrayList<>();
@@ -174,6 +176,10 @@ public class IntersectOperator extends AbstractLogicalOperator {
         return outputExtraVars;
     }
 
+    public int[][] getPartitionsMap() {
+        return partitionsMap;
+    }
+
     private List<LogicalVariable> concatOutputVariables() {
         return ListUtils.union(outputCompareVars, outputExtraVars);
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index 5a6574acb3..dfd6398cbe 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -77,6 +78,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperato
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 
@@ -384,9 +386,9 @@ public class IsomorphismOperatorVisitor implements ILogicalOperatorVisitor<Boole
         if (op.getOperatorTag() != LogicalOperatorTag.INTERSECT) {
             return Boolean.FALSE;
         }
-        IntersectOperator intersetOpArg = (IntersectOperator) copyAndSubstituteVar(op, arg);
+        IntersectOperator intersectOpArg = (IntersectOperator) copyAndSubstituteVar(op, arg);
         List<LogicalVariable> outputCompareVars = op.getOutputCompareVariables();
-        List<LogicalVariable> outputCompareVarsArg = intersetOpArg.getOutputCompareVariables();
+        List<LogicalVariable> outputCompareVarsArg = intersectOpArg.getOutputCompareVariables();
         if (outputCompareVars.size() != outputCompareVarsArg.size()) {
             return Boolean.FALSE;
         }
@@ -395,7 +397,7 @@ public class IsomorphismOperatorVisitor implements ILogicalOperatorVisitor<Boole
         }
         boolean hasExtraVars = op.hasExtraVariables();
         List<LogicalVariable> outputExtraVars = op.getOutputExtraVariables();
-        List<LogicalVariable> outputExtraVarsArg = intersetOpArg.getOutputExtraVariables();
+        List<LogicalVariable> outputExtraVarsArg = intersectOpArg.getOutputExtraVariables();
         if (outputExtraVars.size() != outputExtraVarsArg.size()) {
             return Boolean.FALSE;
         }
@@ -404,19 +406,22 @@ public class IsomorphismOperatorVisitor implements ILogicalOperatorVisitor<Boole
         }
 
         int nInput = op.getNumInput();
-        if (nInput != intersetOpArg.getNumInput()) {
+        if (nInput != intersectOpArg.getNumInput()) {
             return Boolean.FALSE;
         }
         for (int i = 0; i < nInput; i++) {
             if (!VariableUtilities.varListEqualUnordered(op.getInputCompareVariables(i),
-                    intersetOpArg.getInputCompareVariables(i))) {
+                    intersectOpArg.getInputCompareVariables(i))) {
                 return Boolean.FALSE;
             }
             if (hasExtraVars && !VariableUtilities.varListEqualUnordered(op.getInputExtraVariables(i),
-                    intersetOpArg.getInputExtraVariables(i))) {
+                    intersectOpArg.getInputExtraVariables(i))) {
                 return Boolean.FALSE;
             }
         }
+        if (!Arrays.deepEquals(op.getPartitionsMap(), intersectOpArg.getPartitionsMap())) {
+            return Boolean.FALSE;
+        }
         return Boolean.TRUE;
     }
 
@@ -543,9 +548,15 @@ public class IsomorphismOperatorVisitor implements ILogicalOperatorVisitor<Boole
         if (!partProp.getNodeDomain().sameAs(partPropArg.getNodeDomain())) {
             return Boolean.FALSE;
         }
-        List<LogicalVariable> columns = new ArrayList<LogicalVariable>();
+        if (partProp.getPartitioningType() == IPartitioningProperty.PartitioningType.UNORDERED_PARTITIONED) {
+            if (!((UnorderedPartitionedProperty) partProp)
+                    .samePartitioningScheme(((UnorderedPartitionedProperty) partPropArg))) {
+                return Boolean.FALSE;
+            }
+        }
+        List<LogicalVariable> columns = new ArrayList<>();
         partProp.getColumns(columns);
-        List<LogicalVariable> columnsArg = new ArrayList<LogicalVariable>();
+        List<LogicalVariable> columnsArg = new ArrayList<>();
         partPropArg.getColumns(columnsArg);
         if (columns.size() != columnsArg.size()) {
             return Boolean.FALSE;
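
A side note on the Arrays.deepEquals comparison of the partitions maps added in this file: int[][] needs an element-wise deep comparison, because Arrays.equals (like Object.equals) compares the inner int[] objects by reference. A small, self-contained illustration (plain Java, independent of this codebase):

    import java.util.Arrays;

    public final class DeepEqualsDemo {
        public static void main(String[] args) {
            int[][] mapA = { { 0, 1 }, { 2, 3 } };
            int[][] mapB = { { 0, 1 }, { 2, 3 } };
            int[][] mapC = { { 0, 1 }, { 3, 2 } };
            System.out.println(Arrays.equals(mapA, mapB));     // false: inner arrays compared by reference
            System.out.println(Arrays.deepEquals(mapA, mapB)); // true: compared element-wise
            System.out.println(Arrays.deepEquals(mapA, mapC)); // false: same shape, different routing
        }
    }
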
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index 71a659b2b1..b49fbccaf2 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -523,8 +524,12 @@ public class LogicalOperatorDeepCopyWithNewVariablesVisitor
                 inputExtraVarsCopy.add(deepCopyVariableList(op.getInputExtraVariables(i)));
             }
         }
+        int[][] partitionsMap = op.getPartitionsMap();
+        int[][] partitionsMapCopy =
+                partitionsMap == null ? null : Arrays.stream(partitionsMap).map(int[]::clone).toArray(int[][]::new);
+
         IntersectOperator opCopy = new IntersectOperator(outputCompareVarsCopy, outputExtraVarsCopy,
-                inputCompareVarsCopy, inputExtraVarsCopy);
+                inputCompareVarsCopy, inputExtraVarsCopy, partitionsMapCopy);
         deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
         return opCopy;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index b7029d1c87..5112812d8c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -20,6 +20,7 @@
 package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 
@@ -230,7 +231,11 @@ public class OperatorDeepCopyVisitor implements ILogicalOperatorVisitor<ILogical
                 newInputExtraVars.add(new ArrayList<>(op.getInputExtraVariables(i)));
             }
         }
-        return new IntersectOperator(newOutputCompareVars, newOutputExtraVars, newInputCompareVars, newInputExtraVars);
+        int[][] partitionsMap = op.getPartitionsMap();
+        int[][] partitionsMapCopy =
+                partitionsMap == null ? null : Arrays.stream(partitionsMap).map(int[]::clone).toArray(int[][]::new);
+        return new IntersectOperator(newOutputCompareVars, newOutputExtraVars, newInputCompareVars, newInputExtraVars,
+                partitionsMapCopy);
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
index 38ccee5925..a594e7e026 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
@@ -94,21 +94,19 @@ public abstract class AbstractHashJoinPOperator extends AbstractJoinPOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext ctx) {
         // In a cost-based optimizer, we would also try to propagate the
         // parent's partitioning requirements.
         IPartitioningProperty pp1;
         IPartitioningProperty pp2;
         switch (partitioningType) {
             case PAIRWISE:
-                pp1 = new UnorderedPartitionedProperty(new ListSet<>(keysLeftBranch),
-                        context.getComputationNodeDomain());
-                pp2 = new UnorderedPartitionedProperty(new ListSet<>(keysRightBranch),
-                        context.getComputationNodeDomain());
+                pp1 = UnorderedPartitionedProperty.of(new ListSet<>(keysLeftBranch), ctx.getComputationNodeDomain());
+                pp2 = UnorderedPartitionedProperty.of(new ListSet<>(keysRightBranch), ctx.getComputationNodeDomain());
                 break;
             case BROADCAST:
-                pp1 = new RandomPartitioningProperty(context.getComputationNodeDomain());
-                pp2 = new BroadcastPartitioningProperty(context.getComputationNodeDomain());
+                pp1 = new RandomPartitioningProperty(ctx.getComputationNodeDomain());
+                pp2 = new BroadcastPartitioningProperty(ctx.getComputationNodeDomain());
                 break;
             default:
                 throw new IllegalStateException();
@@ -141,9 +139,9 @@ public abstract class AbstractHashJoinPOperator extends AbstractJoinPOperator {
                                             (UnorderedPartitionedProperty) firstDeliveredPartitioning;
                                     Set<LogicalVariable> set1 = upp1.getColumnSet();
                                     UnorderedPartitionedProperty uppreq = (UnorderedPartitionedProperty) requirements;
-                                    Set<LogicalVariable> modifuppreq = new ListSet<LogicalVariable>();
+                                    Set<LogicalVariable> modifuppreq = new ListSet<>();
                                     Map<LogicalVariable, EquivalenceClass> eqmap = context.getEquivalenceClassMap(op);
-                                    Set<LogicalVariable> covered = new ListSet<LogicalVariable>();
+                                    Set<LogicalVariable> covered = new ListSet<>();
                                     Set<LogicalVariable> keysCurrent = uppreq.getColumnSet();
                                     List<LogicalVariable> keysFirst = (keysRightBranch.containsAll(keysCurrent))
                                             ? keysRightBranch : keysLeftBranch;
@@ -182,8 +180,14 @@ public abstract class AbstractHashJoinPOperator extends AbstractJoinPOperator {
                                                 + " to agree with partitioning property " + firstDeliveredPartitioning
                                                 + " delivered by previous input operator.");
                                     }
-                                    UnorderedPartitionedProperty upp2 =
-                                            new UnorderedPartitionedProperty(modifuppreq, requirements.getNodeDomain());
+                                    UnorderedPartitionedProperty upp2;
+                                    UnorderedPartitionedProperty rqd = (UnorderedPartitionedProperty) requirements;
+                                    if (rqd.usesPartitionsMap()) {
+                                        upp2 = UnorderedPartitionedProperty.ofPartitionsMap(modifuppreq,
+                                                rqd.getNodeDomain(), rqd.getPartitionsMap());
+                                    } else {
+                                        upp2 = UnorderedPartitionedProperty.of(modifuppreq, rqd.getNodeDomain());
+                                    }
                                     return new Pair<>(false, upp2);
                                 }
                                 case ORDERED_PARTITIONED: {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreSortedDistinctByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreSortedDistinctByPOperator.java
index a81bf97be3..2f02a61914 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreSortedDistinctByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreSortedDistinctByPOperator.java
@@ -65,7 +65,7 @@ public abstract class AbstractPreSortedDistinctByPOperator extends AbstractDisti
         IPartitioningProperty pp = null;
         AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
         if (aop.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
-            pp = new UnorderedPartitionedProperty(new ListSet<>(columnList), context.getComputationNodeDomain());
+            pp = UnorderedPartitionedProperty.of(new ListSet<>(columnList), context.getComputationNodeDomain());
         }
         pv[0] = new StructuralPropertiesVector(pp, localProps);
         return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
index d2cc5d9fca..969fd9903f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
@@ -230,7 +230,7 @@ public abstract class AbstractPreclusteredGroupByPOperator extends AbstractGroup
         IPartitioningProperty pp = null;
         AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
         if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) {
-            pp = new UnorderedPartitionedProperty(new ListSet<>(columnList), context.getComputationNodeDomain());
+            pp = UnorderedPartitionedProperty.of(new ListSet<>(columnList), context.getComputationNodeDomain());
         }
         pv[0] = new StructuralPropertiesVector(pp, localProps);
         return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java
index 5159ac5c46..05b441caac 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.operators.physical;
 
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
@@ -27,7 +28,7 @@ public abstract class AbstractScanPOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
         return emptyUnaryRequirements();
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
index 560435ed39..fcc8c8e9a2 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
@@ -79,7 +79,7 @@ public abstract class AbstractWindowPOperator extends AbstractPhysicalOperator {
         IPartitioningProperty pp;
         switch (op.getExecutionMode()) {
             case PARTITIONED:
-                pp = new UnorderedPartitionedProperty(new ListSet<>(partitionColumns),
+                pp = UnorderedPartitionedProperty.of(new ListSet<>(partitionColumns),
                         context.getComputationNodeDomain());
                 break;
             case UNPARTITIONED:
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
index 2543330a9c..682d1cf81b 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
@@ -72,12 +72,12 @@ public class BulkloadPOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
         List<LogicalVariable> scanVariables = new ArrayList<>();
         scanVariables.addAll(primaryKeys);
         scanVariables.add(new LogicalVariable(-1));
         IPhysicalPropertiesVector physicalProps =
-                dataSource.getPropertiesProvider().computeRequiredProperties(scanVariables);
+                dataSource.getPropertiesProvider().computeRequiredProperties(scanVariables, context);
         StructuralPropertiesVector spv = new StructuralPropertiesVector(physicalProps.getPartitioningProperty(),
                 physicalProps.getLocalProperties());
         return new PhysicalRequirements(new IPhysicalPropertiesVector[] { spv },
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java
index 80843a482f..ea19a783f2 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java
@@ -74,11 +74,12 @@ public class DataSourceScanPOperator extends AbstractScanPOperator {
     }
 
     @Override
-    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+            throws AlgebricksException {
         // partitioning properties
         DataSourceScanOperator dssOp = (DataSourceScanOperator) op;
         IDataSourcePropertiesProvider dspp = dataSource.getPropertiesProvider();
-        deliveredProperties = dspp.computeDeliveredProperties(dssOp.getVariables());
+        deliveredProperties = dspp.computeDeliveredProperties(dssOp.getVariables(), context);
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
index 7515258ba9..89e17ade87 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
@@ -108,8 +108,9 @@ public class ExternalGroupByPOperator extends AbstractGroupByPOperator {
         AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
         if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) {
             StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
-            pv[0] = new StructuralPropertiesVector(new UnorderedPartitionedProperty(
-                    new ListSet<LogicalVariable>(columnList), context.getComputationNodeDomain()), null);
+            pv[0] = new StructuralPropertiesVector(
+                    UnorderedPartitionedProperty.of(new ListSet<>(columnList), context.getComputationNodeDomain()),
+                    null);
             return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
         } else {
             return emptyUnaryRequirements();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
index 91dba242d7..55b40b4151 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
@@ -47,12 +47,14 @@ import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescr
 
 public class HashPartitionExchangePOperator extends AbstractExchangePOperator {
 
-    private List<LogicalVariable> hashFields;
-    private INodeDomain domain;
+    private final List<LogicalVariable> hashFields;
+    private final INodeDomain domain;
+    private final int[][] partitionsMap;
 
-    public HashPartitionExchangePOperator(List<LogicalVariable> hashFields, INodeDomain domain) {
+    public HashPartitionExchangePOperator(List<LogicalVariable> hashFields, INodeDomain domain, int[][] partitionsMap) {
         this.hashFields = hashFields;
         this.domain = domain;
+        this.partitionsMap = partitionsMap;
     }
 
     @Override
@@ -70,7 +72,12 @@ public class HashPartitionExchangePOperator extends AbstractExchangePOperator {
 
     @Override
     public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
-        IPartitioningProperty p = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(hashFields), domain);
+        IPartitioningProperty p;
+        if (partitionsMap != null) {
+            p = UnorderedPartitionedProperty.ofPartitionsMap(new ListSet<>(hashFields), domain, partitionsMap);
+        } else {
+            p = UnorderedPartitionedProperty.of(new ListSet<>(hashFields), domain);
+        }
         this.deliveredProperties = new StructuralPropertiesVector(p, null);
     }
 
@@ -98,9 +105,13 @@ public class HashPartitionExchangePOperator extends AbstractExchangePOperator {
             hashFunctionFactories[i] = hashFunProvider.getBinaryHashFunctionFactory(env.getVarType(v));
             ++i;
         }
-        ITuplePartitionComputerFactory tpcf = new FieldHashPartitionComputerFactory(keys, hashFunctionFactories);
+        ITuplePartitionComputerFactory tpcf =
+                new FieldHashPartitionComputerFactory(keys, hashFunctionFactories, partitionsMap);
         IConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, tpcf);
         return new Pair<>(conn, null);
     }
 
+    public int[][] getPartitionsMap() {
+        return partitionsMap;
+    }
 }
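
Note on the partitions map: row i of the int[][] lists the storage partitions that compute partition i is responsible for, and the exchange forwards it both into the delivered UnorderedPartitionedProperty and into the FieldHashPartitionComputerFactory handed to the M:N connector. A minimal sketch of the expected shape, with hypothetical partition counts (not taken from this patch):

    // 2 compute partitions covering 4 storage partitions:
    // compute partition 0 owns storage partitions 0 and 1, compute partition 1 owns 2 and 3.
    int[][] partitionsMap = new int[][] { { 0, 1 }, { 2, 3 } };
    // Passing null instead keeps the previous behavior: plain hash partitioning
    // across all compute partitions, with no map involved.
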
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
index c5ce871f99..58614647fc 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
@@ -63,12 +63,14 @@ public class HashPartitionMergeExchangePOperator extends AbstractExchangePOperat
     private final List<OrderColumn> orderColumns;
     private final List<LogicalVariable> partitionFields;
     private final INodeDomain domain;
+    private final int[][] partitionsMap;
 
     public HashPartitionMergeExchangePOperator(List<OrderColumn> orderColumns, List<LogicalVariable> partitionFields,
-            INodeDomain domain) {
+            INodeDomain domain, int[][] partitionsMap) {
         this.orderColumns = orderColumns;
         this.partitionFields = partitionFields;
         this.domain = domain;
+        this.partitionsMap = partitionsMap;
     }
 
     @Override
@@ -82,11 +84,15 @@ public class HashPartitionMergeExchangePOperator extends AbstractExchangePOperat
 
     @Override
     public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
-        IPartitioningProperty p =
-                new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(partitionFields), domain);
+        IPartitioningProperty p;
+        if (partitionsMap != null) {
+            p = UnorderedPartitionedProperty.ofPartitionsMap(new ListSet<>(partitionFields), domain, partitionsMap);
+        } else {
+            p = UnorderedPartitionedProperty.of(new ListSet<>(partitionFields), domain);
+        }
         AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
         List<ILocalStructuralProperty> op2Locals = op2.getDeliveredPhysicalProperties().getLocalProperties();
-        List<ILocalStructuralProperty> locals = new ArrayList<ILocalStructuralProperty>();
+        List<ILocalStructuralProperty> locals = new ArrayList<>();
         for (ILocalStructuralProperty prop : op2Locals) {
             if (prop.getPropertyType() == PropertyType.LOCAL_ORDER_PROPERTY) {
                 locals.add(prop);
@@ -133,7 +139,8 @@ public class HashPartitionMergeExchangePOperator extends AbstractExchangePOperat
                 ++i;
             }
         }
-        ITuplePartitionComputerFactory tpcf = new FieldHashPartitionComputerFactory(keys, hashFunctionFactories);
+        ITuplePartitionComputerFactory tpcf =
+                new FieldHashPartitionComputerFactory(keys, hashFunctionFactories, partitionsMap);
 
         int n = orderColumns.size();
         int[] sortFields = new int[n];
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
index aee268b32e..50bdf8a7a8 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
@@ -84,14 +84,14 @@ public class IndexBulkloadPOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
         //skVarMap is used to remove duplicated variable references for order operator
         Map<Integer, Object> skVarMap = new HashMap<Integer, Object>();
         List<LogicalVariable> scanVariables = new ArrayList<>();
         scanVariables.addAll(primaryKeys);
         scanVariables.add(new LogicalVariable(-1));
-        IPhysicalPropertiesVector physicalProps =
-                dataSourceIndex.getDataSource().getPropertiesProvider().computeRequiredProperties(scanVariables);
+        IPhysicalPropertiesVector physicalProps = dataSourceIndex.getDataSource().getPropertiesProvider()
+                .computeRequiredProperties(scanVariables, context);
         List<ILocalStructuralProperty> localProperties = new ArrayList<>();
         List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
         // Data needs to be sorted based on the [token, number of token, PK]
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
index 03862758d5..c4f912a596 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
@@ -101,15 +101,15 @@ public class IndexInsertDeleteUpsertPOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
         List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
         scanVariables.addAll(primaryKeys);
         scanVariables.add(new LogicalVariable(-1));
         for (int i = 0; i < numOfAdditionalNonFilteringFields; i++) {
             scanVariables.add(new LogicalVariable(-1));
         }
-        IPhysicalPropertiesVector r =
-                dataSourceIndex.getDataSource().getPropertiesProvider().computeRequiredProperties(scanVariables);
+        IPhysicalPropertiesVector r = dataSourceIndex.getDataSource().getPropertiesProvider()
+                .computeRequiredProperties(scanVariables, context);
         r.getLocalProperties().clear();
         IPhysicalPropertiesVector[] requirements = new IPhysicalPropertiesVector[1];
         requirements[0] = r;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
index 7527fa66a8..ab4ee61ece 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
@@ -80,14 +80,15 @@ public class InsertDeleteUpsertPOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
         List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
         scanVariables.addAll(keys);
         scanVariables.add(payload);
         if (additionalNonFilteringFields != null) {
             scanVariables.addAll(additionalNonFilteringFields);
         }
-        IPhysicalPropertiesVector r = dataSource.getPropertiesProvider().computeRequiredProperties(scanVariables);
+        IPhysicalPropertiesVector r =
+                dataSource.getPropertiesProvider().computeRequiredProperties(scanVariables, context);
         r.getLocalProperties().clear();
         IPhysicalPropertiesVector[] requirements = new IPhysicalPropertiesVector[1];
         requirements[0] = r;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
index 9a595c58ca..2b838d3142 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
@@ -38,6 +38,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSch
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
@@ -79,7 +80,11 @@ public class IntersectPOperator extends AbstractPhysicalOperator {
             IPartitioningProperty pp = null;
             if (intersectOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
                 Set<LogicalVariable> partitioningVariables = new HashSet<>(intersectOp.getInputCompareVariables(i));
-                pp = new UnorderedPartitionedProperty(partitioningVariables, null);
+                INodeDomain nodeDomain = context.getComputationNodeDomain();
+                int[][] partitionsMap = intersectOp.getPartitionsMap();
+                pp = partitionsMap != null
+                        ? UnorderedPartitionedProperty.ofPartitionsMap(partitioningVariables, nodeDomain, partitionsMap)
+                        : UnorderedPartitionedProperty.of(partitioningVariables, nodeDomain);
             }
             pv[i] = new StructuralPropertiesVector(pp, localProps);
         }
@@ -173,4 +178,12 @@ public class IntersectPOperator extends AbstractPhysicalOperator {
     public boolean expensiveThanMaterialization() {
         return false;
     }
+
+    public static int[][] getPartitionsMap(int numPartitions) {
+        int[][] map = new int[numPartitions][1];
+        for (int i = 0; i < numPartitions; i++) {
+            map[i] = new int[] { i };
+        }
+        return map;
+    }
 }
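
The new static helper builds an identity-style map in which compute partition i is assigned exactly the storage partition with the same number. For illustration, what it returns for four partitions:

    // IntersectPOperator.getPartitionsMap(4) produces:
    int[][] identityMap = new int[][] { { 0 }, { 1 }, { 2 }, { 3 } };
    // i.e. compute partition 0 -> storage partition 0, 1 -> 1, 2 -> 2, 3 -> 3.
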
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
index 1e97182589..55ba9f97fc 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
@@ -80,11 +80,12 @@ public class WriteResultPOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
         List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
         scanVariables.addAll(keys);
         scanVariables.add(new LogicalVariable(-1));
-        IPhysicalPropertiesVector r = dataSource.getPropertiesProvider().computeRequiredProperties(scanVariables);
+        IPhysicalPropertiesVector r =
+                dataSource.getPropertiesProvider().computeRequiredProperties(scanVariables, context);
         IPhysicalPropertiesVector[] requirements = new IPhysicalPropertiesVector[1];
         requirements[0] = r;
         return new PhysicalRequirements(requirements, IPartitioningRequirementsCoordinator.NO_COORDINATION);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningRequirementsCoordinator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningRequirementsCoordinator.java
index a49c4b3bb7..d6f481239a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningRequirementsCoordinator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningRequirementsCoordinator.java
@@ -43,7 +43,7 @@ public interface IPartitioningRequirementsCoordinator {
         @Override
         public Pair<Boolean, IPartitioningProperty> coordinateRequirements(IPartitioningProperty requirements,
                 IPartitioningProperty firstDeliveredPartitioning, ILogicalOperator op, IOptimizationContext context) {
-            return new Pair<Boolean, IPartitioningProperty>(true, requirements);
+            return new Pair<>(true, requirements);
         }
     };
 
@@ -62,9 +62,9 @@ public interface IPartitioningRequirementsCoordinator {
                                         (UnorderedPartitionedProperty) firstDeliveredPartitioning;
                                 Set<LogicalVariable> set1 = upp1.getColumnSet();
                                 UnorderedPartitionedProperty uppreq = (UnorderedPartitionedProperty) rqdpp;
-                                Set<LogicalVariable> modifuppreq = new ListSet<LogicalVariable>();
+                                Set<LogicalVariable> modifuppreq = new ListSet<>();
                                 Map<LogicalVariable, EquivalenceClass> eqmap = context.getEquivalenceClassMap(op);
-                                Set<LogicalVariable> covered = new ListSet<LogicalVariable>();
+                                Set<LogicalVariable> covered = new ListSet<>();
 
                                 // coordinate from an existing partition property
                                 // (firstDeliveredPartitioning)
@@ -94,16 +94,22 @@ public interface IPartitioningRequirementsCoordinator {
                                             "The number of variables are not equal in both partitioning sides");
                                 }
 
-                                UnorderedPartitionedProperty upp2 =
-                                        new UnorderedPartitionedProperty(modifuppreq, rqdpp.getNodeDomain());
-                                return new Pair<Boolean, IPartitioningProperty>(false, upp2);
+                                UnorderedPartitionedProperty upp2;
+                                UnorderedPartitionedProperty rqd = (UnorderedPartitionedProperty) rqdpp;
+                                if (rqd.usesPartitionsMap()) {
+                                    upp2 = UnorderedPartitionedProperty.ofPartitionsMap(modifuppreq,
+                                            rqd.getNodeDomain(), rqd.getPartitionsMap());
+                                } else {
+                                    upp2 = UnorderedPartitionedProperty.of(modifuppreq, rqd.getNodeDomain());
+                                }
+                                return new Pair<>(false, upp2);
                             }
                             case ORDERED_PARTITIONED: {
                                 throw new NotImplementedException();
                             }
                         }
                     }
-                    return new Pair<Boolean, IPartitioningProperty>(true, rqdpp);
+                    return new Pair<>(true, rqdpp);
                 }
 
             };
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
index 1025e44820..fbc97d98e9 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
@@ -136,6 +136,9 @@ public class PropertiesUtil {
                     case UNORDERED_PARTITIONED: {
                         UnorderedPartitionedProperty ur = (UnorderedPartitionedProperty) reqd;
                         UnorderedPartitionedProperty ud = (UnorderedPartitionedProperty) dlvd;
+                        if (!ur.samePartitioningScheme(ud)) {
+                            return false;
+                        }
                         if (mayExpandProperties) {
                             return (!ud.getColumnSet().isEmpty() && ur.getColumnSet().containsAll(ud.getColumnSet()));
                         } else {
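
With this guard, a delivered UNORDERED_PARTITIONED property satisfies a required one only when both sides carry the same partitions map (or neither carries one); samePartitioningScheme boils down to a deep array comparison. A small illustration with hypothetical maps:

    int[][] required      = { { 0, 1 }, { 2, 3 } };  // map carried by the requirement
    int[][] deliveredSame = { { 0, 1 }, { 2, 3 } };
    boolean ok    = java.util.Arrays.deepEquals(required, deliveredSame); // true  -> maps agree
    boolean noMap = java.util.Arrays.deepEquals(required, null);          // false -> an exchange must be enforced
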
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java
index fa8650c370..6d4c38982b 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.properties;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -30,10 +31,22 @@ import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 public final class UnorderedPartitionedProperty extends AbstractGroupingProperty implements IPartitioningProperty {
 
     private INodeDomain domain;
+    private final int[][] partitionsMap;
 
-    public UnorderedPartitionedProperty(Set<LogicalVariable> partitioningVariables, INodeDomain domain) {
+    private UnorderedPartitionedProperty(Set<LogicalVariable> partitioningVariables, INodeDomain domain,
+            int[][] partitionsMap) {
         super(partitioningVariables);
         this.domain = domain;
+        this.partitionsMap = partitionsMap;
+    }
+
+    public static UnorderedPartitionedProperty of(Set<LogicalVariable> partitioningVariables, INodeDomain domain) {
+        return new UnorderedPartitionedProperty(partitioningVariables, domain, null);
+    }
+
+    public static UnorderedPartitionedProperty ofPartitionsMap(Set<LogicalVariable> partitioningVariables,
+            INodeDomain domain, int[][] partitionsMap) {
+        return new UnorderedPartitionedProperty(partitioningVariables, domain, partitionsMap);
     }
 
     @Override
@@ -46,7 +59,7 @@ public final class UnorderedPartitionedProperty extends AbstractGroupingProperty
             List<FunctionalDependency> fds) {
         Set<LogicalVariable> normalizedColumnSet =
                 normalizeAndReduceGroupingColumns(columnSet, equivalenceClasses, fds);
-        return new UnorderedPartitionedProperty(normalizedColumnSet, domain);
+        return new UnorderedPartitionedProperty(normalizedColumnSet, domain, partitionsMap);
     }
 
     @Override
@@ -79,12 +92,23 @@ public final class UnorderedPartitionedProperty extends AbstractGroupingProperty
                 applied = true;
             }
         }
-        return applied ? new UnorderedPartitionedProperty(newColumnSet, domain) : this;
+        return applied ? new UnorderedPartitionedProperty(newColumnSet, domain, partitionsMap) : this;
     }
 
     @Override
     public IPartitioningProperty clonePartitioningProperty() {
-        return new UnorderedPartitionedProperty(new ListSet<>(columnSet), domain);
+        return new UnorderedPartitionedProperty(new ListSet<>(columnSet), domain, partitionsMap);
     }
 
+    public int[][] getPartitionsMap() {
+        return partitionsMap;
+    }
+
+    public boolean usesPartitionsMap() {
+        return partitionsMap != null;
+    }
+
+    public boolean samePartitioningScheme(UnorderedPartitionedProperty another) {
+        return Arrays.deepEquals(partitionsMap, another.partitionsMap);
+    }
 }
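
The public constructor is gone; call sites now state explicitly whether a partitions map is in play via the two factories, and the map survives normalize(), substituteColumnVars() and clonePartitioningProperty(). An illustrative sketch, where vars, domain and map merely stand in for values available at a call site:

    UnorderedPartitionedProperty plain  = UnorderedPartitionedProperty.of(vars, domain);
    UnorderedPartitionedProperty mapped = UnorderedPartitionedProperty.ofPartitionsMap(vars, domain, map);
    plain.samePartitioningScheme(mapped);   // false: only one side carries a map
    mapped.samePartitioningScheme(mapped);  // true
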
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 3ae1218e49..4d32d97c65 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -654,31 +654,24 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
     private IPhysicalOperator createHashConnector(IOptimizationContext ctx, IPhysicalPropertiesVector deliveredByChild,
             INodeDomain domain, IPhysicalPropertiesVector requiredAtChild, IPartitioningProperty rqdPartitioning,
             int childIndex, ILogicalOperator parentOp) {
-        IPhysicalOperator hashConnector;
-        List<LogicalVariable> vars = new ArrayList<>(((UnorderedPartitionedProperty) rqdPartitioning).getColumnSet());
+        UnorderedPartitionedProperty rqd = (UnorderedPartitionedProperty) rqdPartitioning;
+        List<LogicalVariable> vars = new ArrayList<>(rqd.getColumnSet());
         String hashMergeHint = (String) ctx.getMetadataProvider().getConfig().get(HASH_MERGE);
         if (hashMergeHint == null || !hashMergeHint.equalsIgnoreCase(TRUE_CONSTANT)) {
-            hashConnector = new HashPartitionExchangePOperator(vars, domain);
-            return hashConnector;
+            return new HashPartitionExchangePOperator(vars, domain, rqd.getPartitionsMap());
         }
         List<ILocalStructuralProperty> cldLocals = deliveredByChild.getLocalProperties();
         List<ILocalStructuralProperty> reqdLocals = requiredAtChild.getLocalProperties();
-        boolean propWasSet = false;
-        hashConnector = null;
         if (reqdLocals != null && cldLocals != null && allAreOrderProps(cldLocals)) {
             AbstractLogicalOperator c = (AbstractLogicalOperator) parentOp.getInputs().get(childIndex).getValue();
             Map<LogicalVariable, EquivalenceClass> ecs = ctx.getEquivalenceClassMap(c);
             List<FunctionalDependency> fds = ctx.getFDList(c);
             if (PropertiesUtil.matchLocalProperties(reqdLocals, cldLocals, ecs, fds)) {
                 List<OrderColumn> orderColumns = getOrderColumnsFromGroupingProperties(reqdLocals, cldLocals);
-                hashConnector = new HashPartitionMergeExchangePOperator(orderColumns, vars, domain);
-                propWasSet = true;
+                return new HashPartitionMergeExchangePOperator(orderColumns, vars, domain, rqd.getPartitionsMap());
             }
         }
-        if (!propWasSet) {
-            hashConnector = new HashPartitionExchangePOperator(vars, domain);
-        }
-        return hashConnector;
+        return new HashPartitionExchangePOperator(vars, domain, rqd.getPartitionsMap());
     }
 
     /**
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnnecessarySortMergeExchange.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnnecessarySortMergeExchange.java
index 1b02ab49e3..15ba92d88a 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnnecessarySortMergeExchange.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnnecessarySortMergeExchange.java
@@ -94,23 +94,24 @@ public class RemoveUnnecessarySortMergeExchange implements IAlgebraicRewriteRule
         // If yes, we use HashMergeExchange; otherwise, we use HashExchange.
         SortMergeExchangePOperator sme = (SortMergeExchangePOperator) currentOp.getPhysicalOperator();
         HashPartitionExchangePOperator hpe = (HashPartitionExchangePOperator) op1.getPhysicalOperator();
-        Set<LogicalVariable> liveVars = new HashSet<LogicalVariable>();
+        Set<LogicalVariable> liveVars = new HashSet<>();
         VariableUtilities.getLiveVariables(op1, liveVars);
         boolean usingHashMergeExchange = true;
         for (OrderColumn oc : sme.getSortColumns()) {
             if (!liveVars.contains(oc.getColumn())) {
                 usingHashMergeExchange = false;
+                break;
             }
         }
 
         if (usingHashMergeExchange) {
             // Add sort columns from the SortMergeExchange into a new HashMergeExchange.
-            List<OrderColumn> ocList = new ArrayList<OrderColumn>();
+            List<OrderColumn> ocList = new ArrayList<>();
             for (OrderColumn oc : sme.getSortColumns()) {
                 ocList.add(oc);
             }
-            HashPartitionMergeExchangePOperator hpme =
-                    new HashPartitionMergeExchangePOperator(ocList, hpe.getHashFields(), hpe.getDomain());
+            HashPartitionMergeExchangePOperator hpme = new HashPartitionMergeExchangePOperator(ocList,
+                    hpe.getHashFields(), hpe.getDomain(), hpe.getPartitionsMap());
             op1.setPhysicalOperator(hpme);
         }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-dataflow-common/pom.xml
index 059d52ae0c..17bf600a99 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/pom.xml
@@ -74,6 +74,10 @@
       <groupId>commons-io</groupId>
       <artifactId>commons-io</artifactId>
     </dependency>
+    <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil-core</artifactId>
+    </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputer.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputer.java
index 31a959f757..186ef1da03 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputer.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputer.java
@@ -23,10 +23,13 @@ import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+
 public class FieldHashPartitionComputer extends HashPartitioner implements ITuplePartitionComputer {
 
-    public FieldHashPartitionComputer(int[] hashFields, IBinaryHashFunction[] hashFunctions) {
-        super(hashFields, hashFunctions);
+    public FieldHashPartitionComputer(int[] hashFields, IBinaryHashFunction[] hashFunctions,
+            Int2IntMap storagePartition2Compute) {
+        super(hashFields, hashFunctions, storagePartition2Compute);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
index 52df3b7a43..c91a0ea9a2 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
@@ -24,14 +24,27 @@ import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+
 public class FieldHashPartitionComputerFactory implements ITuplePartitionComputerFactory {
-    private static final long serialVersionUID = 1L;
+
+    private static final long serialVersionUID = 2L;
     private final int[] hashFields;
     private final IBinaryHashFunctionFactory[] hashFunctionFactories;
+    private final int[][] partitionsMap;
 
     public FieldHashPartitionComputerFactory(int[] hashFields, IBinaryHashFunctionFactory[] hashFunctionFactories) {
         this.hashFields = hashFields;
         this.hashFunctionFactories = hashFunctionFactories;
+        this.partitionsMap = null;
+    }
+
+    public FieldHashPartitionComputerFactory(int[] hashFields, IBinaryHashFunctionFactory[] hashFunctionFactories,
+            int[][] partitionsMap) {
+        this.hashFields = hashFields;
+        this.hashFunctionFactories = hashFunctionFactories;
+        this.partitionsMap = partitionsMap;
     }
 
     @Override
@@ -40,6 +53,16 @@ public class FieldHashPartitionComputerFactory implements ITuplePartitionCompute
         for (int i = 0; i < hashFunctionFactories.length; ++i) {
             hashFunctions[i] = hashFunctionFactories[i].createBinaryHashFunction();
         }
-        return new FieldHashPartitionComputer(hashFields, hashFunctions);
+        if (partitionsMap == null) {
+            return new FieldHashPartitionComputer(hashFields, hashFunctions, null);
+        } else {
+            Int2IntMap storagePartition2Compute = new Int2IntOpenHashMap();
+            for (int i = 0; i < partitionsMap.length; i++) {
+                for (int storagePartition : partitionsMap[i]) {
+                    storagePartition2Compute.put(storagePartition, i);
+                }
+            }
+            return new FieldHashPartitionComputer(hashFields, hashFunctions, storagePartition2Compute);
+        }
     }
 }
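
The factory flattens the per-compute-partition map into a storage-partition-to-compute-partition lookup backed by fastutil (hence the new fastutil-core dependency above). A standalone sketch with hypothetical partition counts:

    import it.unimi.dsi.fastutil.ints.Int2IntMap;
    import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;

    // 2 compute partitions covering 4 storage partitions.
    int[][] partitionsMap = { { 0, 1 }, { 2, 3 } };
    Int2IntMap storagePartition2Compute = new Int2IntOpenHashMap();
    for (int i = 0; i < partitionsMap.length; i++) {
        for (int storagePartition : partitionsMap[i]) {
            storagePartition2Compute.put(storagePartition, i);
        }
    }
    // Result: {0 -> 0, 1 -> 0, 2 -> 1, 3 -> 1}; its size (4) is the total number
    // of storage partitions, which HashPartitioner below hashes into.
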
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitioner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitioner.java
index 5620a95d9d..e36315c542 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitioner.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitioner.java
@@ -28,7 +28,7 @@ public class FieldHashPartitioner extends HashPartitioner implements ITupleParti
     private final int numPartitions;
 
     public FieldHashPartitioner(int[] hashFields, IBinaryHashFunction[] hashFunctions, int numPartitions) {
-        super(hashFields, hashFunctions);
+        super(hashFields, hashFunctions, null);
         this.numPartitions = numPartitions;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/HashPartitioner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/HashPartitioner.java
index b09bcb8b12..cb97d1d393 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/HashPartitioner.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/HashPartitioner.java
@@ -22,14 +22,18 @@ import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+
 class HashPartitioner {
 
     private final int[] hashFields;
     private final IBinaryHashFunction[] hashFunctions;
+    private final Int2IntMap storagePartition2Compute;
 
-    public HashPartitioner(int[] hashFields, IBinaryHashFunction[] hashFunctions) {
+    public HashPartitioner(int[] hashFields, IBinaryHashFunction[] hashFunctions, Int2IntMap storagePartition2Compute) {
         this.hashFields = hashFields;
         this.hashFunctions = hashFunctions;
+        this.storagePartition2Compute = storagePartition2Compute;
     }
 
     protected int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
@@ -50,6 +54,15 @@ class HashPartitioner {
         if (h < 0) {
             h = -(h + 1);
         }
-        return h % nParts;
+        if (storagePartition2Compute == null) {
+            return h % nParts;
+        } else {
+            int storagePartition = h % storagePartition2Compute.size();
+            int computePartition = storagePartition2Compute.getOrDefault(storagePartition, Integer.MIN_VALUE);
+            if (computePartition < 0 || computePartition >= nParts) {
+                throw new IllegalStateException("couldn't resolve storage partition to compute partition");
+            }
+            return computePartition;
+        }
     }
 }
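
A worked example of the new routing, continuing the hypothetical map above: the tuple hash first selects a storage partition modulo the total number of storage partitions, and that storage partition is then resolved to the compute partition that owns it; with no map, the hash still goes straight to h % nParts as before.

    // storagePartition2Compute = {0 -> 0, 1 -> 0, 2 -> 1, 3 -> 1}, nParts = 2
    int h = 10;                                                  // non-negative hash of the tuple's key fields
    int storagePartition = h % storagePartition2Compute.size();  // 10 % 4 = 2
    int computePartition =
            storagePartition2Compute.getOrDefault(storagePartition, Integer.MIN_VALUE); // -> 1
    // h = 5 would give storage partition 1 and therefore compute partition 0.
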