You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by dl...@apache.org on 2018/11/01 03:54:40 UTC

[2/5] asterixdb git commit: [ASTERIXDB-2466][FUN] Implement window functions

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index 2b5e569..d91a255 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -69,6 +69,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
@@ -609,7 +610,32 @@ public class IsomorphismOperatorVisitor implements ILogicalOperatorVisitor<Boole
 
     @Override
     public Boolean visitSinkOperator(SinkOperator op, ILogicalOperator arg) throws AlgebricksException {
-        return true;
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.SINK) { // a sink only matches another SINK-tagged operator
+            return Boolean.FALSE;
+        }
+        return Boolean.TRUE; // the tag is the only thing compared for sinks
+    }
+
+    @Override
+    public Boolean visitWindowOperator(WindowOperator op, ILogicalOperator arg) throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.WINDOW) {
+            return Boolean.FALSE; // arg is not tagged as a window operator
+        }
+        WindowOperator windowOpArg = (WindowOperator) copyAndSubstituteVar(op, arg); // compare against this, not arg
+        if (!VariableUtilities.varListEqualUnordered(op.getPartitionExpressions(),
+                windowOpArg.getPartitionExpressions())) {
+            return Boolean.FALSE; // partition expression lists reported as not equal
+        }
+        if (!compareIOrderAndExpressions(op.getOrderExpressions(), windowOpArg.getOrderExpressions())) {
+            return Boolean.FALSE; // order expression lists reported as not equal
+        }
+        if (!VariableUtilities.varListEqualUnordered(getPairList(op.getVariables(), op.getExpressions()),
+                getPairList(windowOpArg.getVariables(), windowOpArg.getExpressions()))) {
+            return Boolean.FALSE; // (variable, expression) pair lists reported as not equal
+        }
+        return Boolean.TRUE; // tag matched and all three comparisons passed
     }
 
     private Boolean compareExpressions(List<Mutable<ILogicalExpression>> opExprs,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index 742d485..d0aec16 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -68,6 +68,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
@@ -96,6 +97,13 @@ public class IsomorphismVariableMappingVisitor implements ILogicalOperatorVisito
     }
 
     @Override
+    public Void visitWindowOperator(WindowOperator op, ILogicalOperator arg) throws AlgebricksException {
+        mapChildren(op, arg); // children are handled before this operator's own variables
+        mapVariablesForAbstractAssign(op, arg); // presumably maps op's variables to arg's -- confirm in helper
+        return null;
+    }
+
+    @Override
     public Void visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, ILogicalOperator arg)
             throws AlgebricksException {
         mapVariablesStandard(op, arg);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index 0196db6..198ffdc 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -66,6 +66,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
 import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
 import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
@@ -611,6 +612,20 @@ public class LogicalOperatorDeepCopyWithNewVariablesVisitor
         return opCopy;
     }
 
+    @Override
+    public ILogicalOperator visitWindowOperator(WindowOperator op, ILogicalOperator arg) throws AlgebricksException {
+        List<Mutable<ILogicalExpression>> partitionExprCopy =
+                exprDeepCopyVisitor.deepCopyExpressionReferenceList(op.getPartitionExpressions());
+        List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExprCopy =
+                deepCopyOrderExpressionReferencePairList(op.getOrderExpressions());
+        List<LogicalVariable> varCopy = deepCopyVariableList(op.getVariables()); // variables use their own helper
+        List<Mutable<ILogicalExpression>> exprCopy =
+                exprDeepCopyVisitor.deepCopyExpressionReferenceList(op.getExpressions());
+        WindowOperator opCopy = new WindowOperator(partitionExprCopy, orderExprCopy, varCopy, exprCopy);
+        deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy); // rest of the copy is delegated to this helper
+        return opCopy;
+    }
+
     public LinkedHashMap<LogicalVariable, LogicalVariable> getInputToOutputVariableMapping() {
         return inputVarToOutputVarMapping;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 7d3d676..0aaa529 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -60,6 +60,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector;
@@ -205,6 +206,12 @@ public class LogicalPropertiesVisitor implements ILogicalOperatorVisitor<Void, I
     }
 
     @Override
+    public Void visitWindowOperator(WindowOperator op, IOptimizationContext context) throws AlgebricksException {
+        visitAssignment(op, context); // no window-specific logic: delegates to visitAssignment
+        return null;
+    }
+
+    @Override
     public Void visitScriptOperator(ScriptOperator op, IOptimizationContext arg) throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;
@@ -367,5 +374,4 @@ public class LogicalPropertiesVisitor implements ILogicalOperatorVisitor<Void, I
             throws AlgebricksException {
         return null;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index c6f0c14..e5ca646 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -66,6 +66,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
@@ -406,4 +407,19 @@ public class OperatorDeepCopyVisitor implements ILogicalOperatorVisitor<ILogical
         return new LeftOuterUnnestOperator(op.getVariable(), deepCopyExpressionRef(op.getExpressionRef()),
                 op.getPositionalVariable(), op.getPositionalVariableType(), op.getPositionWriter());
     }
+
+    /** Builds a copy of {@code op}: expressions pass through the deepCopy* helpers, variables are reused as-is. */
+    @Override
+    public ILogicalOperator visitWindowOperator(WindowOperator op, Void arg) throws AlgebricksException {
+        List<Mutable<ILogicalExpression>> newPartitionExprs = new ArrayList<>();
+        // Destination list first, source second -- same argument order as the call for getExpressions() below.
+        deepCopyExpressionRefs(newPartitionExprs, op.getPartitionExpressions());
+        List<Pair<IOrder, Mutable<ILogicalExpression>>> newOrderExprs =
+                deepCopyOrderAndExpression(op.getOrderExpressions());
+        ArrayList<LogicalVariable> newList = new ArrayList<>();
+        ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<>();
+        newList.addAll(op.getVariables());
+        deepCopyExpressionRefs(newExpressions, op.getExpressions());
+        return new WindowOperator(newPartitionExprs, newOrderExprs, newList, newExpressions);
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
index f36f604..eb90288 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
@@ -62,6 +62,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
@@ -289,4 +290,8 @@ public class PrimaryKeyVariablesVisitor implements ILogicalOperatorVisitor<Void,
         return null;
     }
 
+    @Override
+    public Void visitWindowOperator(WindowOperator op, IOptimizationContext arg) throws AlgebricksException {
+        return null; // no-op: nothing is computed or recorded for window operators here
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index 5d0ef6a..43b7c80 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -66,6 +66,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
@@ -91,6 +92,12 @@ public class ProducedVariableVisitor implements ILogicalOperatorVisitor<Void, Vo
     }
 
     @Override
+    public Void visitWindowOperator(WindowOperator op, Void arg) throws AlgebricksException {
+        producedVariables.addAll(op.getVariables()); // every variable of op is recorded as produced
+        return null;
+    }
+
+    @Override
     public Void visitDataScanOperator(DataSourceScanOperator op, Void arg) throws AlgebricksException {
         producedVariables.addAll(op.getVariables());
         return null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index 70ccf6d..69b17ed 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -64,6 +64,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
@@ -343,4 +344,9 @@ public class SchemaVariableVisitor implements ILogicalOperatorVisitor<Void, Void
         return null;
     }
 
+    @Override
+    public Void visitWindowOperator(WindowOperator op, Void arg) throws AlgebricksException {
+        standardLayout(op); // no window-specific logic: delegates to standardLayout
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index c62f555..99d3488 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -65,6 +65,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
@@ -85,15 +86,7 @@ public class SubstituteVariableVisitor
     @Override
     public Void visitAggregateOperator(AggregateOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
-        List<LogicalVariable> variables = op.getVariables();
-        int n = variables.size();
-        for (int i = 0; i < n; i++) {
-            if (variables.get(i).equals(pair.first)) {
-                variables.set(i, pair.second);
-            } else {
-                op.getExpressions().get(i).getValue().substituteVar(pair.first, pair.second);
-            }
-        }
+        substAssignVariables(op.getVariables(), op.getExpressions(), pair);
         substVarTypes(op, pair);
         return null;
     }
@@ -101,15 +94,7 @@ public class SubstituteVariableVisitor
     @Override
     public Void visitAssignOperator(AssignOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
-        List<LogicalVariable> variables = op.getVariables();
-        int n = variables.size();
-        for (int i = 0; i < n; i++) {
-            if (variables.get(i).equals(pair.first)) {
-                variables.set(i, pair.second);
-            } else {
-                op.getExpressions().get(i).getValue().substituteVar(pair.first, pair.second);
-            }
-        }
+        substAssignVariables(op.getVariables(), op.getExpressions(), pair);
         // Substitute variables stored in ordering property
         if (op.getExplicitOrderingProperty() != null) {
             List<OrderColumn> orderColumns = op.getExplicitOrderingProperty().getOrderColumns();
@@ -134,10 +119,10 @@ public class SubstituteVariableVisitor
                 return null;
             }
         }
-        substVarTypes(op, pair);
         if (op.getSelectCondition() != null) {
             op.getSelectCondition().getValue().substituteVar(pair.first, pair.second);
         }
+        substVarTypes(op, pair);
         return null;
     }
 
@@ -240,15 +225,7 @@ public class SubstituteVariableVisitor
     @Override
     public Void visitRunningAggregateOperator(RunningAggregateOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
-        List<LogicalVariable> variables = op.getVariables();
-        int n = variables.size();
-        for (int i = 0; i < n; i++) {
-            if (variables.get(i).equals(pair.first)) {
-                variables.set(i, pair.second);
-            } else {
-                op.getExpressions().get(i).getValue().substituteVar(pair.first, pair.second);
-            }
-        }
+        substAssignVariables(op.getVariables(), op.getExpressions(), pair);
         substVarTypes(op, pair);
         return null;
     }
@@ -403,6 +380,18 @@ public class SubstituteVariableVisitor
         }
     }
 
+    private void substAssignVariables(List<LogicalVariable> variables, List<Mutable<ILogicalExpression>> expressions,
+            Pair<LogicalVariable, LogicalVariable> pair) {
+        int n = variables.size(); // expressions is indexed in parallel, so it needs at least n entries
+        for (int i = 0; i < n; i++) {
+            if (variables.get(i).equals(pair.first)) {
+                variables.set(i, pair.second); // rename the variable itself; expression i is left untouched
+            } else {
+                expressions.get(i).getValue().substituteVar(pair.first, pair.second); // rename inside expression i
+            }
+        }
+    }
+
     @Override
     public Void visitReplicateOperator(ReplicateOperator op, Pair<LogicalVariable, LogicalVariable> arg)
             throws AlgebricksException {
@@ -510,4 +499,18 @@ public class SubstituteVariableVisitor
         substVarTypes(op, pair);
         return null;
     }
+
+    @Override
+    public Void visitWindowOperator(WindowOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        for (Mutable<ILogicalExpression> pe : op.getPartitionExpressions()) {
+            pe.getValue().substituteVar(pair.first, pair.second); // pair.first -> pair.second in partition exprs
+        }
+        for (Pair<IOrder, Mutable<ILogicalExpression>> oe : op.getOrderExpressions()) {
+            oe.second.getValue().substituteVar(pair.first, pair.second); // same for the order expressions
+        }
+        substAssignVariables(op.getVariables(), op.getExpressions(), pair); // window variables and their expressions
+        substVarTypes(op, pair);
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 2c68697..b4bea84 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -67,6 +67,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
@@ -471,4 +472,17 @@ public class UsedVariableVisitor implements ILogicalOperatorVisitor<Void, Void>
         return null;
     }
 
+    @Override
+    public Void visitWindowOperator(WindowOperator op, Void arg) {
+        for (Mutable<ILogicalExpression> exprRef : op.getPartitionExpressions()) {
+            exprRef.getValue().getUsedVariables(usedVariables); // partition expressions
+        }
+        for (Pair<IOrder, Mutable<ILogicalExpression>> oe : op.getOrderExpressions()) {
+            oe.second.getValue().getUsedVariables(usedVariables); // order expressions (the IOrder part is ignored)
+        }
+        for (Mutable<ILogicalExpression> exprRef : op.getExpressions()) {
+            exprRef.getValue().getUsedVariables(usedVariables); // expressions defining the window variables
+        }
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
index d68be20..09ee358 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
@@ -40,7 +40,7 @@ import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertie
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.operators.std.RunningAggregateRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.aggrun.RunningAggregateRuntimeFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 
 public class RunningAggregatePOperator extends AbstractPhysicalOperator {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
new file mode 100644
index 0000000..7853524
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.ListSet;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.aggrun.WindowRuntimeFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+
+public class WindowPOperator extends AbstractPhysicalOperator { // physical operator for the logical WindowOperator
+
+    private final List<LogicalVariable> partitionColumns; // variables the input must be partitioned on
+
+    private final boolean partitionMaterialization; // forwarded unchanged to WindowRuntimeFactory
+
+    private final List<OrderColumn> orderColumns; // order variables, each with its order kind
+
+    public WindowPOperator(List<LogicalVariable> partitionColumns, boolean partitionMaterialization,
+            List<OrderColumn> orderColumns) { // keeps references to the given lists (no defensive copies)
+        this.partitionColumns = partitionColumns;
+        this.partitionMaterialization = partitionMaterialization;
+        this.orderColumns = orderColumns;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.WINDOW;
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
+        IPartitioningProperty pp; // partitioning requested from the input; chosen by execution mode
+        switch (op.getExecutionMode()) {
+            case PARTITIONED: // unordered partitioning on the partition columns
+                pp = new UnorderedPartitionedProperty(new ListSet<>(partitionColumns),
+                        context.getComputationNodeDomain());
+                break;
+            case UNPARTITIONED:
+                pp = IPartitioningProperty.UNPARTITIONED;
+                break;
+            case LOCAL:
+                pp = null; // no partitioning property is requested in LOCAL mode
+                break;
+            default:
+                throw new IllegalStateException(op.getExecutionMode().name());
+        }
+
+        // require local order property [pc1, ... pcN, oc1, ... ocN]: partition columns first, then order columns,
+        // accounting for overlap: a partition column that is also an order column keeps the order column's kind
+        // TODO replace with required local grouping on partition columns + local order on order columns
+        List<OrderColumn> lopColumns = new ArrayList<>(); // columns of the required local order property
+        ListSet<LogicalVariable> pcVars = new ListSet<>(); // partition variables not yet matched by an order column
+        pcVars.addAll(partitionColumns);
+        for (int oIdx = 0, ln = orderColumns.size(); oIdx < ln; oIdx++) {
+            OrderColumn oc = orderColumns.get(oIdx);
+            LogicalVariable ocVar = oc.getColumn();
+            if (!pcVars.remove(ocVar) && containsAny(orderColumns, oIdx + 1, pcVars)) {
+                throw new AlgebricksException(ErrorCode.HYRACKS, ErrorCode.UNSUPPORTED_WINDOW_SPEC,
+                        op.getSourceLocation(), String.valueOf(partitionColumns), String.valueOf(orderColumns));
+            } // unsupported: this order column is not a remaining partition column, but a later one is
+            lopColumns.add(new OrderColumn(oc.getColumn(), oc.getOrder()));
+        }
+        int pIdx = 0;
+        for (LogicalVariable pColumn : pcVars) { // unmatched partition columns go to the front, as ASC
+            lopColumns.add(pIdx++, new OrderColumn(pColumn, OrderOperator.IOrder.OrderKind.ASC));
+        }
+        List<ILocalStructuralProperty> localProps = Collections.singletonList(new LocalOrderProperty(lopColumns));
+
+        return new PhysicalRequirements(
+                new StructuralPropertiesVector[] { new StructuralPropertiesVector(pp, localProps) },
+                IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        deliveredProperties = op2.getDeliveredPhysicalProperties().clone(); // a copy of the first input's properties
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        WindowOperator winOp = (WindowOperator) op;
+        int[] outColumns = JobGenHelper.projectVariables(opSchema, winOp.getVariables()); // positions in opSchema
+        List<Mutable<ILogicalExpression>> expressions = winOp.getExpressions();
+        IRunningAggregateEvaluatorFactory[] winFuncs = new IRunningAggregateEvaluatorFactory[expressions.size()];
+        IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
+        for (int i = 0; i < winFuncs.length; i++) { // one evaluator factory per window expression
+            StatefulFunctionCallExpression expr = (StatefulFunctionCallExpression) expressions.get(i).getValue();
+            winFuncs[i] = expressionRuntimeProvider.createRunningAggregateFunctionFactory(expr,
+                    context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas, context);
+        }
+
+        // TODO push projections into the operator
+        int[] projectionList = JobGenHelper.projectAllVariables(opSchema); // for now: every variable of opSchema
+
+        int[] partitionColumnList = JobGenHelper.projectVariables(inputSchemas[0], partitionColumns);
+
+        IBinaryComparatorFactory[] partitionComparatorFactories = JobGenHelper
+                .variablesToAscBinaryComparatorFactories(partitionColumns, context.getTypeEnvironment(op), context);
+
+        //TODO not all functions need order comparators
+        IBinaryComparatorFactory[] orderComparatorFactories = JobGenHelper
+                .variablesToBinaryComparatorFactories(orderColumns, context.getTypeEnvironment(op), context);
+
+        WindowRuntimeFactory runtime = new WindowRuntimeFactory(outColumns, winFuncs, projectionList,
+                partitionColumnList, partitionComparatorFactories, partitionMaterialization, orderComparatorFactories);
+        runtime.setSourceLocation(winOp.getSourceLocation());
+
+        // contribute one Asterix framewriter
+        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
+        builder.contributeMicroOperator(winOp, runtime, recDesc);
+        // and contribute one edge from its child
+        ILogicalOperator src = winOp.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, winOp, 0); // output 0 of the child feeds input 0 of this operator
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return true;
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return true;
+    }
+
+    public boolean isPartitionMaterialization() {
+        return partitionMaterialization;
+    }
+
+    private boolean containsAny(List<OrderColumn> ocList, int startIdx, Set<LogicalVariable> varSet) {
+        for (int i = startIdx, ln = ocList.size(); i < ln; i++) { // scan ocList from startIdx to the end
+            if (varSet.contains(ocList.get(i).getColumn())) {
+                return true;
+            }
+        }
+        return false; // no column at or after startIdx is in varSet
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 77f052e..ad45614 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -66,6 +66,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
@@ -170,13 +171,10 @@ public class LogicalOperatorPrettyPrintVisitor extends AbstractLogicalOperatorPr
     @Override
     public Void visitOrderOperator(OrderOperator op, Integer indent) throws AlgebricksException {
         addIndent(indent).append("order ");
-        for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : op.getOrderExpressions()) {
-            if (op.getTopK() != -1) {
-                buffer.append("(topK: " + op.getTopK() + ") ");
-            }
-            String fst = getOrderString(p.first);
-            buffer.append("(" + fst + ", " + p.second.getValue().accept(exprVisitor, indent) + ") ");
+        if (op.getTopK() != -1) { // topK is now printed once, not once per order expression
+            buffer.append("(topK: " + op.getTopK() + ") ");
         }
+        pprintOrderList(op.getOrderExpressions(), indent); // one (kind, expr) entry per order expression
         return null;
     }
 
@@ -484,6 +482,17 @@ public class LogicalOperatorPrettyPrintVisitor extends AbstractLogicalOperatorPr
         return null;
     }
 
+    @Override
+    public Void visitWindowOperator(WindowOperator op, Integer indent) throws AlgebricksException {
+        addIndent(indent).append("window ").append(str(op.getVariables())).append(" <- "); // output variables
+        pprintExprList(op.getExpressions(), indent); // the window expressions assigned to those variables
+        buffer.append(" partition "); // printed even when there are no partition expressions
+        pprintExprList(op.getPartitionExpressions(), indent);
+        buffer.append(" order "); // printed even when there are no order expressions
+        pprintOrderList(op.getOrderExpressions(), indent);
+        return null; // output is accumulated in buffer
+    }
+
     protected void printNestedPlans(AbstractOperatorWithNestedPlans op, Integer indent) throws AlgebricksException {
         boolean first = true;
         if (op.getNestedPlans().isEmpty()) {
@@ -537,4 +546,12 @@ public class LogicalOperatorPrettyPrintVisitor extends AbstractLogicalOperatorPr
         }
         buffer.append("]");
     }
+
+    protected void pprintOrderList(List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderList,
+            Integer indent) throws AlgebricksException { // appends every order entry to buffer; nothing if empty
+        for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : orderList) {
+            String fst = getOrderString(p.first); // textual form of the order kind
+            buffer.append("(" + fst + ", " + p.second.getValue().accept(exprVisitor, indent) + ") ");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
index 4a17cc6..4c810ab 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
@@ -70,6 +70,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
 
@@ -251,30 +252,12 @@ public class LogicalOperatorPrettyPrintVisitorJson extends AbstractLogicalOperat
     @Override
     public Void visitOrderOperator(OrderOperator op, Integer indent) throws AlgebricksException {
         addIndent(indent).append("\"operator\": \"order\"");
-        for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : op.getOrderExpressions()) {
-            buffer.append(",\n");
-            if (op.getTopK() != -1) {
-                addIndent(indent).append("\"topK\": \"" + op.getTopK() + "\",\n");
-            }
-            String fst = getOrderString(p.first);
-            addIndent(indent).append("\"first\": " + fst + ",\n");
-            addIndent(indent).append(
-                    "\"second\": \"" + p.second.getValue().accept(exprVisitor, indent).replace('"', ' ') + "\"");
-        }
+        int topK = op.getTopK();
+        List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions = op.getOrderExpressions();
+        pprintOrderExprList(orderExpressions, topK, indent); // same per-entry output as the loop it replaces
         return null;
     }
 
-    private String getOrderString(OrderOperator.IOrder first) {
-        switch (first.getKind()) {
-            case ASC:
-                return "\"ASC\"";
-            case DESC:
-                return "\"DESC\"";
-            default:
-                return first.getExpressionRef().toString();
-        }
-    }
-
     @Override
     public Void visitAssignOperator(AssignOperator op, Integer indent) throws AlgebricksException {
         addIndent(indent).append("\"operator\": \"assign\"");
@@ -667,6 +650,23 @@ public class LogicalOperatorPrettyPrintVisitorJson extends AbstractLogicalOperat
         return null;
     }
 
+    @Override
+    public Void visitWindowOperator(WindowOperator op, Integer indent) throws AlgebricksException {
+        addIndent(indent).append("\"operator\": \"window\"");
+        variablePrintHelper(op.getVariables(), indent); // output variables of the window operator
+        addIndent(0).append(",\n");
+        pprintExprList(op.getExpressions(), indent); // the window expressions
+        if (!op.getPartitionExpressions().isEmpty()) { // the partition key is omitted when there is nothing to print
+            buffer.append(",\n");
+            addIndent(indent).append("\"partition by\": ");
+            pprintExprList(op.getPartitionExpressions(), indent);
+        }
+        buffer.append(",\n"); // unlike the partition key, the order key is always written
+        addIndent(indent).append("\"order by\": "); // NOTE(review): the next call emits no value right after this key
+        pprintOrderExprList(op.getOrderExpressions(), -1, indent); // -1 suppresses the topK entry; confirm JSON shape
+        return null;
+    }
+
     protected void printNestedPlans(AbstractOperatorWithNestedPlans op, Integer indent) throws AlgebricksException {
         idCounter.nextPrefix();
         buffer.append("[\n");
@@ -718,4 +718,29 @@ public class LogicalOperatorPrettyPrintVisitorJson extends AbstractLogicalOperat
         }
         buffer.append("]");
     }
+
+    private void pprintOrderExprList(List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions,
+            int topK, Integer indent) throws AlgebricksException { // topK == -1 means no topK entry is written
+        for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : orderExpressions) {
+            buffer.append(",\n"); // every entry starts with a comma and newline, continuing the caller's output
+            if (topK != -1) { // NOTE(review): topK is repeated for every order expression; intended?
+                addIndent(indent).append("\"topK\": \"" + topK + "\",\n");
+            }
+            String fst = getOrderString(p.first); // already quoted for ASC/DESC
+            addIndent(indent).append("\"first\": " + fst + ",\n");
+            addIndent(indent).append( // double quotes inside the expression text are replaced by spaces
+                    "\"second\": \"" + p.second.getValue().accept(exprVisitor, indent).replace('"', ' ') + "\"");
+        }
+    }
+
+    private String getOrderString(OrderOperator.IOrder first) { // text written as the value of the first key
+        switch (first.getKind()) {
+            case ASC:
+                return "\"ASC\""; // returned with surrounding double quotes
+            case DESC:
+                return "\"DESC\""; // returned with surrounding double quotes
+            default: // NOTE(review): result is not quoted and getExpressionRef() is not null-checked; confirm
+                return first.getExpressionRef().toString();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index 67199b9..ea914fa 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -43,6 +43,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOpera
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.OperatorDeepCopyVisitor;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
@@ -134,6 +135,13 @@ public class OperatorManipulationUtil {
                         forceUnpartitioned = true;
                     }
                 }
+                if (op.getOperatorTag() == LogicalOperatorTag.WINDOW) {
+                    WindowOperator winOp = (WindowOperator) op;
+                    if (winOp.getPartitionExpressions().isEmpty()) {
+                        op.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+                        forceUnpartitioned = true;
+                    }
+                }
 
                 for (Mutable<ILogicalOperator> i : op.getInputs()) {
                     boolean exit = false;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index 548a29f..9d5cdeb 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -52,6 +52,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
 
@@ -126,4 +127,6 @@ public interface ILogicalOperatorVisitor<R, T> {
     public R visitTokenizeOperator(TokenizeOperator op, T arg) throws AlgebricksException;
 
     public R visitForwardOperator(ForwardOperator op, T arg) throws AlgebricksException;
+
+    public R visitWindowOperator(WindowOperator op, T arg) throws AlgebricksException; // visit method for WindowOperator
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java
index b204bcb..5142ce7 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java
@@ -25,6 +25,8 @@ import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
 import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
 import org.apache.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
 import org.apache.hyracks.algebricks.data.IBinaryHashFunctionFamilyProvider;
@@ -144,6 +146,20 @@ public final class JobGenHelper {
         return compFactories;
     }
 
+    public static IBinaryComparatorFactory[] variablesToBinaryComparatorFactories(Collection<OrderColumn> orderColumns,
+            IVariableTypeEnvironment env, JobGenContext context) throws AlgebricksException {
+        IBinaryComparatorFactory[] compFactories = new IBinaryComparatorFactory[orderColumns.size()];
+        IBinaryComparatorFactoryProvider bcfProvider = context.getBinaryComparatorFactoryProvider();
+        int i = 0; // next result slot; results follow the iteration order of orderColumns
+        for (OrderColumn oc : orderColumns) {
+            LogicalVariable v = oc.getColumn();
+            boolean ascending = oc.getOrder() == OrderOperator.IOrder.OrderKind.ASC; // any other kind: not ascending
+            Object type = env.getVarType(v); // the comparator is selected by the variable's type in env
+            compFactories[i++] = bcfProvider.getBinaryComparatorFactory(type, ascending);
+        }
+        return compFactories; // one comparator factory per order column
+    }
+
     public static INormalizedKeyComputerFactory variablesToAscNormalizedKeyComputerFactory(
             Collection<LogicalVariable> varLogical, IVariableTypeEnvironment env, JobGenContext context)
             throws AlgebricksException {
@@ -181,12 +197,20 @@ public final class JobGenHelper {
     }
 
     public static int[] projectAllVariables(IOperatorSchema opSchema) {
-        int[] projectionList = new int[opSchema.getSize()];
+        return projectVariablesImpl(opSchema, opSchema, opSchema.getSize()); // the schema is its own variable list
+    }
+
+    public static int[] projectVariables(IOperatorSchema opSchema, List<LogicalVariable> variables) {
+        return projectVariablesImpl(opSchema, variables, variables.size()); // findVariable result per list element
+    }
+
+    private static int[] projectVariablesImpl(IOperatorSchema opSchema, Iterable<LogicalVariable> variables,
+            int variableCount) { // callers pass the number of elements in variables
+        int[] projectionList = new int[variableCount]; // filled in the iteration order of variables
         int k = 0;
-        for (LogicalVariable v : opSchema) {
+        for (LogicalVariable v : variables) { // NOTE(review): the findVariable result below is stored unchecked
             projectionList[k++] = opSchema.findVariable(v);
         }
         return projectionList;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
index 113d205..f10c3a4 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
@@ -67,6 +67,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
@@ -192,17 +193,7 @@ public class LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String
                 stringBuilder.append("(topK: ").append(op.getTopK()).append(") ");
             }
             stringBuilder.append("(");
-            switch (p.first.getKind()) {
-                case ASC:
-                    stringBuilder.append("ASC");
-                    break;
-                case DESC:
-                    stringBuilder.append("DESC");
-                    break;
-                default:
-                    final Mutable<ILogicalExpression> expressionRef = p.first.getExpressionRef();
-                    stringBuilder.append(expressionRef == null ? "null" : expressionRef.toString());
-            }
+            appendOrder(p.first);
             stringBuilder.append(", ").append(p.second.getValue().toString()).append(") ");
         }
         appendSchema(op, showDetails);
@@ -211,6 +202,20 @@ public class LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String
         return stringBuilder.toString();
     }
 
+    private void appendOrder(OrderOperator.IOrder order) { // appends the order kind's text to stringBuilder
+        switch (order.getKind()) {
+            case ASC:
+                stringBuilder.append("ASC");
+                break;
+            case DESC:
+                stringBuilder.append("DESC");
+                break;
+            default: // neither ASC nor DESC: print the order's expression reference instead
+                final Mutable<ILogicalExpression> expressionRef = order.getExpressionRef();
+                stringBuilder.append(expressionRef == null ? "null" : expressionRef.toString()); // null-safe
+        }
+    }
+
     @Override
     public String visitAssignOperator(AssignOperator op, Boolean showDetails) throws AlgebricksException {
         stringBuilder.setLength(0);
@@ -595,6 +600,26 @@ public class LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String
         return stringBuilder.toString();
     }
 
+    @Override
+    public String visitWindowOperator(WindowOperator op, Boolean showDetails) throws AlgebricksException {
+        stringBuilder.setLength(0); // start from an empty builder before rendering this operator
+        stringBuilder.append("window (").append(str(op.getVariables())).append(" <- ");
+        printExprList(op.getExpressions()); // the window expressions assigned to the variables above
+        stringBuilder.append(") partition by (");
+        printExprList(op.getPartitionExpressions());
+        stringBuilder.append(") order by ("); // both sections are printed even when their lists are empty
+        for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : op.getOrderExpressions()) {
+            stringBuilder.append("(");
+            appendOrder(p.first); // order kind, same rendering as in visitOrderOperator
+            stringBuilder.append(", ").append(p.second.getValue().toString()).append(") ");
+        }
+        stringBuilder.append(")");
+        appendSchema(op, showDetails);
+        appendAnnotations(op, showDetails);
+        appendPhysicalOperatorInfo(op, showDetails);
+        return stringBuilder.toString(); // the label text built for this operator
+    }
+
     private void printExprList(List<Mutable<ILogicalExpression>> expressions) {
         stringBuilder.append("[");
         expressions.forEach(exprRef -> stringBuilder.append(exprRef.getValue().toString()).append(", "));

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractExtractExprRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractExtractExprRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractExtractExprRule.java
index 036b3e1..7adc732 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractExtractExprRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractExtractExprRule.java
@@ -18,26 +18,30 @@
  */
 package org.apache.hyracks.algebricks.rewriter.rules;
 
+import java.util.List;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
 public abstract class AbstractExtractExprRule implements IAlgebraicRewriteRule {
 
-    protected LogicalVariable extractExprIntoAssignOpRef(ILogicalExpression gExpr, Mutable<ILogicalOperator> opRef2,
-            IOptimizationContext context) throws AlgebricksException {
+    protected static LogicalVariable extractExprIntoAssignOpRef(ILogicalExpression gExpr,
+            Mutable<ILogicalOperator> opRef2, IOptimizationContext context) throws AlgebricksException {
         LogicalVariable v = context.newVar();
-        AssignOperator a = new AssignOperator(v, new MutableObject<ILogicalExpression>(gExpr));
+        AssignOperator a = new AssignOperator(v, new MutableObject<>(gExpr));
         a.setSourceLocation(gExpr.getSourceLocation());
-        a.getInputs().add(new MutableObject<ILogicalOperator>(opRef2.getValue()));
+        a.getInputs().add(new MutableObject<>(opRef2.getValue()));
         opRef2.setValue(a);
         if (gExpr.getExpressionTag() == LogicalExpressionTag.CONSTANT) {
             context.addNotToBeInlinedVar(v);
@@ -46,4 +50,37 @@ public abstract class AbstractExtractExprRule implements IAlgebraicRewriteRule {
         return v;
     }
 
+    /**
+     * Replaces non-variable expressions of {@code exprList} with references to fresh variables.
+     * Each replaced expression is moved into a new assign operator that is spliced in between
+     * {@code op} and its first input (see {@link #extractExprIntoAssignOpRef}). Expressions for which
+     * {@code retainPredicate} returns {@code true} stay where they are.
+     *
+     * @param op operator owning the expressions; only its first input is rewired
+     * @param exprList items to scan
+     * @param exprGetter yields the mutable expression reference held by an item
+     * @param retainPredicate tested on non-variable expressions only; {@code true} keeps the expression
+     * @param context source of new variables; also used to recompute the type environment of {@code op}
+     * @return {@code true} if at least one expression was extracted
+     * @throws AlgebricksException propagated from the extraction or the type environment computation
+     */
+    protected static <T> boolean extractComplexExpressions(ILogicalOperator op, List<T> exprList,
+            Function<T, Mutable<ILogicalExpression>> exprGetter, Predicate<ILogicalExpression> retainPredicate,
+            IOptimizationContext context) throws AlgebricksException {
+        if (!hasComplexExpressions(exprList, exprGetter)) {
+            // nothing but variable references: leave the operator untouched
+            return false;
+        }
+        int extractedCount = 0;
+        Mutable<ILogicalOperator> firstInputRef = op.getInputs().get(0);
+        for (T entry : exprList) {
+            Mutable<ILogicalExpression> exprRef = exprGetter.apply(entry);
+            ILogicalExpression current = exprRef.getValue();
+            if (current.getExpressionTag() == LogicalExpressionTag.VARIABLE || retainPredicate.test(current)) {
+                continue;
+            }
+            VariableReferenceExpression replacement =
+                    new VariableReferenceExpression(extractExprIntoAssignOpRef(current, firstInputRef, context));
+            replacement.setSourceLocation(current.getSourceLocation());
+            exprRef.setValue(replacement);
+            extractedCount++;
+        }
+        // recomputed whenever a non-variable expression was present, even if all of them were retained
+        context.computeAndSetTypeEnvironmentForOperator(op);
+        return extractedCount > 0;
+    }
+
+    /**
+     * Tells whether at least one expression reachable through {@code exprGetter} is something other
+     * than a plain variable reference. Stops at the first such expression.
+     *
+     * @param exprList items to scan
+     * @param exprGetter yields the mutable expression reference held by an item
+     * @return {@code true} if a non-variable expression is found, {@code false} otherwise
+     */
+    private static <T> boolean hasComplexExpressions(List<T> exprList,
+            Function<T, Mutable<ILogicalExpression>> exprGetter) {
+        return exprList.stream().map(exprGetter).map(Mutable::getValue)
+                .anyMatch(expr -> expr.getExpressionTag() != LogicalExpressionTag.VARIABLE);
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index cdab2f4..52b3f59 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -515,7 +515,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
                 case LOCAL_GROUPING_PROPERTY: {
                     LocalGroupingProperty g = (LocalGroupingProperty) prop;
                     Collection<LogicalVariable> vars =
-                            (g.getPreferredOrderEnforcer() != null) ? g.getPreferredOrderEnforcer() : g.getColumnSet();
+                            !g.getPreferredOrderEnforcer().isEmpty() ? g.getPreferredOrderEnforcer() : g.getColumnSet();
                     List<OrderColumn> orderColumns = new ArrayList<>();
                     for (LogicalVariable v : vars) {
                         OrderColumn oc = new OrderColumn(v, OrderKind.ASC);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractGbyExpressionsRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractGbyExpressionsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractGbyExpressionsRule.java
index eb2bee6..06b2e16 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractGbyExpressionsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractGbyExpressionsRule.java
@@ -18,17 +18,17 @@
  */
 package org.apache.hyracks.algebricks.rewriter.rules;
 
-import org.apache.commons.lang3.mutable.Mutable;
+import java.util.List;
+import java.util.function.Function;
 
+import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 
@@ -55,8 +55,8 @@ public class ExtractGbyExpressionsRule extends AbstractExtractExprRule {
         }
         context.addToDontApplySet(this, op1);
         GroupByOperator g = (GroupByOperator) op1;
-        boolean r1 = gbyExprWasRewritten(g, context);
-        boolean r2 = decorExprWasRewritten(g, context);
+        boolean r1 = extractComplexExpressions(g, g.getGroupByList(), context);
+        boolean r2 = extractComplexExpressions(g, g.getDecorList(), context);
         boolean fired = r1 || r2;
         if (fired) {
             context.computeAndSetTypeEnvironmentForOperator(g);
@@ -64,56 +64,15 @@ public class ExtractGbyExpressionsRule extends AbstractExtractExprRule {
         return fired;
     }
 
-    private boolean gbyExprWasRewritten(GroupByOperator g, IOptimizationContext context) throws AlgebricksException {
-        if (!gbyHasComplexExpr(g)) {
-            return false;
-        }
-        Mutable<ILogicalOperator> opRef2 = g.getInputs().get(0);
-        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> gbyPair : g.getGroupByList()) {
-            ILogicalExpression expr = gbyPair.second.getValue();
-            if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
-                LogicalVariable v = extractExprIntoAssignOpRef(expr, opRef2, context);
-                VariableReferenceExpression vRef = new VariableReferenceExpression(v);
-                vRef.setSourceLocation(expr.getSourceLocation());
-                gbyPair.second.setValue(vRef);
-            }
-        }
-        return true;
-    }
-
-    private boolean decorExprWasRewritten(GroupByOperator g, IOptimizationContext context) throws AlgebricksException {
-        if (!decorHasComplexExpr(g)) {
-            return false;
-        }
-        Mutable<ILogicalOperator> opRef2 = g.getInputs().get(0);
-        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> decorPair : g.getDecorList()) {
-            ILogicalExpression expr = decorPair.second.getValue();
-            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
-                LogicalVariable v = extractExprIntoAssignOpRef(expr, opRef2, context);
-                VariableReferenceExpression vRef = new VariableReferenceExpression(v);
-                vRef.setSourceLocation(expr.getSourceLocation());
-                decorPair.second.setValue(vRef);
-            }
-        }
-        return true;
-    }
-
-    private boolean gbyHasComplexExpr(GroupByOperator g) {
-        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> gbyPair : g.getGroupByList()) {
-            if (gbyPair.second.getValue().getExpressionTag() != LogicalExpressionTag.VARIABLE) {
-                return true;
-            }
-        }
-        return false;
+    /**
+     * Overload for lists of (variable, expression) pairs, as returned by the group-by and decor
+     * lists of a {@link GroupByOperator}: the expression examined is the second component of each pair.
+     *
+     * @return {@code true} if at least one expression was extracted
+     */
+    private static boolean extractComplexExpressions(ILogicalOperator op,
+            List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> exprList, IOptimizationContext context)
+            throws AlgebricksException {
+        return extractComplexExpressions(op, exprList, Pair::getSecond, context);
     }
 
-    private boolean decorHasComplexExpr(GroupByOperator g) {
-        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> gbyPair : g.getDecorList()) {
-            if (gbyPair.second.getValue().getExpressionTag() != LogicalExpressionTag.VARIABLE) {
-                return true;
-            }
-        }
-        return false;
+    /**
+     * Extracts every non-variable expression reachable through {@code exprGetter}. Delegates to the
+     * five-argument overload inherited from {@link AbstractExtractExprRule} with a retain predicate
+     * that is always {@code false}, i.e. no non-variable expression is kept in place.
+     *
+     * @return {@code true} if at least one expression was extracted
+     */
+    public static <T> boolean extractComplexExpressions(ILogicalOperator op, List<T> exprList,
+            Function<T, Mutable<ILogicalExpression>> exprGetter, IOptimizationContext context)
+            throws AlgebricksException {
+        return extractComplexExpressions(op, exprList, exprGetter, t -> false, context);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushNestedOrderByUnderPreSortedGroupByRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushNestedOrderByUnderPreSortedGroupByRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushNestedOrderByUnderPreSortedGroupByRule.java
index aa58985..e6f86be 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushNestedOrderByUnderPreSortedGroupByRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushNestedOrderByUnderPreSortedGroupByRule.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.algebricks.rewriter.rules;
 
+import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -39,6 +40,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperato
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPhysicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
@@ -82,6 +84,10 @@ public class PushNestedOrderByUnderPreSortedGroupByRule implements IAlgebraicRew
         if (!isIndependentFromChildren(order1)) {
             return false;
         }
+        if (OperatorManipulationUtil.ancestorOfOperators(order1.getInputs().get(0).getValue(),
+                EnumSet.of(LogicalOperatorTag.ORDER))) {
+            return false;
+        }
         AbstractPhysicalOperator pOrder1 = (AbstractPhysicalOperator) op2.getPhysicalOperator();
         if (pOrder1.getOperatorTag() != PhysicalOperatorTag.STABLE_SORT
                 && pOrder1.getOperatorTag() != PhysicalOperatorTag.IN_MEMORY_STABLE_SORT) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
index 6b09894..1388ccb 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
@@ -60,6 +60,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.algebra.visitors.IQueryOperatorVisitor;
@@ -256,6 +257,11 @@ class ReplaceNtsWithSubplanInputOperatorVisitor implements IQueryOperatorVisitor
         return visit(op);
     }
 
+    /**
+     * Delegates to the generic {@code visit(ILogicalOperator)} routine of this visitor; no
+     * window-specific processing is done here.
+     */
+    @Override
+    public ILogicalOperator visitWindowOperator(WindowOperator op, Void arg) throws AlgebricksException {
+        return visit(op);
+    }
+
     private ILogicalOperator visit(ILogicalOperator op) throws AlgebricksException {
         List<Map<LogicalVariable, LogicalVariable>> varMapSnapshots = new ArrayList<>();
         for (Mutable<ILogicalOperator> childRef : op.getInputs()) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml b/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
index bcc537a..c8f7cbf 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
@@ -73,6 +73,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-storage-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IWindowAggregateEvaluator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IWindowAggregateEvaluator.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IWindowAggregateEvaluator.java
new file mode 100644
index 0000000..b4030f7
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IWindowAggregateEvaluator.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.base;
+
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A running-aggregate evaluator with two additional hooks: an optional {@link #configure} step that
+ * receives order comparators, and {@link #initPartition} which receives a partition length.
+ * NOTE(review): presumably implemented by window functions driven by the window operator runtime -- confirm.
+ */
+public interface IWindowAggregateEvaluator extends IRunningAggregateEvaluator {
+    /**
+     * Optional hook; the default implementation does nothing.
+     *
+     * @param orderComparators comparators handed over by the caller (presumably one per order-by
+     *            expression of the window -- TODO confirm against the calling runtime)
+     */
+    default void configure(IBinaryComparator[] orderComparators) {
+    }
+
+    /**
+     * @param partitionLength length of the partition (presumably its tuple count, announced before the
+     *            partition's tuples are stepped through -- TODO confirm against the calling runtime)
+     * @throws HyracksDataException if the evaluator cannot be initialized for the partition
+     */
+    void initPartition(long partitionLength) throws HyracksDataException;
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java
new file mode 100644
index 0000000..27354cb
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.aggrun;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+/**
+ * Base class for push runtimes that step a set of {@link IRunningAggregateEvaluator}s over incoming
+ * tuples and emit, for every tuple, the columns named in the projection list: a projected column that
+ * is one of the output columns is filled with the result of the matching evaluator, any other
+ * projected column is copied from the input tuple.
+ * <p>
+ * Evaluators are created from their factories the first time {@link #open()} is called and are
+ * initialized on every {@link #open()}.
+ *
+ * @param <T> evaluator type; every evaluator created by the factories is cast to it
+ */
+public abstract class AbstractRunningAggregatePushRuntime<T extends IRunningAggregateEvaluator>
+        extends AbstractOneInputOneOutputOneFramePushRuntime {
+    protected final IHyracksTaskContext ctx;
+    /** Evaluators in factory order; empty until the first {@link #open()}. */
+    protected final List<T> aggEvals;
+    private final IRunningAggregateEvaluatorFactory[] aggFactories;
+    private final Class<T> aggEvalClass;
+    private final int[] projectionList;
+    /** Per projected column: its index in the output columns, or a negative value when not found there. */
+    private final int[] projectionToOutColumns;
+    private final ArrayTupleBuilder tupleBuilder;
+    /** Scratch pointable that receives the result of each evaluator step. */
+    private final IPointable resultPointable = VoidPointable.FACTORY.createPointable();
+    /** {@code true} until the evaluators have been created by the first {@link #open()}. */
+    private boolean evaluatorsPending = true;
+
+    /**
+     * @param outColumns columns produced by the evaluators; looked up with
+     *            {@link Arrays#binarySearch(int[], int)}, which expects ascending order
+     * @param aggFactories one factory per evaluator
+     * @param projectionList columns of the output tuple, in output order
+     * @param ctx task context handed to the factories
+     * @param aggEvalClass runtime class used to cast the created evaluators to {@code T}
+     */
+    public AbstractRunningAggregatePushRuntime(int[] outColumns, IRunningAggregateEvaluatorFactory[] aggFactories,
+            int[] projectionList, IHyracksTaskContext ctx, Class<T> aggEvalClass) {
+        this.ctx = ctx;
+        this.projectionList = projectionList;
+        this.aggFactories = aggFactories;
+        this.aggEvalClass = aggEvalClass;
+        this.aggEvals = new ArrayList<>(aggFactories.length);
+        this.tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+        this.projectionToOutColumns =
+                Arrays.stream(projectionList).map(column -> Arrays.binarySearch(outColumns, column)).toArray();
+    }
+
+    /**
+     * Opens the runtime, creates the evaluators on the first call and initializes all of them.
+     */
+    @Override
+    public void open() throws HyracksDataException {
+        super.open();
+        if (evaluatorsPending) {
+            // the flag is cleared before init() runs, so init() is attempted only once
+            evaluatorsPending = false;
+            init();
+        }
+        for (T evaluator : aggEvals) {
+            evaluator.init();
+        }
+    }
+
+    /**
+     * One-time setup: prepares accessor/appender/tuple reference and creates one evaluator per factory.
+     */
+    protected void init() throws HyracksDataException {
+        initAccessAppendRef(ctx);
+        for (int i = 0; i < aggFactories.length; i++) {
+            aggEvals.add(aggEvalClass.cast(aggFactories[i].createRunningAggregateEvaluator(ctx)));
+        }
+    }
+
+    /**
+     * Produces one output tuple for each input tuple in {@code [beginIdx, endIdx]} (both inclusive).
+     */
+    protected void produceTuples(IFrameTupleAccessor accessor, int beginIdx, int endIdx) throws HyracksDataException {
+        for (int tupleIdx = beginIdx; tupleIdx <= endIdx; tupleIdx++) {
+            tRef.reset(accessor, tupleIdx);
+            fillTupleBuilder(tRef, accessor, tupleIdx);
+            appendToFrameFromTupleBuilder(tupleBuilder);
+        }
+    }
+
+    /**
+     * Rebuilds {@link #tupleBuilder} for a single input tuple, column by column in projection order.
+     */
+    private void fillTupleBuilder(FrameTupleReference tupleRef, IFrameTupleAccessor accessor, int tupleIdx)
+            throws HyracksDataException {
+        tupleBuilder.reset();
+        for (int f = 0; f < projectionList.length; f++) {
+            int evalIdx = projectionToOutColumns[f];
+            if (evalIdx < 0) {
+                // not an evaluator output: copy the field straight from the input tuple
+                tupleBuilder.addField(accessor, tupleIdx, projectionList[f]);
+                continue;
+            }
+            aggEvals.get(evalIdx).step(tupleRef, resultPointable);
+            tupleBuilder.addField(resultPointable.getByteArray(), resultPointable.getStartOffset(),
+                    resultPointable.getLength());
+        }
+    }
+
+    /**
+     * Flushes the frame appender into the writer.
+     */
+    @Override
+    public void flush() throws HyracksDataException {
+        appender.flush(writer);
+    }
+}
\ No newline at end of file