You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by al...@apache.org on 2018/10/16 04:18:32 UTC

[02/21] asterixdb git commit: [ASTERIXDB-2286][COMP][FUN][HYR] Parallel Sort Optimization

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
index 3335d71..fa11f73 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
@@ -52,6 +52,10 @@ import org.apache.hyracks.algebricks.core.rewriter.base.HeuristicOptimizer;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 
+/**
+ * Pre-conditions:
+ *      FixReplicateOperatorOutputsRule should be fired
+ */
 public class ExtractCommonOperatorsRule implements IAlgebraicRewriteRule {
 
     private final HashMap<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> childrenToParents =
@@ -62,6 +66,8 @@ public class ExtractCommonOperatorsRule implements IAlgebraicRewriteRule {
     private final HashMap<Mutable<ILogicalOperator>, MutableInt> clusterMap = new HashMap<>();
     private final HashMap<Integer, BitSet> clusterWaitForMap = new HashMap<>();
     private int lastUsedClusterId = 0;
+    private final Map<Mutable<ILogicalOperator>, BitSet> replicateToOutputs = new HashMap<>();
+    private final List<Pair<Mutable<ILogicalOperator>, Boolean>> newOutputs = new ArrayList<>();
 
     @Override
     public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
@@ -268,11 +274,71 @@ public class ExtractCommonOperatorsRule implements IAlgebraicRewriteRule {
                     context.computeAndSetTypeEnvironmentForOperator(parentOp);
                 }
             }
+            cleanupPlan();
             rewritten = true;
         }
         return rewritten;
     }
 
+    /**
+     * Cleans up the plan after combining similar branches into one branch making sure parents & children point to
+     * each other correctly.
+     */
+    private void cleanupPlan() {
+        for (Mutable<ILogicalOperator> root : roots) {
+            replicateToOutputs.clear();
+            newOutputs.clear();
+            findReplicateOp(root, replicateToOutputs);
+            cleanup(replicateToOutputs, newOutputs);
+        }
+    }
+
+    /**
+     * Updates the outputs references of a replicate operator to points to the valid parents.
+     * @param replicateToOutputs where the replicate operators are stored with its valid parents.
+     * @param newOutputs the valid parents of replicate operator.
+     */
+    private void cleanup(Map<Mutable<ILogicalOperator>, BitSet> replicateToOutputs,
+            List<Pair<Mutable<ILogicalOperator>, Boolean>> newOutputs) {
+        replicateToOutputs.forEach((repRef, allOutputs) -> {
+            newOutputs.clear();
+            // get the indexes that are set in the BitSet
+            allOutputs.stream().forEach(outIndex -> {
+                newOutputs.add(new Pair<>(((AbstractReplicateOperator) repRef.getValue()).getOutputs().get(outIndex),
+                        ((AbstractReplicateOperator) repRef.getValue()).getOutputMaterializationFlags()[outIndex]));
+            });
+            ((AbstractReplicateOperator) repRef.getValue()).setOutputs(newOutputs);
+        });
+    }
+
+    /**
+     * Collects all replicate operator starting from {@param parent} and all its descendants and keeps track of the
+     * valid parents of a replicate operator. The indexes of valid parents will be set in the BitSet.
+     * @param parent the current operator in consideration for which we want to find replicate op children.
+     * @param replicateToOutputs where the replicate operators will be stored with all its parents (valid & invalid).
+     */
+    private void findReplicateOp(Mutable<ILogicalOperator> parent,
+            Map<Mutable<ILogicalOperator>, BitSet> replicateToOutputs) {
+        List<Mutable<ILogicalOperator>> children = parent.getValue().getInputs();
+        for (Mutable<ILogicalOperator> childRef : children) {
+            AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue();
+            if (child.getOperatorTag() == LogicalOperatorTag.REPLICATE
+                    || child.getOperatorTag() == LogicalOperatorTag.SPLIT) {
+                AbstractReplicateOperator replicateChild = (AbstractReplicateOperator) child;
+                int parentIndex = replicateChild.getOutputs().indexOf(parent);
+                if (parentIndex >= 0) {
+                    BitSet replicateValidOutputs = replicateToOutputs.get(childRef);
+                    if (replicateValidOutputs == null) {
+                        replicateValidOutputs = new BitSet();
+                        replicateToOutputs.put(childRef, replicateValidOutputs);
+                    }
+                    replicateValidOutputs.set(parentIndex);
+                }
+            }
+            findReplicateOp(childRef, replicateToOutputs);
+        }
+    }
+
     private void genCandidates(IOptimizationContext context) throws AlgebricksException {
         List<List<Mutable<ILogicalOperator>>> previousEquivalenceClasses =
                 new ArrayList<List<Mutable<ILogicalOperator>>>();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
index 6967271..5d6237a 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
@@ -67,15 +67,18 @@ import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
  */
 public class InlineVariablesRule implements IAlgebraicRewriteRule {
 
-    // Map of variables that could be replaced by their producing expression.
-    // Populated during the top-down sweep of the plan.
-    protected Map<LogicalVariable, ILogicalExpression> varAssignRhs = new HashMap<>();
-    // Visitor for replacing variable reference expressions with their originating expression.
+    // map of variables that could be replaced by their producing expression.
+    // populated during the top-down sweep of the plan.
+    private Map<LogicalVariable, ILogicalExpression> varAssignRhs = new HashMap<>();
+    // visitor for replacing variable reference expressions with their originating expression.
     protected InlineVariablesVisitor inlineVisitor = new InlineVariablesVisitor(varAssignRhs);
-    // Set of FunctionIdentifiers that we should not inline.
+    // set of FunctionIdentifiers that we should not inline.
     protected Set<FunctionIdentifier> doNotInlineFuncs = new HashSet<>();
-    // Indicates whether the rule has been run
-    protected boolean hasRun = false;
+    // indicates whether the rule has been run
+    private boolean hasRun = false;
+    // set to prevent re-visiting a subtree from the other sides. Operators with multiple outputs are the ones that
+    // could be re-visited twice or more (e.g. replicate and split operators)
+    private final Map<ILogicalOperator, Boolean> subTreesDone = new HashMap<>();
 
     @Override
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
@@ -103,6 +106,7 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule {
     protected void prepare(IOptimizationContext context) {
         varAssignRhs.clear();
         inlineVisitor.setContext(context);
+        subTreesDone.clear();
     }
 
     protected boolean performBottomUpAction(AbstractLogicalOperator op) throws AlgebricksException {
@@ -118,10 +122,14 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule {
         return false;
     }
 
-    protected boolean inlineVariables(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+    private boolean inlineVariables(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
             throws AlgebricksException {
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
 
+        // check if you have already visited the subtree rooted at this operator
+        if (subTreesDone.containsKey(op)) {
+            return subTreesDone.get(op);
+        }
         // Update mapping from variables to expressions during top-down traversal.
         if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
             AssignOperator assignOp = (AssignOperator) op;
@@ -183,6 +191,10 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule {
             // Re-enable rules that we may have already tried. They could be applicable now after inlining.
             context.removeFromAlreadyCompared(opRef.getValue());
         }
+        // mark the subtree rooted at op as visited so that you don't visit it again
+        if (op.getOperatorTag() == LogicalOperatorTag.REPLICATE || op.getOperatorTag() == LogicalOperatorTag.SPLIT) {
+            subTreesDone.put(op, modified);
+        }
 
         return modified;
     }
@@ -209,7 +221,7 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule {
             this.context = context;
         }
 
-        public void setOperator(ILogicalOperator op) throws AlgebricksException {
+        public void setOperator(ILogicalOperator op) {
             this.op = op;
             liveVars.clear();
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 4869761..4273553 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -59,6 +59,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.DataSourceS
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.DistributeResultPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.EmptyTupleSourcePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.ForwardPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexBulkloadPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexInsertDeleteUpsertPOperator;
@@ -394,6 +395,9 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
                     op.setPhysicalOperator(new SinkPOperator());
                     break;
                 }
+                case FORWARD:
+                    op.setPhysicalOperator(new ForwardPOperator());
+                    break;
             }
         }
         if (op.hasNestedPlans()) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
index 35aa984..6b09894 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
@@ -34,10 +34,11 @@ import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
@@ -246,6 +247,11 @@ class ReplaceNtsWithSubplanInputOperatorVisitor implements IQueryOperatorVisitor
     }
 
     @Override
+    public ILogicalOperator visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException {
+        return visit(op);
+    }
+
+    @Override
     public ILogicalOperator visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException {
         return visit(op);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputer.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputer.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputer.java
index 0dcc83a..2068ef3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputer.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputer.java
@@ -22,5 +22,23 @@ import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface ITuplePartitionComputer {
+    /**
+     * For the tuple (located at tIndex in the frame), it determines which target partition (0,1,... nParts-1) the tuple
+     * should be sent/written to.
+     * @param accessor The accessor of the frame to access tuples
+     * @param tIndex The index of the tuple in consideration
+     * @param nParts The number of target partitions
+     * @return The chosen target partition number as dictated by the logic of the partition computer
+     * @throws HyracksDataException
+     */
     public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException;
+
+    /**
+     * Gives the data partitioner a chance to set up its environment before it starts partitioning tuples. This method
+     * should be called in the open() of {@link org.apache.hyracks.api.comm.IFrameWriter}. The default implementation
+     * is "do nothing".
+     * @throws HyracksDataException
+     */
+    public default void initialize() throws HyracksDataException {
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputerFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputerFactory.java
index cde0057..81f9053 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputerFactory.java
@@ -20,6 +20,8 @@ package org.apache.hyracks.api.dataflow.value;
 
 import java.io.Serializable;
 
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
 public interface ITuplePartitionComputerFactory extends Serializable {
-    public ITuplePartitionComputer createPartitioner();
+    public ITuplePartitionComputer createPartitioner(IHyracksTaskContext hyracksTaskContext);
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 09193d9..7d126ac 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -149,6 +149,9 @@ public class ErrorCode {
     public static final int UNDEFINED_INVERTED_LIST_MERGE_TYPE = 113;
     public static final int NODE_IS_NOT_ACTIVE = 114;
     public static final int LOCAL_NETWORK_ERROR = 115;
+    public static final int ONE_TUPLE_RANGEMAP_EXPECTED = 116;
+    public static final int NO_RANGEMAP_PRODUCED = 117;
+    public static final int RANGEMAP_NOT_FOUND = 118;
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index c704d7e..50e92b3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -132,6 +132,9 @@
 113 = Undefined inverted-list merge type: %1$s
 114 = Node (%1$s) is not active
 115 = Local network error
+116 = One tuple rangemap is expected
+117 = No range map produced for parallel sort
+118 = Range map was not found for parallel sort
 
 10000 = The given rule collection %1$s is not an instance of the List class.
 10001 = Cannot compose partition constraint %1$s with %2$s

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleDataInputStream.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleDataInputStream.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleDataInputStream.java
new file mode 100644
index 0000000..d858a7f
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleDataInputStream.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.data.std.util;
+
+import java.io.DataInputStream;
+
+public class ByteArrayAccessibleDataInputStream extends DataInputStream {
+
+    public ByteArrayAccessibleDataInputStream(ByteArrayAccessibleInputStream in) {
+        super(in);
+    }
+
+    public ByteArrayAccessibleInputStream getInputStream() {
+        return (ByteArrayAccessibleInputStream) in;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleInputStream.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleInputStream.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleInputStream.java
new file mode 100644
index 0000000..2785751
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleInputStream.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.data.std.util;
+
+import java.io.ByteArrayInputStream;
+
+public class ByteArrayAccessibleInputStream extends ByteArrayInputStream {
+
+    public ByteArrayAccessibleInputStream(byte[] buf, int offset, int length) {
+        super(buf, offset, length);
+    }
+
+    public void setContent(byte[] buf, int offset, int length) {
+        this.buf = buf;
+        this.pos = offset;
+        this.count = Math.min(offset + length, buf.length);
+        this.mark = offset;
+    }
+
+    public byte[] getArray() {
+        return buf;
+    }
+
+    public int getPosition() {
+        return pos;
+    }
+
+    public int getCount() {
+        return count;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
index dc66d19..ab5ab01 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.dataflow.common.data.partition;
 
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
@@ -36,7 +37,7 @@ public class FieldHashPartitionComputerFactory implements ITuplePartitionCompute
     }
 
     @Override
-    public ITuplePartitionComputer createPartitioner() {
+    public ITuplePartitionComputer createPartitioner(IHyracksTaskContext hyracksTaskContext) {
         final IBinaryHashFunction[] hashFunctions = new IBinaryHashFunction[hashFunctionFactories.length];
         for (int i = 0; i < hashFunctionFactories.length; ++i) {
             hashFunctions[i] = hashFunctionFactories[i].createBinaryHashFunction();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/OnePartitionComputerFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/OnePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/OnePartitionComputerFactory.java
new file mode 100644
index 0000000..e55841a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/OnePartitionComputerFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.data.partition;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+
+public class OnePartitionComputerFactory implements ITuplePartitionComputerFactory {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public ITuplePartitionComputer createPartitioner(IHyracksTaskContext hyracksTaskContext) {
+        return new ITuplePartitionComputer() {
+            @Override
+            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) {
+                return 0;
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java
index e034af0..63d01fc 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.dataflow.common.data.partition;
 import java.util.Random;
 
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -30,7 +31,7 @@ public class RandomPartitionComputerFactory implements ITuplePartitionComputerFa
     private static final long serialVersionUID = 1L;
 
     @Override
-    public ITuplePartitionComputer createPartitioner() {
+    public ITuplePartitionComputer createPartitioner(IHyracksTaskContext hyracksTaskContext) {
         return new ITuplePartitionComputer() {
 
             private final Random random = new Random();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RepartitionComputerFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RepartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RepartitionComputerFactory.java
index 9cb11fa..1821d78 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RepartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RepartitionComputerFactory.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.dataflow.common.data.partition;
 
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -35,9 +36,9 @@ public class RepartitionComputerFactory implements ITuplePartitionComputerFactor
     }
 
     @Override
-    public ITuplePartitionComputer createPartitioner() {
+    public ITuplePartitionComputer createPartitioner(IHyracksTaskContext hyracksTaskContext) {
         return new ITuplePartitionComputer() {
-            private ITuplePartitionComputer delegate = delegateFactory.createPartitioner();
+            private ITuplePartitionComputer delegate = delegateFactory.createPartitioner(hyracksTaskContext);
 
             @Override
             public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/DynamicFieldRangePartitionComputerFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/DynamicFieldRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/DynamicFieldRangePartitionComputerFactory.java
new file mode 100644
index 0000000..bc642a9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/DynamicFieldRangePartitionComputerFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.data.partition.range;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.dataflow.common.utils.TaskUtil;
+
+public class DynamicFieldRangePartitionComputerFactory extends FieldRangePartitionComputerFactory {
+    private static final long serialVersionUID = 1L;
+    private final String rangeMapKeyInContext;
+    private final SourceLocation sourceLocation;
+
+    public DynamicFieldRangePartitionComputerFactory(int[] rangeFields, IBinaryComparatorFactory[] comparatorFactories,
+            String rangeMapKeyInContext, SourceLocation sourceLocation) {
+        super(rangeFields, comparatorFactories);
+        this.rangeMapKeyInContext = rangeMapKeyInContext;
+        this.sourceLocation = sourceLocation;
+    }
+
+    @Override
+    protected RangeMap getRangeMap(IHyracksTaskContext hyracksTaskContext) throws HyracksDataException {
+        RangeMap rangeMap = TaskUtil.get(rangeMapKeyInContext, hyracksTaskContext);
+        if (rangeMap == null) {
+            throw HyracksDataException.create(ErrorCode.RANGEMAP_NOT_FOUND, sourceLocation);
+        }
+        return rangeMap;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
index d58a248..55d4420 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
@@ -19,36 +19,41 @@
 package org.apache.hyracks.dataflow.common.data.partition.range;
 
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public class FieldRangePartitionComputerFactory implements ITuplePartitionComputerFactory {
+public abstract class FieldRangePartitionComputerFactory implements ITuplePartitionComputerFactory {
     private static final long serialVersionUID = 1L;
     private final int[] rangeFields;
-    private IRangeMap rangeMap;
     private IBinaryComparatorFactory[] comparatorFactories;
 
-    public FieldRangePartitionComputerFactory(int[] rangeFields, IBinaryComparatorFactory[] comparatorFactories,
-            IRangeMap rangeMap) {
+    protected FieldRangePartitionComputerFactory(int[] rangeFields, IBinaryComparatorFactory[] comparatorFactories) {
         this.rangeFields = rangeFields;
         this.comparatorFactories = comparatorFactories;
-        this.rangeMap = rangeMap;
     }
 
+    protected abstract RangeMap getRangeMap(IHyracksTaskContext hyracksTaskContext) throws HyracksDataException;
+
     @Override
-    public ITuplePartitionComputer createPartitioner() {
+    public ITuplePartitionComputer createPartitioner(IHyracksTaskContext hyracksTaskContext) {
         final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
         for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
         }
+
         return new ITuplePartitionComputer() {
+            private RangeMap rangeMap;
+
+            @Override
+            public void initialize() throws HyracksDataException {
+                rangeMap = getRangeMap(hyracksTaskContext);
+            }
+
             @Override
-            /**
-             * Determine the range partition.
-             */
             public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
                 if (nParts == 1) {
                     return 0;
@@ -62,13 +67,10 @@ public class FieldRangePartitionComputerFactory implements ITuplePartitionComput
                 return (int) Math.floor(slotIndex / rangesPerPart);
             }
 
-            /*
-             * Determine the range partition.
-             */
-            public int getRangePartition(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+            private int getRangePartition(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
                 int slotIndex = 0;
-                for (int i = 0; i < rangeMap.getSplitCount(); ++i) {
-                    int c = compareSlotAndFields(accessor, tIndex, i);
+                for (int slotNumber = 0; slotNumber < rangeMap.getSplitCount(); ++slotNumber) {
+                    int c = compareSlotAndFields(accessor, tIndex, slotNumber);
                     if (c < 0) {
                         return slotIndex;
                     }
@@ -77,18 +79,18 @@ public class FieldRangePartitionComputerFactory implements ITuplePartitionComput
                 return slotIndex;
             }
 
-            public int compareSlotAndFields(IFrameTupleAccessor accessor, int tIndex, int fieldIndex)
+            private int compareSlotAndFields(IFrameTupleAccessor accessor, int tIndex, int slotNumber)
                     throws HyracksDataException {
                 int c = 0;
                 int startOffset = accessor.getTupleStartOffset(tIndex);
                 int slotLength = accessor.getFieldSlotsLength();
-                for (int f = 0; f < comparators.length; ++f) {
-                    int fIdx = rangeFields[f];
+                for (int fieldNum = 0; fieldNum < comparators.length; ++fieldNum) {
+                    int fIdx = rangeFields[fieldNum];
                     int fStart = accessor.getFieldStartOffset(tIndex, fIdx);
                     int fEnd = accessor.getFieldEndOffset(tIndex, fIdx);
-                    c = comparators[f].compare(accessor.getBuffer().array(), startOffset + slotLength + fStart,
-                            fEnd - fStart, rangeMap.getByteArray(fieldIndex, f), rangeMap.getStartOffset(fieldIndex, f),
-                            rangeMap.getLength(fieldIndex, f));
+                    c = comparators[fieldNum].compare(accessor.getBuffer().array(), startOffset + slotLength + fStart,
+                            fEnd - fStart, rangeMap.getByteArray(), rangeMap.getStartOffset(fieldNum, slotNumber),
+                            rangeMap.getLength(fieldNum, slotNumber));
                     if (c != 0) {
                         return c;
                     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangeMap.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangeMap.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangeMap.java
deleted file mode 100644
index 5c5f34b..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangeMap.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.common.data.partition.range;
-
-import org.apache.hyracks.data.std.api.IPointable;
-
-public interface IRangeMap {
-    public IPointable getFieldSplit(int columnIndex, int splitIndex);
-
-    public int getSplitCount();
-
-    public byte[] getByteArray(int columnIndex, int splitIndex);
-
-    public int getStartOffset(int columnIndex, int splitIndex);
-
-    public int getLength(int columnIndex, int splitIndex);
-
-    public int getTag(int columnIndex, int splitIndex);
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java
index 98acbc0..714e3c0 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java
@@ -19,80 +19,110 @@
 package org.apache.hyracks.dataflow.common.data.partition.range;
 
 import java.io.Serializable;
-
-import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.VoidPointable;
+import java.util.Arrays;
+import java.util.Objects;
 
 /**
- * The range map stores the field split values in an byte array.
- * The first split value for each field followed by the second split value for each field, etc.
+ * <pre>
+ * The range map stores the fields split values in a byte array.
+ * The first split value for each field followed by the second split value for each field, etc. For example:
+ *                  split_point_idx0    split_point_idx1    split_point_idx2    split_point_idx3    split_point_idx4
+ * in the byte[]:   f0,f1,f2            f0,f1,f2            f0,f1,f2            f0,f1,f2            f0,f1,f2
+ * numFields would be = 3
+ * we have 5 split points, which gives us 6 partitions:
+ *      p1  |       p2      |       p3      |       p4      |       p5      |       p6
+ *          sp0             sp1             sp2             sp3             sp4
+ * endOffsets.length would be = 15
+ * </pre>
  */
-public class RangeMap implements IRangeMap, Serializable {
-    private final int fields;
+public class RangeMap implements Serializable {
+    private final int numFields;
     private final byte[] bytes;
-    private final int[] offsets;
+    private final int[] endOffsets;
 
-    public RangeMap(int fields, byte[] bytes, int[] offsets) {
-        this.fields = fields;
+    public RangeMap(int numFields, byte[] bytes, int[] endOffsets) {
+        this.numFields = numFields;
         this.bytes = bytes;
-        this.offsets = offsets;
-    }
-
-    @Override
-    public IPointable getFieldSplit(int columnIndex, int splitIndex) {
-        IPointable p = VoidPointable.FACTORY.createPointable();
-        int index = getFieldIndex(columnIndex, splitIndex);
-        p.set(bytes, getFieldStart(index), getFieldLength(index));
-        return p;
+        this.endOffsets = endOffsets;
     }
 
-    @Override
     public int getSplitCount() {
-        return offsets.length / fields;
+        return endOffsets.length / numFields;
     }
 
-    @Override
-    public byte[] getByteArray(int columnIndex, int splitIndex) {
+    public byte[] getByteArray() {
         return bytes;
     }
 
-    @Override
-    public int getTag(int columnIndex, int splitIndex) {
-        return getFieldTag(getFieldIndex(columnIndex, splitIndex));
+    public int getTag(int fieldIndex, int splitIndex) {
+        return getSplitValueTag(getSplitValueIndex(fieldIndex, splitIndex));
     }
 
-    @Override
-    public int getStartOffset(int columnIndex, int splitIndex) {
-        return getFieldStart(getFieldIndex(columnIndex, splitIndex));
+    public int getStartOffset(int fieldIndex, int splitIndex) {
+        return getSplitValueStart(getSplitValueIndex(fieldIndex, splitIndex));
     }
 
-    @Override
-    public int getLength(int columnIndex, int splitIndex) {
-        return getFieldLength(getFieldIndex(columnIndex, splitIndex));
+    public int getLength(int fieldIndex, int splitIndex) {
+        return getSplitValueLength(getSplitValueIndex(fieldIndex, splitIndex));
     }
 
-    private int getFieldIndex(int columnIndex, int splitIndex) {
-        return splitIndex * fields + columnIndex;
+    /** Translates fieldIndex & splitIndex into an index which is used to find information about that split value.
+     * The combination of a fieldIndex & splitIndex uniquely identifies a split value of interest.
+     * @param fieldIndex the field index within the splitIndex of interest (0 <= fieldIndex < numFields)
+     * @param splitIndex starts with 0,1,2,.. etc
+     * @return the index of the desired split value that could be used with {@code bytes} & {@code endOffsets}.
+     */
+    private int getSplitValueIndex(int fieldIndex, int splitIndex) {
+        return splitIndex * numFields + fieldIndex;
     }
 
-    private int getFieldTag(int index) {
-        return bytes[getFieldStart(index)];
+    /**
+     * @param splitValueIndex is the combination of the split index + the field index within that split index
+     * @return the type tag of a specific field in a specific split point
+     */
+    private int getSplitValueTag(int splitValueIndex) {
+        return bytes[getSplitValueStart(splitValueIndex)];
     }
 
-    private int getFieldStart(int index) {
+    /**
+     * @param splitValueIndex is the combination of the split index + the field index within that split index
+     * @return the location of a split value in the byte array {@code bytes}
+     */
+    private int getSplitValueStart(int splitValueIndex) {
         int start = 0;
-        if (index != 0) {
-            start = offsets[index - 1];
+        if (splitValueIndex != 0) {
+            start = endOffsets[splitValueIndex - 1];
         }
         return start;
     }
 
-    private int getFieldLength(int index) {
-        int length = offsets[index];
-        if (index != 0) {
-            length -= offsets[index - 1];
+    /**
+     * @param splitValueIndex is the combination of the split index + the field index within that split index
+     * @return the length of a split value
+     */
+    private int getSplitValueLength(int splitValueIndex) {
+        int length = endOffsets[splitValueIndex];
+        if (splitValueIndex != 0) {
+            length -= endOffsets[splitValueIndex - 1];
         }
         return length;
     }
 
+    @Override
+    public int hashCode() {
+        return numFields + Arrays.hashCode(bytes) + Arrays.hashCode(endOffsets);
+    }
+
+    @Override
+    public boolean equals(Object object) {
+        if (this == object) {
+            return true;
+        }
+        if (!(object instanceof RangeMap)) {
+            return false;
+        }
+        RangeMap other = (RangeMap) object;
+        return numFields == other.numFields && Arrays.equals(endOffsets, other.endOffsets)
+                && Arrays.equals(bytes, other.bytes);
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java
new file mode 100644
index 0000000..b17c550
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/StaticFieldRangePartitionComputerFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.data.partition.range;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class StaticFieldRangePartitionComputerFactory extends FieldRangePartitionComputerFactory {
+    private static final long serialVersionUID = 1L;
+    private RangeMap rangeMap;
+
+    public StaticFieldRangePartitionComputerFactory(int[] rangeFields, IBinaryComparatorFactory[] comparatorFactories,
+            RangeMap rangeMap) {
+        super(rangeFields, comparatorFactories);
+        this.rangeMap = rangeMap;
+    }
+
+    @Override
+    protected RangeMap getRangeMap(IHyracksTaskContext hyracksTaskContext) {
+        return rangeMap;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
index 7d97507..8b57b15 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
@@ -20,7 +20,14 @@ package org.apache.hyracks.dataflow.std.base;
 
 import java.util.BitSet;
 
+import org.apache.hyracks.api.comm.IPartitionCollector;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
+import org.apache.hyracks.dataflow.std.collectors.NonDeterministicFrameReader;
+import org.apache.hyracks.dataflow.std.collectors.PartitionCollector;
 
 public abstract class AbstractMToNConnectorDescriptor extends AbstractConnectorDescriptor {
     private static final long serialVersionUID = 1L;
@@ -47,4 +54,15 @@ public abstract class AbstractMToNConnectorDescriptor extends AbstractConnectorD
     public boolean allProducersToAllConsumers() {
         return true;
     }
+
+    @Override
+    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index,
+            int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+        BitSet expectedPartitions = new BitSet(nProducerPartitions);
+        expectedPartitions.set(0, nProducerPartitions);
+        NonDeterministicChannelReader channelReader =
+                new NonDeterministicChannelReader(nProducerPartitions, expectedPartitions);
+        NonDeterministicFrameReader frameReader = new NonDeterministicFrameReader(channelReader);
+        return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, frameReader, channelReader);
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
index ab553f6..4c728ce 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
@@ -180,6 +180,7 @@ public abstract class AbstractReplicateOperatorDescriptor extends AbstractOperat
 
                 @Override
                 public void fail() throws HyracksDataException {
+                    // TODO: shouldn't we fail the MaterializerTaskState state?
                     HyracksDataException hde = null;
                     for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
                         if (isOpen[i]) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java
new file mode 100644
index 0000000..c437619
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.collectors;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hyracks.api.channels.IInputChannel;
+import org.apache.hyracks.api.comm.IFrameReader;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.partitions.PartitionId;
+
+public class DeterministicPartitionBatchManager implements IPartitionBatchManager {
+    private final IFrameReader[] partitions;
+    private List<IFrameReader> partitionsList;
+
+    public DeterministicPartitionBatchManager(int nSenders) {
+        this.partitions = new IFrameReader[nSenders];
+    }
+
+    @Override
+    public synchronized void addPartition(PartitionId partitionId, IInputChannel channel) {
+        InputChannelFrameReader channelReader = new InputChannelFrameReader(channel);
+        channel.registerMonitor(channelReader);
+        partitions[partitionId.getSenderIndex()] = channelReader;
+        if (allPartitionsAdded()) {
+            partitionsList = new ArrayList<>(Arrays.asList(partitions));
+            notifyAll();
+        }
+    }
+
+    @Override
+    public synchronized void getNextBatch(List<IFrameReader> batch, int requestedSize) throws HyracksDataException {
+        while (!allPartitionsAdded()) {
+            try {
+                wait();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw HyracksDataException.create(e);
+            }
+        }
+        if (partitionsList.isEmpty()) {
+            return;
+        }
+        if (requestedSize >= partitionsList.size()) {
+            batch.addAll(partitionsList);
+            partitionsList.clear();
+        } else {
+            List<IFrameReader> subBatch = partitionsList.subList(0, requestedSize);
+            batch.addAll(subBatch);
+            subBatch.clear();
+        }
+    }
+
+    private synchronized boolean allPartitionsAdded() {
+        for (int i = 0; i < partitions.length; i++) {
+            if (partitions[i] == null) {
+                return false;
+            }
+        }
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/SequentialMergeFrameReader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/SequentialMergeFrameReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/SequentialMergeFrameReader.java
new file mode 100644
index 0000000..2646c94
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/SequentialMergeFrameReader.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.collectors;
+
+import java.util.LinkedList;
+
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameReader;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class SequentialMergeFrameReader implements IFrameReader {
+    private final int numSenders;
+    private final IPartitionBatchManager partitionBatchManager;
+    private final LinkedList<IFrameReader> senders;
+    private boolean isOpen;
+
+    public SequentialMergeFrameReader(int numSenders, IPartitionBatchManager partitionBatchManager) {
+        this.numSenders = numSenders;
+        this.partitionBatchManager = partitionBatchManager;
+        this.senders = new LinkedList<>();
+        this.isOpen = false;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        if (!isOpen) {
+            isOpen = true;
+            // get all the senders and open them one by one
+            partitionBatchManager.getNextBatch(senders, numSenders);
+            for (IFrameReader sender : senders) {
+                sender.open();
+            }
+        }
+    }
+
+    @Override
+    public boolean nextFrame(IFrame outFrame) throws HyracksDataException {
+        IFrameReader currentSender;
+        while (!senders.isEmpty()) {
+            // process the sender at the beginning of the sequence
+            currentSender = senders.getFirst();
+            outFrame.reset();
+            if (currentSender.nextFrame(outFrame)) {
+                return true;
+            } else {
+                // done with the current sender, close it, remove it from the Q and process the next one in sequence
+                currentSender.close();
+                senders.removeFirst();
+            }
+        }
+        // done with all senders
+        return false;
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        for (IFrameReader sender : senders) {
+            sender.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
index 920fdb8..0b6e40e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
@@ -61,7 +61,7 @@ public class LocalityAwareMToNPartitioningConnectorDescriptor extends AbstractMT
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
             throws HyracksDataException {
-        return new LocalityAwarePartitionDataWriter(ctx, edwFactory, recordDesc, tpcf.createPartitioner(),
+        return new LocalityAwarePartitionDataWriter(ctx, edwFactory, recordDesc, tpcf.createPartitioner(ctx),
                 nConsumerPartitions, localityMap, index);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java
index 092b5f1..32618ee 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java
@@ -19,19 +19,14 @@
 package org.apache.hyracks.dataflow.std.connectors;
 
 import java.nio.ByteBuffer;
-import java.util.BitSet;
 
 import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.IPartitionCollector;
 import org.apache.hyracks.api.comm.IPartitionWriterFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
 import org.apache.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
-import org.apache.hyracks.dataflow.std.collectors.NonDeterministicFrameReader;
-import org.apache.hyracks.dataflow.std.collectors.PartitionCollector;
 
 public class MToNBroadcastConnectorDescriptor extends AbstractMToNConnectorDescriptor {
 
@@ -123,15 +118,4 @@ public class MToNBroadcastConnectorDescriptor extends AbstractMToNConnectorDescr
             }
         };
     }
-
-    @Override
-    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index,
-            int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
-        BitSet expectedPartitions = new BitSet(nProducerPartitions);
-        expectedPartitions.set(0, nProducerPartitions);
-        NonDeterministicChannelReader channelReader =
-                new NonDeterministicChannelReader(nProducerPartitions, expectedPartitions);
-        NonDeterministicFrameReader frameReader = new NonDeterministicFrameReader(channelReader);
-        return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, frameReader, channelReader);
-    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
index 02fbedb..c11c08c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
@@ -18,10 +18,7 @@
  */
 package org.apache.hyracks.dataflow.std.connectors;
 
-import java.util.BitSet;
-
 import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.IPartitionCollector;
 import org.apache.hyracks.api.comm.IPartitionWriterFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
@@ -29,9 +26,6 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
 import org.apache.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
-import org.apache.hyracks.dataflow.std.collectors.NonDeterministicFrameReader;
-import org.apache.hyracks.dataflow.std.collectors.PartitionCollector;
 
 public class MToNPartitioningConnectorDescriptor extends AbstractMToNConnectorDescriptor {
     private static final long serialVersionUID = 1L;
@@ -46,18 +40,7 @@ public class MToNPartitioningConnectorDescriptor extends AbstractMToNConnectorDe
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
             throws HyracksDataException {
-        return new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tpcf.createPartitioner());
-    }
-
-    @Override
-    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index,
-            int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
-        BitSet expectedPartitions = new BitSet(nProducerPartitions);
-        expectedPartitions.set(0, nProducerPartitions);
-        NonDeterministicChannelReader channelReader =
-                new NonDeterministicChannelReader(nProducerPartitions, expectedPartitions);
-        NonDeterministicFrameReader frameReader = new NonDeterministicFrameReader(channelReader);
-        return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, frameReader, channelReader);
+        return new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tpcf.createPartitioner(ctx));
     }
 
     public ITuplePartitionComputerFactory getTuplePartitionComputerFactory() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
index 026ca5e..e0ec5d6 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
@@ -70,7 +70,7 @@ public class MToNPartitioningMergingConnectorDescriptor extends AbstractMToNConn
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
             throws HyracksDataException {
         final PartitionDataWriter hashWriter =
-                new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tpcf.createPartitioner());
+                new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tpcf.createPartitioner(ctx));
         return hashWriter;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
index f6996f1..af3ce06 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
@@ -45,6 +45,6 @@ public class MToNPartitioningWithMessageConnectorDescriptor extends MToNPartitio
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
             throws HyracksDataException {
         return new PartitionWithMessageDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc,
-                tpcf.createPartitioner());
+                tpcf.createPartitioner(ctx));
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToOneSequentialMergingConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToOneSequentialMergingConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToOneSequentialMergingConnectorDescriptor.java
new file mode 100644
index 0000000..3decb69
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToOneSequentialMergingConnectorDescriptor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.connectors;
+
+import java.util.BitSet;
+
+import org.apache.hyracks.api.comm.IFrameReader;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.IPartitionCollector;
+import org.apache.hyracks.api.comm.IPartitionWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+import org.apache.hyracks.dataflow.common.data.partition.OnePartitionComputerFactory;
+import org.apache.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.collectors.DeterministicPartitionBatchManager;
+import org.apache.hyracks.dataflow.std.collectors.IPartitionBatchManager;
+import org.apache.hyracks.dataflow.std.collectors.PartitionCollector;
+import org.apache.hyracks.dataflow.std.collectors.SequentialMergeFrameReader;
+
+public class MToOneSequentialMergingConnectorDescriptor extends AbstractMToNConnectorDescriptor {
+    private static final long serialVersionUID = 1L;
+    private final ITuplePartitionComputerFactory tpcf;
+
+    public MToOneSequentialMergingConnectorDescriptor(IConnectorDescriptorRegistry spec) {
+        super(spec);
+        tpcf = new OnePartitionComputerFactory();
+    }
+
+    @Override
+    public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
+            IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+            throws HyracksDataException {
+        // TODO(ali): create a single partition data writer instead
+        return new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tpcf.createPartitioner(ctx));
+    }
+
+    @Override
+    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index,
+            int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+        IPartitionBatchManager pbm = new DeterministicPartitionBatchManager(nProducerPartitions);
+        IFrameReader sequentialMergeReader = new SequentialMergeFrameReader(nProducerPartitions, pbm);
+        BitSet expectedPartitions = new BitSet();
+        expectedPartitions.set(0, nProducerPartitions);
+        return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, sequentialMergeReader, pbm);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index 5e33275..d06d5d3 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -113,6 +113,7 @@ public class PartitionDataWriter implements IFrameWriter {
 
     @Override
     public void open() throws HyracksDataException {
+        tpc.initialize();
         for (int i = 0; i < pWriters.length; ++i) {
             isOpen[i] = true;
             pWriters[i].open();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index dc250e6..034b054 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -190,7 +190,7 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
                         ctx.getJobletContext().getJobId(), new TaskId(getActivityId(), partition));
                 private final FrameTupleAccessor accessorBuild = new FrameTupleAccessor(rd1);
                 private final ITuplePartitionComputer hpcBuild =
-                        new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories).createPartitioner();
+                        new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories).createPartitioner(ctx);
                 private final FrameTupleAppender appender = new FrameTupleAppender();
                 private final FrameTupleAppender ftappender = new FrameTupleAppender();
                 private IFrame[] bufferForPartitions;
@@ -303,9 +303,9 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
                     }
 
                     ITuplePartitionComputer hpc0 =
-                            new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories).createPartitioner();
+                            new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories).createPartitioner(ctx);
                     ITuplePartitionComputer hpc1 =
-                            new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories).createPartitioner();
+                            new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories).createPartitioner(ctx);
                     int tableSize = (int) (state.memoryForHashtable * recordsPerFrame * factor);
                     ISerializableTable table = new SimpleSerializableHashTable(tableSize, ctx);
                     state.joiner =
@@ -385,7 +385,7 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
                         new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories);
                 private final ITuplePartitionComputerFactory hpcf1 =
                         new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories);
-                private final ITuplePartitionComputer hpcProbe = hpcf0.createPartitioner();
+                private final ITuplePartitionComputer hpcProbe = hpcf0.createPartitioner(ctx);
 
                 private final FrameTupleAppender appender = new FrameTupleAppender();
                 private final FrameTupleAppender ftap = new FrameTupleAppender();
@@ -476,9 +476,9 @@ public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor
                             state.joiner.releaseMemory();
                         }
                         ITuplePartitionComputer hpcRep0 =
-                                new RepartitionComputerFactory(state.nPartitions, hpcf0).createPartitioner();
+                                new RepartitionComputerFactory(state.nPartitions, hpcf0).createPartitioner(ctx);
                         ITuplePartitionComputer hpcRep1 =
-                                new RepartitionComputerFactory(state.nPartitions, hpcf1).createPartitioner();
+                                new RepartitionComputerFactory(state.nPartitions, hpcf1).createPartitioner(ctx);
                         if (state.memoryForHashtable != memsize - 2) {
                             for (int i = 0; i < state.nPartitions; i++) {
                                 ByteBuffer buf = bufferForPartitions[i].getBuffer();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index 3873bae..a5c17f2 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -182,9 +182,9 @@ public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescript
                 @Override
                 public void open() throws HyracksDataException {
                     ITuplePartitionComputer hpc0 =
-                            new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories).createPartitioner();
+                            new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories).createPartitioner(ctx);
                     ITuplePartitionComputer hpc1 =
-                            new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories).createPartitioner();
+                            new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories).createPartitioner(ctx);
                     state = new HashBuildTaskState(ctx.getJobletContext().getJobId(),
                             new TaskId(getActivityId(), partition));
                     ISerializableTable table = new SerializableHashTable(tableSize, ctx, bufferManager);