You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by al...@apache.org on 2018/10/16 04:18:34 UTC

[04/21] asterixdb git commit: [ASTERIXDB-2286][COMP][FUN][HYR] Parallel Sort Optimization

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ForwardOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ForwardOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ForwardOperator.java
new file mode 100644
index 0000000..db11712
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ForwardOperator.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.logical;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.properties.TypePropagationPolicy;
+import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
+import org.apache.hyracks.algebricks.core.algebra.typing.ITypeEnvPointer;
+import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import org.apache.hyracks.algebricks.core.algebra.typing.OpRefTypeEnvPointer;
+import org.apache.hyracks.algebricks.core.algebra.typing.PropagatingTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+/**
+ * Forward operator is used to forward data to different NCs based on a range map that is computed dynamically
+ * by doing a pass over the data itself to infer the range map. The operator takes two inputs:
+ * 1. Tuples/data (at index 0). The data is forwarded to the range-based connector which routes it to the target NC.
+ * 2. Range map (at index 1). The range map will be stored in Hyracks context, and the connector will pick it up.
+ * Forward operator will receive the range map when it is broadcast by the operator generating the range map after which
+ * the forward operator will start forwarding the data.
+ */
+public class ForwardOperator extends AbstractLogicalOperator {
+
+    private final String rangeMapKey;
+    private final Mutable<ILogicalExpression> rangeMapExpression;
+
+    public ForwardOperator(String rangeMapKey, Mutable<ILogicalExpression> rangeMapExpression) {
+        super();
+        this.rangeMapKey = rangeMapKey;
+        this.rangeMapExpression = rangeMapExpression;
+    }
+
+    public String getRangeMapKey() {
+        return rangeMapKey;
+    }
+
+    public Mutable<ILogicalExpression> getRangeMapExpression() {
+        return rangeMapExpression;
+    }
+
+    @Override
+    public LogicalOperatorTag getOperatorTag() {
+        return LogicalOperatorTag.FORWARD;
+    }
+
+    @Override
+    public void recomputeSchema() throws AlgebricksException {
+        // schema is equal to the schema of the data source at idx 0
+        setSchema(inputs.get(0).getValue().getSchema());
+    }
+
+    @Override
+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
+        return visitor.transform(rangeMapExpression);
+    }
+
+    @Override
+    public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
+        return visitor.visitForwardOperator(this, arg);
+    }
+
+    @Override
+    public boolean isMap() {
+        return false;
+    }
+
+    @Override
+    public VariablePropagationPolicy getVariablePropagationPolicy() {
+        return new VariablePropagationPolicy() {
+
+            @Override
+            public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
+                    throws AlgebricksException {
+                // propagate the variables of the data source at idx 0
+                if (sources.length > 0) {
+                    target.addAllVariables(sources[0]);
+                }
+            }
+        };
+    }
+
+    @Override
+    public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
+        // propagate the type environment of the data source at idx 0
+        ITypeEnvPointer[] envPointers = new ITypeEnvPointer[] { new OpRefTypeEnvPointer(inputs.get(0), ctx) };
+        return new PropagatingTypeEnvironment(ctx.getExpressionTypeComputer(), ctx.getMissableTypeComputer(),
+                ctx.getMetadataProvider(), TypePropagationPolicy.ALL, envPointers);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
index d0aea60..9d853eb 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
@@ -31,11 +31,12 @@ import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -276,6 +277,11 @@ public class CardinalityInferenceVisitor implements ILogicalOperatorVisitor<Long
     }
 
     @Override
+    public Long visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException {
+        return op.getInputs().get(0).getValue().accept(this, arg);
+    }
+
+    @Override
     public Long visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException {
         long cardinality = UNKNOWN;
         for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index d0d121f..16fc1ed 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -53,6 +53,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -560,6 +561,12 @@ public class FDsAndEquivClassesVisitor implements ILogicalOperatorVisitor<Void,
     }
 
     @Override
+    public Void visitForwardOperator(ForwardOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        propagateFDsAndEquivClasses(op, ctx);
+        return null;
+    }
+
+    @Override
     public Void visitSinkOperator(SinkOperator op, IOptimizationContext ctx) throws AlgebricksException {
         setEmptyFDsEqClasses(op, ctx);
         return null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index 90c0067..2b5e569 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -43,6 +43,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -595,6 +596,18 @@ public class IsomorphismOperatorVisitor implements ILogicalOperatorVisitor<Boole
     }
 
     @Override
+    public Boolean visitForwardOperator(ForwardOperator op, ILogicalOperator arg) throws AlgebricksException {
+        AbstractLogicalOperator argOperator = (AbstractLogicalOperator) arg;
+        if (argOperator.getOperatorTag() != LogicalOperatorTag.FORWARD) {
+            return Boolean.FALSE;
+        }
+        ForwardOperator otherOp = (ForwardOperator) copyAndSubstituteVar(op, arg);
+        ILogicalExpression rangeMapExp = op.getRangeMapExpression().getValue();
+        ILogicalExpression otherRangeMapExp = otherOp.getRangeMapExpression().getValue();
+        return rangeMapExp.equals(otherRangeMapExp) && op.getRangeMapKey().equals(otherOp.getRangeMapKey());
+    }
+
+    @Override
     public Boolean visitSinkOperator(SinkOperator op, ILogicalOperator arg) throws AlgebricksException {
         return true;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index 2caa252..742d485 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -43,6 +43,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -284,6 +285,12 @@ public class IsomorphismVariableMappingVisitor implements ILogicalOperatorVisito
     }
 
     @Override
+    public Void visitForwardOperator(ForwardOperator op, ILogicalOperator arg) throws AlgebricksException {
+        mapVariablesStandard(op, arg);
+        return null;
+    }
+
+    @Override
     public Void visitSinkOperator(SinkOperator op, ILogicalOperator arg) throws AlgebricksException {
         mapVariablesStandard(op, arg);
         return null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index e0210cc..0196db6 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -43,6 +43,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
@@ -587,6 +588,14 @@ public class LogicalOperatorDeepCopyWithNewVariablesVisitor
     }
 
     @Override
+    public ILogicalOperator visitForwardOperator(ForwardOperator op, ILogicalOperator arg) throws AlgebricksException {
+        ForwardOperator opCopy = new ForwardOperator(op.getRangeMapKey(),
+                exprDeepCopyVisitor.deepCopyExpressionReference(op.getRangeMapExpression()));
+        deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
+        return opCopy;
+    }
+
+    @Override
     public ILogicalOperator visitDelegateOperator(DelegateOperator op, ILogicalOperator arg)
             throws AlgebricksException {
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 6dfe254..7d3d676 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -30,11 +30,12 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractAssi
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -289,6 +290,12 @@ public class LogicalPropertiesVisitor implements ILogicalOperatorVisitor<Void, I
     }
 
     @Override
+    public Void visitForwardOperator(ForwardOperator op, IOptimizationContext arg) throws AlgebricksException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
     public Void visitSinkOperator(SinkOperator op, IOptimizationContext arg) throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 0db0f74..c6f0c14 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -40,6 +40,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -342,6 +343,11 @@ public class OperatorDeepCopyVisitor implements ILogicalOperatorVisitor<ILogical
     }
 
     @Override
+    public ILogicalOperator visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException {
+        return new ForwardOperator(op.getRangeMapKey(), deepCopyExpressionRef(op.getRangeMapExpression()));
+    }
+
+    @Override
     public ILogicalOperator visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException {
         return new SinkOperator();
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
index c96276f..f36f604 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
@@ -32,11 +32,12 @@ import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -278,6 +279,11 @@ public class PrimaryKeyVariablesVisitor implements ILogicalOperatorVisitor<Void,
     }
 
     @Override
+    public Void visitForwardOperator(ForwardOperator op, IOptimizationContext arg) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
     public Void visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, IOptimizationContext arg)
             throws AlgebricksException {
         return null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index ec96d48..5d0ef6a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -36,11 +36,12 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnne
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -278,6 +279,11 @@ public class ProducedVariableVisitor implements ILogicalOperatorVisitor<Void, Vo
     }
 
     @Override
+    public Void visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
     public Void visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index 59ccd84..70ccf6d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -39,6 +39,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -318,6 +319,13 @@ public class SchemaVariableVisitor implements ILogicalOperatorVisitor<Void, Void
     }
 
     @Override
+    public Void visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException {
+        // only consider variables from the branch of the data source
+        VariableUtilities.getLiveVariables(op.getInputs().get(0).getValue(), schemaVariables);
+        return null;
+    }
+
+    @Override
     public Void visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException {
         standardLayout(op);
         return null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 3587e29..c62f555 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -39,6 +39,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -460,6 +461,14 @@ public class SubstituteVariableVisitor
     }
 
     @Override
+    public Void visitForwardOperator(ForwardOperator op, Pair<LogicalVariable, LogicalVariable> arg)
+            throws AlgebricksException {
+        op.getRangeMapExpression().getValue().substituteVar(arg.first, arg.second);
+        substVarTypes(op, arg);
+        return null;
+    }
+
+    @Override
     public Void visitSinkOperator(SinkOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
         return null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index e66809e..2c68697 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -41,6 +41,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -138,50 +139,43 @@ public class UsedVariableVisitor implements ILogicalOperatorVisitor<Void, Void>
             switch (physOp.getOperatorTag()) {
                 case BROADCAST_EXCHANGE:
                 case ONE_TO_ONE_EXCHANGE:
-                case RANDOM_MERGE_EXCHANGE: {
+                case RANDOM_MERGE_EXCHANGE:
+                case SEQUENTIAL_MERGE_EXCHANGE:
                     // No variables used.
                     break;
-                }
-                case HASH_PARTITION_EXCHANGE: {
-                    HashPartitionExchangePOperator concreteOp = (HashPartitionExchangePOperator) physOp;
-                    usedVariables.addAll(concreteOp.getHashFields());
+                case HASH_PARTITION_EXCHANGE:
+                    HashPartitionExchangePOperator hashPartitionPOp = (HashPartitionExchangePOperator) physOp;
+                    usedVariables.addAll(hashPartitionPOp.getHashFields());
                     break;
-                }
-                case HASH_PARTITION_MERGE_EXCHANGE: {
-                    HashPartitionMergeExchangePOperator concreteOp = (HashPartitionMergeExchangePOperator) physOp;
-                    usedVariables.addAll(concreteOp.getPartitionFields());
-                    for (OrderColumn orderCol : concreteOp.getOrderColumns()) {
+                case HASH_PARTITION_MERGE_EXCHANGE:
+                    HashPartitionMergeExchangePOperator hashMergePOp = (HashPartitionMergeExchangePOperator) physOp;
+                    usedVariables.addAll(hashMergePOp.getPartitionFields());
+                    for (OrderColumn orderCol : hashMergePOp.getOrderColumns()) {
                         usedVariables.add(orderCol.getColumn());
                     }
                     break;
-                }
-                case SORT_MERGE_EXCHANGE: {
-                    SortMergeExchangePOperator concreteOp = (SortMergeExchangePOperator) physOp;
-                    for (OrderColumn orderCol : concreteOp.getSortColumns()) {
+                case SORT_MERGE_EXCHANGE:
+                    SortMergeExchangePOperator sortMergePOp = (SortMergeExchangePOperator) physOp;
+                    for (OrderColumn orderCol : sortMergePOp.getSortColumns()) {
                         usedVariables.add(orderCol.getColumn());
                     }
                     break;
-                }
-                case RANGE_PARTITION_EXCHANGE: {
-                    RangePartitionExchangePOperator concreteOp = (RangePartitionExchangePOperator) physOp;
-                    for (OrderColumn partCol : concreteOp.getPartitioningFields()) {
+                case RANGE_PARTITION_EXCHANGE:
+                    RangePartitionExchangePOperator rangePartitionPOp = (RangePartitionExchangePOperator) physOp;
+                    for (OrderColumn partCol : rangePartitionPOp.getPartitioningFields()) {
                         usedVariables.add(partCol.getColumn());
                     }
                     break;
-                }
-                case RANGE_PARTITION_MERGE_EXCHANGE: {
-                    RangePartitionMergeExchangePOperator concreteOp = (RangePartitionMergeExchangePOperator) physOp;
-                    for (OrderColumn partCol : concreteOp.getPartitioningFields()) {
+                case RANGE_PARTITION_MERGE_EXCHANGE:
+                    RangePartitionMergeExchangePOperator rangeMergePOp = (RangePartitionMergeExchangePOperator) physOp;
+                    for (OrderColumn partCol : rangeMergePOp.getPartitioningFields()) {
                         usedVariables.add(partCol.getColumn());
                     }
                     break;
-                }
-                case RANDOM_PARTITION_EXCHANGE: {
+                case RANDOM_PARTITION_EXCHANGE:
                     break;
-                }
-                default: {
+                default:
                     throw new AlgebricksException("Unhandled physical operator tag '" + physOp.getOperatorTag() + "'.");
-                }
             }
         }
         return null;
@@ -439,6 +433,12 @@ public class UsedVariableVisitor implements ILogicalOperatorVisitor<Void, Void>
     }
 
     @Override
+    public Void visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException {
+        op.getRangeMapExpression().getValue().getUsedVariables(usedVariables);
+        return null;
+    }
+
+    @Override
     public Void visitSinkOperator(SinkOperator op, Void arg) {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
index 78e96a4..0c08369 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.algebricks.core.algebra.operators.physical;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -29,16 +30,20 @@ import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderedPartitionedProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
 
@@ -67,16 +72,26 @@ public abstract class AbstractStableSortPOperator extends AbstractPhysicalOperat
     }
 
     @Override
-    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator iop,
-            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
-        AbstractLogicalOperator op = (AbstractLogicalOperator) iop;
-        if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator sortOp,
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext ctx) {
+        if (sortOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
             if (orderProp == null) {
-                computeLocalProperties(op);
+                computeLocalProperties(sortOp);
             }
-            StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(
-                    IPartitioningProperty.UNPARTITIONED, Collections.singletonList(orderProp)) };
-            return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+            StructuralPropertiesVector[] requiredProp = new StructuralPropertiesVector[1];
+            IPartitioningProperty partitioning;
+            INodeDomain targetNodeDomain = ctx.getComputationNodeDomain();
+            if (isFullParallel((AbstractLogicalOperator) sortOp, targetNodeDomain, ctx)) {
+                // partitioning requirement: input data is re-partitioned on sort columns (global ordering)
+                // TODO(ali): static range map implementation should be fixed to require ORDERED_PARTITION and come here
+                partitioning = new OrderedPartitionedProperty(Arrays.asList(sortColumns), targetNodeDomain);
+            } else {
+                // partitioning requirement: input data is unpartitioned (i.e. must be merged at one site)
+                partitioning = IPartitioningProperty.UNPARTITIONED;
+            }
+            // local requirement: each partition must be locally ordered
+            requiredProp[0] = new StructuralPropertiesVector(partitioning, Collections.singletonList(orderProp));
+            return new PhysicalRequirements(requiredProp, IPartitioningRequirementsCoordinator.NO_COORDINATION);
         } else {
             return emptyUnaryRequirements();
         }
@@ -123,4 +138,27 @@ public abstract class AbstractStableSortPOperator extends AbstractPhysicalOperat
     public boolean expensiveThanMaterialization() {
         return true;
     }
+
+    /**
+     * When true, the sort operator requires ORDERED_PARTITION (only applicable to dynamic version for now).
+     * Conditions:
+     * 1. Execution mode == partitioned
+     * 2. Dynamic range map was not disabled by some checks
+     * 3. User didn't disable it
+     * 4. User didn't provide static range map
+     * 5. Physical sort operator is not in-memory
+     * 6. There are at least two partitions in the cluster
+     * @param sortOp the sort operator
+     * @param clusterDomain the partitions specification of the cluster
+     * @param ctx optimization context
+     * @return true if the sort operator should be full parallel sort, false otherwise.
+     */
+    private boolean isFullParallel(AbstractLogicalOperator sortOp, INodeDomain clusterDomain,
+            IOptimizationContext ctx) {
+        return sortOp.getAnnotations().get(OperatorAnnotations.USE_DYNAMIC_RANGE) != Boolean.FALSE
+                && !sortOp.getAnnotations().containsKey(OperatorAnnotations.USE_STATIC_RANGE)
+                && sortOp.getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.STABLE_SORT
+                && clusterDomain.cardinality() != null && clusterDomain.cardinality() > 1
+                && ctx.getPhysicalOptimizationConfig().getSortParallel();
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java
new file mode 100644
index 0000000..11c584e
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.dataflow.std.misc.ForwardOperatorDescriptor;
+
+/**
+ * <pre>
+ * {@see {@link ForwardOperator} and {@link ForwardOperatorDescriptor}}
+ * idx0: Input data source --
+ *                           |-- forward op.
+ * idx1: RangeMap generator--
+ * </pre>
+ */
+public class ForwardPOperator extends AbstractPhysicalOperator {
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.FORWARD;
+    }
+
+    /**
+     * Forward operator requires that the global aggregate operator broadcasts the range map. No required properties at
+     * the data source input.
+     * @param op {@see {@link ForwardOperator}}
+     * @param requiredByParent parent's requirements, which are not enforced for now, as we only explore one plan
+     * @param context the optimization context
+     * @return broadcast requirement at input 1; empty requirements at input 0; No coordination between the two.
+     */
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector requiredByParent, IOptimizationContext context) {
+        // broadcast the range map to the cluster node domain
+        INodeDomain targetDomain = context.getComputationNodeDomain();
+        List<ILocalStructuralProperty> noProp = new ArrayList<>();
+        StructuralPropertiesVector[] requiredAtInputs = new StructuralPropertiesVector[2];
+        requiredAtInputs[0] = StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR;
+        requiredAtInputs[1] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(targetDomain), noProp);
+        return new PhysicalRequirements(requiredAtInputs, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    /**
+     * Forward operator delivers whatever properties delivered by the input located at index = 0 (tuples source op).
+     * Subtree at index 0 must compute its delivered properties before any call to this method
+     * @param op forward logical operator
+     * @param context {@link IOptimizationContext}
+     * @throws AlgebricksException
+     */
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+            throws AlgebricksException {
+        ILogicalOperator dataSourceOperator = op.getInputs().get(0).getValue();
+        deliveredProperties = dataSourceOperator.getDeliveredPhysicalProperties().clone();
+    }
+
+    /**
+     * The output record descriptor of forward operator is the same as the output record descriptor of the data source
+     * which is located at index 0.
+     * @param builder Hyracks job builder
+     * @param context job generation context
+     * @param op {@see {@link ForwardOperator}}
+     * @param propagatedSchema not used
+     * @param inputSchemas schemas of all inputs
+     * @param outerPlanSchema not used
+     * @throws AlgebricksException
+     */
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        ForwardOperator forwardOp = (ForwardOperator) op;
+        RecordDescriptor dataInputDescriptor = JobGenHelper.mkRecordDescriptor(
+                context.getTypeEnvironment(forwardOp.getInputs().get(0).getValue()), inputSchemas[0], context);
+        ForwardOperatorDescriptor forwardDescriptor =
+                new ForwardOperatorDescriptor(builder.getJobSpec(), forwardOp.getRangeMapKey(), dataInputDescriptor);
+        builder.contributeHyracksOperator(forwardOp, forwardDescriptor);
+        ILogicalOperator dataSource = forwardOp.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(dataSource, 0, forwardOp, 0);
+        ILogicalOperator rangemapSource = forwardOp.getInputs().get(1).getValue();
+        builder.contributeGraphEdge(rangemapSource, 0, forwardOp, 1);
+    }
+
+    @Override
+    public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
+        int[] outputDependencyLabels = new int[] { 1 };
+        int[] inputDependencyLabels = new int[] { 1, 0 };
+        return new Pair<>(inputDependencyLabels, outputDependencyLabels);
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
index 6630d32..aeb9ac7 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
@@ -42,27 +42,40 @@ import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirement
 import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
-import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+import org.apache.hyracks.dataflow.common.data.partition.range.DynamicFieldRangePartitionComputerFactory;
 import org.apache.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
+import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
+import org.apache.hyracks.dataflow.common.data.partition.range.StaticFieldRangePartitionComputerFactory;
 import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
 
 public class RangePartitionExchangePOperator extends AbstractExchangePOperator {
 
     private List<OrderColumn> partitioningFields;
     private INodeDomain domain;
-    private IRangeMap rangeMap;
+    private RangeMap rangeMap;
+    private final boolean rangeMapIsComputedAtRunTime;
+    private final String rangeMapKeyInContext;
 
-    public RangePartitionExchangePOperator(List<OrderColumn> partitioningFields, INodeDomain domain,
-            IRangeMap rangeMap) {
+    private RangePartitionExchangePOperator(List<OrderColumn> partitioningFields, INodeDomain domain, RangeMap rangeMap,
+            boolean rangeMapIsComputedAtRunTime, String rangeMapKeyInContext) {
         this.partitioningFields = partitioningFields;
         this.domain = domain;
         this.rangeMap = rangeMap;
+        this.rangeMapIsComputedAtRunTime = rangeMapIsComputedAtRunTime;
+        this.rangeMapKeyInContext = rangeMapKeyInContext;
+    }
+
+    public RangePartitionExchangePOperator(List<OrderColumn> partitioningFields, String rangeMapKeyInContext,
+            INodeDomain domain) {
+        this(partitioningFields, domain, null, true, rangeMapKeyInContext);
+    }
+
+    public RangePartitionExchangePOperator(List<OrderColumn> partitioningFields, INodeDomain domain,
+            RangeMap rangeMap) {
+        this(partitioningFields, domain, rangeMap, false, "");
     }
 
     @Override
@@ -80,8 +93,7 @@ public class RangePartitionExchangePOperator extends AbstractExchangePOperator {
 
     @Override
     public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
-        IPartitioningProperty p =
-                new OrderedPartitionedProperty(new ArrayList<OrderColumn>(partitioningFields), domain);
+        IPartitioningProperty p = new OrderedPartitionedProperty(new ArrayList<>(partitioningFields), domain);
         this.deliveredProperties = new StructuralPropertiesVector(p, new LinkedList<ILocalStructuralProperty>());
     }
 
@@ -97,32 +109,31 @@ public class RangePartitionExchangePOperator extends AbstractExchangePOperator {
         int n = partitioningFields.size();
         int[] sortFields = new int[n];
         IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
-
-        INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
-        INormalizedKeyComputerFactory nkcf = null;
-
         IVariableTypeEnvironment env = context.getTypeEnvironment(op);
         int i = 0;
         for (OrderColumn oc : partitioningFields) {
             LogicalVariable var = oc.getColumn();
             sortFields[i] = opSchema.findVariable(var);
             Object type = env.getVarType(var);
-            OrderKind order = oc.getOrder();
-            if (i == 0 && nkcfProvider != null && type != null) {
-                nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, order == OrderKind.ASC);
-            }
             IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
             comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
             i++;
         }
-        ITuplePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
-        IConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, tpcf);
-        return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
+        FieldRangePartitionComputerFactory partitionerFactory;
+        if (rangeMapIsComputedAtRunTime) {
+            partitionerFactory = new DynamicFieldRangePartitionComputerFactory(sortFields, comps, rangeMapKeyInContext,
+                    op.getSourceLocation());
+        } else {
+            partitionerFactory = new StaticFieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
+        }
+
+        IConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, partitionerFactory);
+        return new Pair<>(conn, null);
     }
 
     @Override
     public String toString() {
-        return getOperatorTag().toString() + " " + partitioningFields + " SPLIT COUNT:" + rangeMap.getSplitCount();
+        final String splitCount = rangeMap == null ? "" : " SPLIT COUNT:" + Integer.toString(rangeMap.getSplitCount());
+        return getOperatorTag().toString() + " " + partitioningFields + splitCount;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
index ec32a53..b015193 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
@@ -53,18 +53,18 @@ import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
-import org.apache.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
+import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
+import org.apache.hyracks.dataflow.common.data.partition.range.StaticFieldRangePartitionComputerFactory;
 import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
 
 public class RangePartitionMergeExchangePOperator extends AbstractExchangePOperator {
 
     private List<OrderColumn> partitioningFields;
     private INodeDomain domain;
-    private IRangeMap rangeMap;
+    private RangeMap rangeMap;
 
     public RangePartitionMergeExchangePOperator(List<OrderColumn> partitioningFields, INodeDomain domain,
-            IRangeMap rangeMap) {
+            RangeMap rangeMap) {
         this.partitioningFields = partitioningFields;
         this.domain = domain;
         this.rangeMap = rangeMap;
@@ -143,7 +143,7 @@ public class RangePartitionMergeExchangePOperator extends AbstractExchangePOpera
             comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
             i++;
         }
-        ITuplePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
+        ITuplePartitionComputerFactory tpcf = new StaticFieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
         IConnectorDescriptor conn = new MToNPartitioningMergingConnectorDescriptor(spec, tpcf, sortFields, comps, nkcf);
         return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java
new file mode 100644
index 0000000..df0b446
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
+import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.connectors.MToOneSequentialMergingConnectorDescriptor;
+
+public class SequentialMergeExchangePOperator extends AbstractExchangePOperator {
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.SEQUENTIAL_MERGE_EXCHANGE;
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+        return emptyUnaryRequirements();
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator childOp = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        List<ILocalStructuralProperty> childLocalProps = childOp.getDeliveredPhysicalProperties().getLocalProperties();
+        List<ILocalStructuralProperty> localProperties;
+        if (childLocalProps != null) {
+            localProperties = new ArrayList<>(childLocalProps);
+        } else {
+            localProperties = new ArrayList<>(0);
+        }
+
+        deliveredProperties = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, localProperties);
+    }
+
+    @Override
+    public Pair<IConnectorDescriptor, IHyracksJobBuilder.TargetConstraint> createConnectorDescriptor(
+            IConnectorDescriptorRegistry spec, ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context)
+            throws AlgebricksException {
+        IConnectorDescriptor connector = new MToOneSequentialMergingConnectorDescriptor(spec);
+        return new Pair<>(connector, IHyracksJobBuilder.TargetConstraint.ONE);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 99ed738..77f052e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -40,6 +40,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -465,6 +466,13 @@ public class LogicalOperatorPrettyPrintVisitor extends AbstractLogicalOperatorPr
     }
 
     @Override
+    public Void visitForwardOperator(ForwardOperator op, Integer indent) throws AlgebricksException {
+        addIndent(indent)
+                .append("forward: range-map = " + op.getRangeMapExpression().getValue().accept(exprVisitor, indent));
+        return null;
+    }
+
+    @Override
     public Void visitSinkOperator(SinkOperator op, Integer indent) throws AlgebricksException {
         addIndent(indent).append("sink");
         return null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
index f1f1f3b..4a17cc6 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
@@ -44,6 +44,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -647,6 +648,14 @@ public class LogicalOperatorPrettyPrintVisitorJson extends AbstractLogicalOperat
     }
 
     @Override
+    public Void visitForwardOperator(ForwardOperator op, Integer indent) throws AlgebricksException {
+        addIndent(indent).append("\"operator\": \"forward\"");
+        addIndent(indent).append("\"expressions\": \""
+                + op.getRangeMapExpression().getValue().accept(exprVisitor, indent).replace('"', ' ') + "\"");
+        return null;
+    }
+
+    @Override
     public Void visitSinkOperator(SinkOperator op, Integer indent) throws AlgebricksException {
         addIndent(indent).append("\"operator\": \"sink\"");
         return null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/LocalOrderProperty.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/LocalOrderProperty.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/LocalOrderProperty.java
index e7e98a5..b006a1e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/LocalOrderProperty.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/LocalOrderProperty.java
@@ -121,20 +121,7 @@ public final class LocalOrderProperty implements ILocalStructuralProperty {
         Iterator<OrderColumn> currentColumnIterator = orderColumns.iterator();
 
         // Returns true if requiredColumnIterator is a prefix of currentColumnIterator.
-        return isPrefixOf(requiredColumnIterator, currentColumnIterator);
-    }
-
-    private <T> boolean isPrefixOf(Iterator<T> requiredColumnIterator, Iterator<T> currentColumnIterator) {
-        while (requiredColumnIterator.hasNext()) {
-            T oc = requiredColumnIterator.next();
-            if (!currentColumnIterator.hasNext()) {
-                return false;
-            }
-            if (!oc.equals(currentColumnIterator.next())) {
-                return false;
-            }
-        }
-        return true;
+        return PropertiesUtil.isPrefixOf(requiredColumnIterator, currentColumnIterator);
     }
 
     // Gets normalized  ordering columns, where each column variable is a representative variable of its equivalence

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
index f2fed13..1c00e45 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
@@ -200,7 +200,7 @@ public class PropertiesUtil {
      * @param target
      * @return true iff pref is a prefix of target
      */
-    private static <T> boolean isPrefixOf(Iterator<T> pref, Iterator<T> target) {
+    public static <T> boolean isPrefixOf(Iterator<T> pref, Iterator<T> target) {
         while (pref.hasNext()) {
             T v = pref.next();
             if (!target.hasNext()) {
@@ -213,50 +213,65 @@ public class PropertiesUtil {
         return true;
     }
 
+    /**
+     * Normalizes or reduces the order columns argument based on the functional dependencies argument. The caller is
+     * responsible for taking caution as to how to handle the returned object since this method either returns the same
+     * object that is passed or returns a new object.
+     * @param orderColumns the order columns that are to be normalized
+     * @param functionalDependencies {@link FunctionalDependency}
+     * @return a new normalized object if normalization is applied. Otherwise, the same argument object is returned.
+     */
     public static List<OrderColumn> applyFDsToOrderColumns(List<OrderColumn> orderColumns,
-            List<FunctionalDependency> fds) {
-        // the set of vars. is ordered
-        // so we try the variables in order from last to first
-        if (fds == null || fds.isEmpty()) {
+            List<FunctionalDependency> functionalDependencies) {
+        if (functionalDependencies == null || functionalDependencies.isEmpty()) {
             return orderColumns;
         }
 
+        // the set of vars. is ordered
+        // so we try the variables in order from last to first
         int deleted = 0;
+        boolean[] removedColumns = new boolean[orderColumns.size()];
         for (int i = orderColumns.size() - 1; i >= 0; i--) {
-            for (FunctionalDependency fdep : fds) {
-                if (impliedByPrefix(orderColumns, i, fdep)) {
-                    orderColumns.set(i, null);
+            for (FunctionalDependency functionalDependency : functionalDependencies) {
+                if (impliedByPrefix(orderColumns, i, functionalDependency)) {
+                    removedColumns[i] = true;
                     deleted++;
                     break;
                 }
             }
         }
-        List<OrderColumn> norm = new ArrayList<>(orderColumns.size() - deleted);
-        for (OrderColumn oc : orderColumns) {
-            if (oc != null) {
-                norm.add(oc);
+        List<OrderColumn> normalizedColumns = new ArrayList<>(orderColumns.size() - deleted);
+        for (int i = 0; i < orderColumns.size(); i++) {
+            if (!removedColumns[i]) {
+                normalizedColumns.add(orderColumns.get(i));
             }
         }
-        return norm;
+
+        return normalizedColumns;
     }
 
+    /**
+     * Normalizes or reduces the order columns argument based on the equivalenceClasses argument. The caller is
+     * responsible for taking caution as to how to handle the returned object since this method either returns the same
+     * object that is passed or returns a new object.
+     * @param orderColumns the order columns that are to be normalized
+     * @param equivalenceClasses {@link EquivalenceClass}
+     * @return a new normalized object if normalization is applied. Otherwise, the same argument object is returned.
+     */
     public static List<OrderColumn> replaceOrderColumnsByEqClasses(List<OrderColumn> orderColumns,
             Map<LogicalVariable, EquivalenceClass> equivalenceClasses) {
         if (equivalenceClasses == null || equivalenceClasses.isEmpty()) {
             return orderColumns;
         }
         List<OrderColumn> norm = new ArrayList<>();
-        for (OrderColumn v : orderColumns) {
-            EquivalenceClass ec = equivalenceClasses.get(v.getColumn());
-            if (ec == null) {
-                norm.add(v);
-            } else {
-                if (ec.representativeIsConst()) {
-                    // trivially satisfied, so the var. can be removed
-                } else {
-                    norm.add(new OrderColumn(ec.getVariableRepresentative(), v.getOrder()));
-                }
+        for (OrderColumn orderColumn : orderColumns) {
+            EquivalenceClass columnEQClass = equivalenceClasses.get(orderColumn.getColumn());
+            if (columnEQClass == null) {
+                norm.add(orderColumn);
+            } else if (!columnEQClass.representativeIsConst()) {
+                norm.add(new OrderColumn(columnEQClass.getVariableRepresentative(), orderColumn.getOrder()));
             }
+            // else columnEQClass rep. is constant, i.e. trivially satisfied, so the var. can be removed
         }
         return norm;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index deb98b0..548a29f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -27,6 +27,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -124,4 +125,5 @@ public interface ILogicalOperatorVisitor<R, T> {
 
     public R visitTokenizeOperator(TokenizeOperator op, T arg) throws AlgebricksException;
 
+    public R visitForwardOperator(ForwardOperator op, T arg) throws AlgebricksException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java
index 8779777..15bb54b 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java
@@ -25,4 +25,6 @@ public class AlgebricksConfig {
 
     public static final String ALGEBRICKS_LOGGER_NAME = "org.apache.hyracks.algebricks";
     public static final Logger ALGEBRICKS_LOGGER = LogManager.getLogger(ALGEBRICKS_LOGGER_NAME);
+    public static final int SORT_SAMPLES = 100;
+    public static final boolean SORT_PARALLEL = true;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/ConnectorPolicyAssignmentPolicy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/ConnectorPolicyAssignmentPolicy.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/ConnectorPolicyAssignmentPolicy.java
index 6fa378b..e3b1868 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/ConnectorPolicyAssignmentPolicy.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/ConnectorPolicyAssignmentPolicy.java
@@ -24,6 +24,7 @@ import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPoli
 import org.apache.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
 import org.apache.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;
 import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.MToOneSequentialMergingConnectorDescriptor;
 
 public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {
     private static final long serialVersionUID = 1L;
@@ -33,7 +34,8 @@ public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignme
     @Override
     public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,
             int[] fanouts) {
-        if (c instanceof MToNPartitioningMergingConnectorDescriptor) {
+        if (c instanceof MToNPartitioningMergingConnectorDescriptor
+                || c instanceof MToOneSequentialMergingConnectorDescriptor) {
             return senderSideMaterializePolicy;
         } else {
             return pipeliningPolicy;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
index 8eb9b90..a2a0ca1 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
@@ -37,29 +37,20 @@ import org.apache.logging.log4j.Level;
 
 public class HeuristicOptimizer {
 
-    public static PhysicalOperatorTag[] hyracksOperators =
-            new PhysicalOperatorTag[] { PhysicalOperatorTag.DATASOURCE_SCAN, PhysicalOperatorTag.BTREE_SEARCH,
-                    PhysicalOperatorTag.EXTERNAL_GROUP_BY, PhysicalOperatorTag.HASH_GROUP_BY,
-                    PhysicalOperatorTag.HDFS_READER, PhysicalOperatorTag.HYBRID_HASH_JOIN,
-                    PhysicalOperatorTag.IN_MEMORY_HASH_JOIN, PhysicalOperatorTag.NESTED_LOOP,
-                    PhysicalOperatorTag.PRE_SORTED_DISTINCT_BY, PhysicalOperatorTag.PRE_CLUSTERED_GROUP_BY,
-                    PhysicalOperatorTag.REPLICATE, PhysicalOperatorTag.STABLE_SORT, PhysicalOperatorTag.UNION_ALL };
-    public static PhysicalOperatorTag[] hyraxOperatorsBelowWhichJobGenIsDisabled = new PhysicalOperatorTag[] {};
-
-    public static boolean isHyracksOp(PhysicalOperatorTag opTag) {
-        for (PhysicalOperatorTag t : hyracksOperators) {
-            if (t == opTag) {
-                return true;
-            }
-        }
-        return false;
-    }
-
     private final IOptimizationContext context;
     private final List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRewrites;
     private final List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> physicalRewrites;
     private final ILogicalPlan plan;
 
+    private static final PhysicalOperatorTag[] hyracksOperators = new PhysicalOperatorTag[] {
+            PhysicalOperatorTag.DATASOURCE_SCAN, PhysicalOperatorTag.BTREE_SEARCH,
+            PhysicalOperatorTag.EXTERNAL_GROUP_BY, PhysicalOperatorTag.HASH_GROUP_BY, PhysicalOperatorTag.HDFS_READER,
+            PhysicalOperatorTag.HYBRID_HASH_JOIN, PhysicalOperatorTag.IN_MEMORY_HASH_JOIN,
+            PhysicalOperatorTag.NESTED_LOOP, PhysicalOperatorTag.PRE_SORTED_DISTINCT_BY,
+            PhysicalOperatorTag.PRE_CLUSTERED_GROUP_BY, PhysicalOperatorTag.REPLICATE, PhysicalOperatorTag.STABLE_SORT,
+            PhysicalOperatorTag.UNION_ALL, PhysicalOperatorTag.FORWARD };
+    public static final PhysicalOperatorTag[] hyraxOperatorsBelowWhichJobGenIsDisabled = new PhysicalOperatorTag[] {};
+
     public HeuristicOptimizer(ILogicalPlan plan,
             List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRewrites,
             List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> physicalRewrites,
@@ -70,6 +61,15 @@ public class HeuristicOptimizer {
         this.physicalRewrites = physicalRewrites;
     }
 
+    public static boolean isHyracksOp(PhysicalOperatorTag opTag) {
+        for (PhysicalOperatorTag t : hyracksOperators) {
+            if (t == opTag) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     public void optimize() throws AlgebricksException {
         if (plan == null) {
             return;
@@ -129,7 +129,6 @@ public class HeuristicOptimizer {
         if (AlgebricksConfig.ALGEBRICKS_LOGGER.isTraceEnabled()) {
             AlgebricksConfig.ALGEBRICKS_LOGGER.trace("Starting physical optimizations.\n");
         }
-        // PhysicalOptimizationsUtil.computeFDsAndEquivalenceClasses(plan);
         runOptimizationSets(plan, physicalRewrites);
     }