You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by dl...@apache.org on 2018/11/01 03:54:41 UTC

[3/5] asterixdb git commit: [ASTERIXDB-2466][FUN] Implement window functions

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java
index b754533..3fb3507 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java
@@ -62,6 +62,7 @@ import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation;
 import org.apache.asterix.lang.sqlpp.clause.UnnestClause;
 import org.apache.asterix.lang.sqlpp.expression.CaseExpression;
 import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
+import org.apache.asterix.lang.sqlpp.expression.WindowExpression;
 import org.apache.asterix.lang.sqlpp.struct.SetOperationRight;
 import org.apache.asterix.lang.sqlpp.util.SqlppVariableUtil;
 import org.apache.asterix.lang.sqlpp.visitor.base.AbstractSqlppQueryExpressionVisitor;
@@ -424,6 +425,16 @@ public class FreeVariableVisitor extends AbstractSqlppQueryExpressionVisitor<Voi
         return null;
     }
 
+    @Override
+    public Void visit(WindowExpression winExpr, Collection<VariableExpr> freeVars) throws CompilationException {
+        winExpr.getExpr().accept(this, freeVars);
+        if (winExpr.hasPartitionList()) {
+            visit(winExpr.getPartitionList(), freeVars);
+        }
+        visit(winExpr.getOrderbyList(), freeVars);
+        return null;
+    }
+
     private void visitLetClauses(List<LetClause> letClauses, Collection<VariableExpr> freeVars)
             throws CompilationException {
         if (letClauses == null || letClauses.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppAstPrintVisitor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppAstPrintVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppAstPrintVisitor.java
index 0973bec..e4d2a27 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppAstPrintVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppAstPrintVisitor.java
@@ -27,6 +27,7 @@ import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.lang.common.base.Expression;
 import org.apache.asterix.lang.common.clause.GroupbyClause;
 import org.apache.asterix.lang.common.clause.LetClause;
+import org.apache.asterix.lang.common.clause.OrderbyClause;
 import org.apache.asterix.lang.common.expression.CallExpr;
 import org.apache.asterix.lang.common.expression.GbyVariableExpressionPair;
 import org.apache.asterix.lang.common.expression.VariableExpr;
@@ -47,6 +48,7 @@ import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation;
 import org.apache.asterix.lang.sqlpp.clause.UnnestClause;
 import org.apache.asterix.lang.sqlpp.expression.CaseExpression;
 import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
+import org.apache.asterix.lang.sqlpp.expression.WindowExpression;
 import org.apache.asterix.lang.sqlpp.struct.SetOperationRight;
 import org.apache.asterix.lang.sqlpp.util.FunctionMapUtil;
 import org.apache.asterix.lang.sqlpp.visitor.base.ISqlppVisitor;
@@ -331,4 +333,28 @@ public class SqlppAstPrintVisitor extends QueryPrintVisitor implements ISqlppVis
         return null;
     }
 
+    @Override
+    public Void visit(WindowExpression winExpr, Integer step) throws CompilationException {
+        out.print(skip(step) + "WINDOW");
+        winExpr.getExpr().accept(this, step + 1);
+        out.println();
+        out.println(skip(step) + "OVER (");
+        if (winExpr.hasPartitionList()) {
+            out.println(skip(step + 1) + "PARTITION BY");
+            List<Expression> partitionList = winExpr.getPartitionList();
+            for (Expression expr : partitionList) {
+                expr.accept(this, step + 2);
+                out.println();
+            }
+        }
+        out.println(skip(step + 1) + "ORDER BY");
+        List<Expression> orderbyList = winExpr.getOrderbyList();
+        List<OrderbyClause.OrderModifier> orderbyModifierList = winExpr.getOrderbyModifierList();
+        for (int i = 0, ln = orderbyList.size(); i < ln; i++) {
+            orderbyList.get(i).accept(this, step + 2);
+            out.println(" " + orderbyModifierList.get(i));
+        }
+        out.println(skip(step) + ")");
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppCloneAndSubstituteVariablesVisitor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppCloneAndSubstituteVariablesVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppCloneAndSubstituteVariablesVisitor.java
index e00a3bd..0c70572 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppCloneAndSubstituteVariablesVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppCloneAndSubstituteVariablesVisitor.java
@@ -50,6 +50,7 @@ import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation;
 import org.apache.asterix.lang.sqlpp.clause.UnnestClause;
 import org.apache.asterix.lang.sqlpp.expression.CaseExpression;
 import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
+import org.apache.asterix.lang.sqlpp.expression.WindowExpression;
 import org.apache.asterix.lang.sqlpp.struct.SetOperationInput;
 import org.apache.asterix.lang.sqlpp.struct.SetOperationRight;
 import org.apache.asterix.lang.sqlpp.visitor.base.ISqlppVisitor;
@@ -411,4 +412,20 @@ public class SqlppCloneAndSubstituteVariablesVisitor extends CloneAndSubstituteV
         newCaseExpr.setSourceLocation(caseExpr.getSourceLocation());
         return new Pair<>(newCaseExpr, env);
     }
+
+    @Override
+    public Pair<ILangExpression, VariableSubstitutionEnvironment> visit(WindowExpression winExpr,
+            VariableSubstitutionEnvironment env) throws CompilationException {
+        Expression newExpr = (Expression) winExpr.getExpr().accept(this, env).first;
+        List<Expression> newPartitionList = winExpr.hasPartitionList()
+                ? VariableCloneAndSubstitutionUtil.visitAndCloneExprList(winExpr.getPartitionList(), env, this) : null;
+        List<Expression> newOrderbyList =
+                VariableCloneAndSubstitutionUtil.visitAndCloneExprList(winExpr.getOrderbyList(), env, this);
+        List<OrderbyClause.OrderModifier> newOrderbyModifierList = new ArrayList<>(winExpr.getOrderbyModifierList());
+        WindowExpression newWinExpr =
+                new WindowExpression(newExpr, newPartitionList, newOrderbyList, newOrderbyModifierList);
+        newWinExpr.setSourceLocation(winExpr.getSourceLocation());
+        newWinExpr.addHints(winExpr.getHints());
+        return new Pair<>(newWinExpr, env);
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppFormatPrintVisitor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppFormatPrintVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppFormatPrintVisitor.java
index 755cc69..99368f8 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppFormatPrintVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppFormatPrintVisitor.java
@@ -44,6 +44,7 @@ import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation;
 import org.apache.asterix.lang.sqlpp.clause.UnnestClause;
 import org.apache.asterix.lang.sqlpp.expression.CaseExpression;
 import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
+import org.apache.asterix.lang.sqlpp.expression.WindowExpression;
 import org.apache.asterix.lang.sqlpp.struct.SetOperationRight;
 import org.apache.asterix.lang.sqlpp.util.SqlppVariableUtil;
 import org.apache.asterix.lang.sqlpp.visitor.base.ISqlppVisitor;
@@ -330,4 +331,25 @@ public class SqlppFormatPrintVisitor extends FormatPrintVisitor implements ISqlp
         out.print(SqlppVariableUtil.toUserDefinedName(v.getVar().getValue()));
         return null;
     }
+
+    @Override
+    public Void visit(WindowExpression windowExpr, Integer step) throws CompilationException {
+        out.print(skip(step) + "window ");
+        windowExpr.getExpr().accept(this, step + 2);
+        out.print(skip(step) + " over (");
+        if (windowExpr.hasPartitionList()) {
+            List<Expression> partitionList = windowExpr.getPartitionList();
+            for (int i = 0, ln = partitionList.size(); i < ln; i++) {
+                if (i > 0) {
+                    out.print(COMMA);
+                }
+                Expression partExpr = partitionList.get(i);
+                partExpr.accept(this, step + 2);
+            }
+        }
+        out.print(" order by ");
+        printDelimitedObyExpressions(windowExpr.getOrderbyList(), windowExpr.getOrderbyModifierList(), step + 2);
+        out.println(skip(step) + ")");
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppAstVisitor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppAstVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppAstVisitor.java
index 5396768..92ebd60 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppAstVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppAstVisitor.java
@@ -34,6 +34,7 @@ import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation;
 import org.apache.asterix.lang.sqlpp.clause.UnnestClause;
 import org.apache.asterix.lang.sqlpp.expression.CaseExpression;
 import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
+import org.apache.asterix.lang.sqlpp.expression.WindowExpression;
 
 /**
  * A dummy abstract visitor to allow an implementation to only fill in necessary stuff.
@@ -110,4 +111,8 @@ public abstract class AbstractSqlppAstVisitor<R, T> extends AbstractAstVisitor<R
         return null;
     }
 
+    @Override
+    public R visit(WindowExpression winExpr, T arg) throws CompilationException {
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppSimpleExpressionVisitor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppSimpleExpressionVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppSimpleExpressionVisitor.java
index 4d35914..a3bb592 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppSimpleExpressionVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppSimpleExpressionVisitor.java
@@ -61,6 +61,7 @@ import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation;
 import org.apache.asterix.lang.sqlpp.clause.UnnestClause;
 import org.apache.asterix.lang.sqlpp.expression.CaseExpression;
 import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
+import org.apache.asterix.lang.sqlpp.expression.WindowExpression;
 import org.apache.asterix.lang.sqlpp.struct.SetOperationRight;
 
 public class AbstractSqlppSimpleExpressionVisitor
@@ -321,6 +322,16 @@ public class AbstractSqlppSimpleExpressionVisitor
     }
 
     @Override
+    public Expression visit(WindowExpression winExpr, ILangExpression arg) throws CompilationException {
+        winExpr.setExpr(visit(winExpr.getExpr(), arg));
+        if (winExpr.hasPartitionList()) {
+            winExpr.setPartitionList(visit(winExpr.getPartitionList(), winExpr));
+        }
+        winExpr.setOrderbyList(visit(winExpr.getOrderbyList(), winExpr));
+        return winExpr;
+    }
+
+    @Override
     public Expression visit(FieldAccessor fa, ILangExpression arg) throws CompilationException {
         fa.setExpr(visit(fa.getExpr(), fa));
         return fa;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/ISqlppVisitor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/ISqlppVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/ISqlppVisitor.java
index 02d9142..13addaf 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/ISqlppVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/ISqlppVisitor.java
@@ -34,6 +34,7 @@ import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation;
 import org.apache.asterix.lang.sqlpp.clause.UnnestClause;
 import org.apache.asterix.lang.sqlpp.expression.CaseExpression;
 import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
+import org.apache.asterix.lang.sqlpp.expression.WindowExpression;
 
 public interface ISqlppVisitor<R, T> extends ILangVisitor<R, T> {
 
@@ -64,4 +65,6 @@ public interface ISqlppVisitor<R, T> extends ILangVisitor<R, T> {
     R visit(HavingClause havingClause, T arg) throws CompilationException;
 
     R visit(CaseExpression caseExpression, T arg) throws CompilationException;
+
+    R visit(WindowExpression windowExpression, T arg) throws CompilationException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index 13d1f8d..e19ee7a 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -166,6 +166,7 @@ import org.apache.asterix.lang.sqlpp.clause.UnnestClause;
 import org.apache.asterix.lang.common.clause.WhereClause;
 import org.apache.asterix.lang.sqlpp.expression.CaseExpression;
 import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
+import org.apache.asterix.lang.sqlpp.expression.WindowExpression;
 import org.apache.asterix.lang.sqlpp.optype.JoinType;
 import org.apache.asterix.lang.sqlpp.optype.SetOpType;
 import org.apache.asterix.lang.sqlpp.struct.SetOperationInput;
@@ -2588,7 +2589,7 @@ FieldBinding FieldBinding() throws ParseException:
 
 Expression FunctionCallExpr() throws ParseException:
 {
-  CallExpr callExpr;
+  Expression resultExpr;
   List<Expression> argList = new ArrayList<Expression>();
   Expression tmp = null;
   int arity = 0;
@@ -2596,6 +2597,10 @@ Expression FunctionCallExpr() throws ParseException:
   String hint = null;
   boolean star = false;
   boolean distinct = false;
+  Token overToken = null;
+  Expression partitionExpr = null;
+  List<Expression> partitionExprs = new ArrayList<Expression>();
+  OrderbyClause orderByClause = null;
 }
 {
   funcName = FunctionName()
@@ -2634,7 +2639,7 @@ Expression FunctionCallExpr() throws ParseException:
       if (signature == null) {
         signature = new FunctionSignature(funcName.dataverse, fqFunctionName, arity);
       }
-      callExpr = FunctionMapUtil.normalizedListInputFunctions(new CallExpr(signature,argList));
+      CallExpr callExpr = FunctionMapUtil.normalizedListInputFunctions(new CallExpr(signature,argList));
       if (hint != null) {
         if (hint.startsWith(INDEXED_NESTED_LOOP_JOIN_HINT)) {
           callExpr.addHint(IndexedNLJoinExpressionAnnotation.INSTANCE);
@@ -2643,8 +2648,29 @@ Expression FunctionCallExpr() throws ParseException:
         }
       }
       callExpr.setSourceLocation(funcName.sourceLoc);
-      return callExpr;
+      resultExpr = callExpr;
     }
+
+  (
+    <OVER> { overToken = token; }
+    <LEFTPAREN>
+    (
+      <PARTITION> <BY>
+      partitionExpr = Expression() { partitionExprs.add(partitionExpr); }
+      ( <COMMA> partitionExpr = Expression() { partitionExprs.add(partitionExpr); } )*
+    )?
+    orderByClause = OrderbyClause()
+    <RIGHTPAREN>
+    {
+      WindowExpression winExp = new WindowExpression(callExpr, partitionExprs, orderByClause.getOrderbyList(),
+        orderByClause.getModifierList());
+      resultExpr = addSourceLocation(winExp, overToken);
+    }
+  )?
+
+  {
+     return resultExpr;
+  }
 }
 
 Expression ParenthesizedExpression() throws ParseException:
@@ -3393,7 +3419,9 @@ TOKEN [IGNORE_CASE]:
   | <ORDER : "order">
   | <OUTER : "outer">
   | <OUTPUT : "output">
+  | <OVER: "over">
   | <PATH : "path">
+  | <PARTITION : "partition">
   | <POLICY : "policy">
   | <PRESORTED : "pre-sorted">
   | <PRIMARY : "primary">

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index 3fb03a1..ccb124a 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -132,10 +132,13 @@ import org.apache.asterix.om.typecomputer.impl.UnorderedListConstructorTypeCompu
 import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
+import org.apache.hyracks.algebricks.core.algebra.properties.UnpartitionedPropertyComputer;
 
 public class BuiltinFunctions {
 
@@ -162,6 +165,9 @@ public class BuiltinFunctions {
     private static final Map<IFunctionInfo, IFunctionInfo> scalarToAggregateFunctionMap = new HashMap<>();
     private static final Map<IFunctionInfo, IFunctionInfo> distinctToRegularScalarAggregateFunctionMap =
             new HashMap<>();
+    private static final Map<IFunctionInfo, IFunctionInfo> builtinWindowFunctions = new HashMap<>();
+    private static final Set<IFunctionInfo> builtinWindowFunctionsWithOrderArgs = new HashSet<>();
+    private static final Set<IFunctionInfo> builtinWindowFunctionsWithMaterialization = new HashSet<>();
 
     private static final Map<IFunctionInfo, SpatialFilterKind> spatialFilterFunctions = new HashMap<>();
 
@@ -809,6 +815,26 @@ public class BuiltinFunctions {
     public static final FunctionIdentifier SCALAR_SQL_VAR_POP_DISTINCT =
             new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "sql-var_pop-distinct", 1);
 
+    // window functions
+    public static final FunctionIdentifier ROW_NUMBER =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "row-number", 0);
+    public static final FunctionIdentifier ROW_NUMBER_IMPL =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "row-number-impl", 0);
+    public static final FunctionIdentifier RANK = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "rank", 0);
+    public static final FunctionIdentifier RANK_IMPL =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "rank-impl", FunctionIdentifier.VARARGS);
+    public static final FunctionIdentifier DENSE_RANK =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "dense-rank", 0);
+    public static final FunctionIdentifier DENSE_RANK_IMPL =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "dense-rank-impl", FunctionIdentifier.VARARGS);
+    public static final FunctionIdentifier PERCENT_RANK =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "percent-rank", 0);
+    public static final FunctionIdentifier PERCENT_RANK_IMPL =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "percent-rank-impl", FunctionIdentifier.VARARGS);
+    public static final FunctionIdentifier NTILE = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "ntile", 1);
+    public static final FunctionIdentifier NTILE_IMPL =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "ntile-impl", FunctionIdentifier.VARARGS);
+
     // unnesting functions
     public static final FunctionIdentifier SCAN_COLLECTION =
             new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "scan-collection", 1);
@@ -1727,6 +1753,19 @@ public class BuiltinFunctions {
         addFunction(SQL_VAR_POP_DISTINCT, NullableDoubleTypeComputer.INSTANCE, true);
         addFunction(SCALAR_SQL_VAR_POP_DISTINCT, NullableDoubleTypeComputer.INSTANCE, true);
 
+        // Window functions
+
+        addFunction(ROW_NUMBER, AInt64TypeComputer.INSTANCE, true);
+        addPrivateFunction(ROW_NUMBER_IMPL, AInt64TypeComputer.INSTANCE, true);
+        addFunction(RANK, AInt64TypeComputer.INSTANCE, true);
+        addPrivateFunction(RANK_IMPL, AInt64TypeComputer.INSTANCE, true);
+        addFunction(DENSE_RANK, AInt64TypeComputer.INSTANCE, true);
+        addPrivateFunction(DENSE_RANK_IMPL, AInt64TypeComputer.INSTANCE, true);
+        addFunction(PERCENT_RANK, ADoubleTypeComputer.INSTANCE, true);
+        addPrivateFunction(PERCENT_RANK_IMPL, ADoubleTypeComputer.INSTANCE, true);
+        addFunction(NTILE, AInt64TypeComputer.INSTANCE, true);
+        addPrivateFunction(NTILE_IMPL, AInt64TypeComputer.INSTANCE, true);
+
         // Similarity functions
         addFunction(EDIT_DISTANCE_CONTAINS, OrderedListOfAnyTypeComputer.INSTANCE, true);
         addFunction(SIMILARITY_JACCARD, AFloatTypeComputer.INSTANCE, true);
@@ -2487,6 +2526,15 @@ public class BuiltinFunctions {
     }
 
     static {
+        // Window functions
+        addWindowFunction(ROW_NUMBER, ROW_NUMBER_IMPL, false, false);
+        addWindowFunction(RANK, RANK_IMPL, true, false);
+        addWindowFunction(DENSE_RANK, DENSE_RANK_IMPL, true, false);
+        addWindowFunction(PERCENT_RANK, PERCENT_RANK_IMPL, true, true);
+        addWindowFunction(NTILE, NTILE_IMPL, false, true);
+    }
+
+    static {
         addUnnestFun(RANGE, true);
         addUnnestFun(SCAN_COLLECTION, false);
         addUnnestFun(SUBSET_COLLECTION, false);
@@ -2667,6 +2715,40 @@ public class BuiltinFunctions {
                 getAsterixFunctionInfo(regularscalarfi));
     }
 
+    public static void addWindowFunction(FunctionIdentifier fi, FunctionIdentifier implfi, boolean requiresOrderArgs,
+            boolean requiresMaterialization) {
+        IFunctionInfo implFinfo = getAsterixFunctionInfo(implfi);
+        builtinWindowFunctions.put(getAsterixFunctionInfo(fi), implFinfo);
+        if (requiresOrderArgs) {
+            builtinWindowFunctionsWithOrderArgs.add(implFinfo);
+        }
+        if (requiresMaterialization) {
+            builtinWindowFunctionsWithMaterialization.add(implFinfo);
+        }
+    }
+
+    public static boolean isBuiltinWindowFunction(FunctionIdentifier fi) {
+        return builtinWindowFunctions.containsKey(getAsterixFunctionInfo(fi));
+    }
+
+    public static boolean windowFunctionRequiresOrderArgs(FunctionIdentifier implfi) {
+        return builtinWindowFunctionsWithOrderArgs.contains(getAsterixFunctionInfo(implfi));
+    }
+
+    public static boolean windowFunctionRequiresMaterialization(FunctionIdentifier implfi) {
+        return builtinWindowFunctionsWithMaterialization.contains(getAsterixFunctionInfo(implfi));
+    }
+
+    public static AbstractFunctionCallExpression makeWindowFunctionExpression(FunctionIdentifier scalarfi,
+            List<Mutable<ILogicalExpression>> args) {
+        IFunctionInfo finfo = getAsterixFunctionInfo(scalarfi);
+        IFunctionInfo implFinfo = builtinWindowFunctions.get(finfo);
+        if (implFinfo == null) {
+            throw new IllegalStateException("no implementation for window function " + finfo);
+        }
+        return new StatefulFunctionCallExpression(implFinfo, UnpartitionedPropertyComputer.INSTANCE, args);
+    }
+
     static {
         spatialFilterFunctions.put(getAsterixFunctionInfo(BuiltinFunctions.SPATIAL_INTERSECT), SpatialFilterKind.SI);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
index 19c33db..1b0c7b6 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
@@ -439,6 +439,11 @@ import org.apache.asterix.runtime.evaluators.functions.temporal.UnixTimeFromDate
 import org.apache.asterix.runtime.evaluators.functions.temporal.UnixTimeFromTimeInMsDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.temporal.YearMonthDurationGreaterThanComparatorDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.temporal.YearMonthDurationLessThanComparatorDescriptor;
+import org.apache.asterix.runtime.runningaggregates.std.DenseRankRunningAggregateDescriptor;
+import org.apache.asterix.runtime.runningaggregates.std.NtileRunningAggregateDescriptor;
+import org.apache.asterix.runtime.runningaggregates.std.PercentRankRunningAggregateDescriptor;
+import org.apache.asterix.runtime.runningaggregates.std.RankRunningAggregateDescriptor;
+import org.apache.asterix.runtime.runningaggregates.std.RowNumberRunningAggregateDescriptor;
 import org.apache.asterix.runtime.runningaggregates.std.TidRunningAggregateDescriptor;
 import org.apache.asterix.runtime.unnestingfunctions.std.RangeDescriptor;
 import org.apache.asterix.runtime.unnestingfunctions.std.ScanCollectionDescriptor;
@@ -632,6 +637,13 @@ public final class FunctionCollection implements IFunctionCollection {
         fc.add(ScalarSqlVarAggregateDescriptor.FACTORY);
         fc.add(ScalarSqlVarPopAggregateDescriptor.FACTORY);
 
+        // window functions
+        fc.add(RowNumberRunningAggregateDescriptor.FACTORY);
+        fc.add(RankRunningAggregateDescriptor.FACTORY);
+        fc.add(DenseRankRunningAggregateDescriptor.FACTORY);
+        fc.add(PercentRankRunningAggregateDescriptor.FACTORY);
+        fc.add(NtileRunningAggregateDescriptor.FACTORY);
+
         // boolean functions
         fc.add(AndDescriptor.FACTORY);
         fc.add(OrDescriptor.FACTORY);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/AbstractRankRunningAggregateEvaluator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/AbstractRankRunningAggregateEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/AbstractRankRunningAggregateEvaluator.java
new file mode 100644
index 0000000..54bcc1a
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/AbstractRankRunningAggregateEvaluator.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.runningaggregates.std;
+
+import java.io.DataOutput;
+
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IWindowAggregateEvaluator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * Base evaluator implementation for ranking window functions:
+ * {@code rank()}, {@code dense_rank()}, {@code percent_rank()}
+ */
+public abstract class AbstractRankRunningAggregateEvaluator implements IWindowAggregateEvaluator {
+
+    private final IScalarEvaluator[] args;
+
+    private final ArrayBackedValueStorage[] argPrevValues;
+
+    private final IPointable[] argCurrValues;
+
+    private final boolean dense;
+
+    protected final SourceLocation sourceLoc;
+
+    private final ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+
+    private IBinaryComparator[] argComparators;
+
+    private boolean first;
+
+    private long rank;
+
+    private long groupSize;
+
+    AbstractRankRunningAggregateEvaluator(IScalarEvaluator[] args, boolean dense, SourceLocation sourceLoc) {
+        this.args = args;
+        this.dense = dense;
+        this.sourceLoc = sourceLoc;
+        argPrevValues = new ArrayBackedValueStorage[args.length];
+        argCurrValues = new IPointable[args.length];
+        for (int i = 0; i < args.length; i++) {
+            argPrevValues[i] = new ArrayBackedValueStorage();
+            argCurrValues[i] = VoidPointable.FACTORY.createPointable();
+        }
+    }
+
+    @Override
+    public void configure(IBinaryComparator[] orderComparators) {
+        argComparators = orderComparators;
+    }
+
+    @Override
+    public void init() throws HyracksDataException {
+    }
+
+    @Override
+    public void initPartition(long partitionLength) {
+        first = true;
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
+        for (int i = 0; i < args.length; i++) {
+            args[i].evaluate(tuple, argCurrValues[i]);
+        }
+
+        computeRank();
+
+        for (int i = 0; i < args.length; i++) {
+            argPrevValues[i].assign(argCurrValues[i]);
+        }
+
+        resultStorage.reset();
+        writeResult(rank, resultStorage.getDataOutput());
+        result.set(resultStorage);
+    }
+
+    protected abstract void writeResult(long rank, DataOutput out) throws HyracksDataException;
+
+    private void computeRank() throws HyracksDataException {
+        if (first) {
+            rank = 1;
+            groupSize = 1;
+            first = false;
+        } else if (sameGroup()) {
+            groupSize++;
+        } else {
+            rank += dense ? 1 : groupSize;
+            groupSize = 1;
+        }
+    }
+
+    private boolean sameGroup() throws HyracksDataException {
+        for (int i = 0; i < args.length; i++) {
+            IPointable v1 = argPrevValues[i];
+            IPointable v2 = argCurrValues[i];
+            IBinaryComparator cmp = argComparators[i];
+            if (cmp.compare(v1.getByteArray(), v1.getStartOffset(), v1.getLength(), v2.getByteArray(),
+                    v2.getStartOffset(), v2.getLength()) != 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/DenseRankRunningAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/DenseRankRunningAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/DenseRankRunningAggregateDescriptor.java
new file mode 100644
index 0000000..6e51559
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/DenseRankRunningAggregateDescriptor.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.runningaggregates.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.runningaggregates.base.AbstractRunningAggregateFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Descriptor {@code dense_rank()} window function
+ */
+public class DenseRankRunningAggregateDescriptor extends AbstractRunningAggregateFunctionDynamicDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    public static final IFunctionDescriptorFactory FACTORY = DenseRankRunningAggregateDescriptor::new;
+
+    @Override
+    public IRunningAggregateEvaluatorFactory createRunningAggregateEvaluatorFactory(IScalarEvaluatorFactory[] args) {
+        return new IRunningAggregateEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IRunningAggregateEvaluator createRunningAggregateEvaluator(IHyracksTaskContext ctx)
+                    throws HyracksDataException {
+                IScalarEvaluator[] evals = new IScalarEvaluator[args.length];
+                for (int i = 0; i < args.length; i++) {
+                    evals[i] = args[i].createScalarEvaluator(ctx);
+                }
+                return new RankRunningAggregateEvaluator(evals, true, sourceLoc);
+            }
+        };
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.DENSE_RANK_IMPL;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/NtileRunningAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/NtileRunningAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/NtileRunningAggregateDescriptor.java
new file mode 100644
index 0000000..5157451
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/NtileRunningAggregateDescriptor.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.runningaggregates.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.runningaggregates.base.AbstractRunningAggregateFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Descriptor {@code ntile()} window function
+ */
+public class NtileRunningAggregateDescriptor extends AbstractRunningAggregateFunctionDynamicDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    public static final IFunctionDescriptorFactory FACTORY = NtileRunningAggregateDescriptor::new;
+
+    @Override
+    public IRunningAggregateEvaluatorFactory createRunningAggregateEvaluatorFactory(IScalarEvaluatorFactory[] args) {
+        return new IRunningAggregateEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IRunningAggregateEvaluator createRunningAggregateEvaluator(IHyracksTaskContext ctx)
+                    throws HyracksDataException {
+                return new NtileRunningAggregateEvaluator(args[0].createScalarEvaluator(ctx), getIdentifier());
+            }
+        };
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.NTILE_IMPL;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/NtileRunningAggregateEvaluator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/NtileRunningAggregateEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/NtileRunningAggregateEvaluator.java
new file mode 100644
index 0000000..9bd306e
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/NtileRunningAggregateEvaluator.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.runningaggregates.std;
+
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.base.AMutableInt64;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IWindowAggregateEvaluator;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * Evaluator {@code ntile()} window function
+ */
+public class NtileRunningAggregateEvaluator implements IWindowAggregateEvaluator {
+
+    private final IScalarEvaluator evalNumGroups;
+
+    private final VoidPointable argNumGroups = VoidPointable.FACTORY.createPointable();
+
+    private final FunctionIdentifier funId;
+
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<AInt64> serde =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
+
+    private final ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+
+    private final AMutableInt64 aInt64 = new AMutableInt64(0);
+
+    private long partitionLength;
+
+    private long groupSize;
+
+    private long groupRemainder;
+
+    private long resultValue;
+
+    private long count;
+
+    NtileRunningAggregateEvaluator(IScalarEvaluator evalNumGroups, FunctionIdentifier funId) {
+        this.evalNumGroups = evalNumGroups;
+        this.funId = funId;
+    }
+
+    @Override
+    public void init() throws HyracksDataException {
+    }
+
+    @Override
+    public void initPartition(long partitionLength) {
+        this.partitionLength = partitionLength;
+        resultValue = 0;
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
+        if (resultValue == 0) {
+            evaluateGroupSize(tuple);
+            resultValue = count = 1;
+        } else if (count < groupSize) {
+            count++;
+        } else if (count == groupSize && groupRemainder > 0) {
+            groupRemainder--;
+            count++;
+        } else {
+            resultValue++;
+            count = 1;
+        }
+
+        resultStorage.reset();
+        aInt64.setValue(resultValue);
+        serde.serialize(aInt64, resultStorage.getDataOutput());
+        result.set(resultStorage);
+    }
+
+    private void evaluateGroupSize(IFrameTupleReference tuple) throws HyracksDataException {
+        evalNumGroups.evaluate(tuple, argNumGroups);
+        byte[] bytes = argNumGroups.getByteArray();
+        int offset = argNumGroups.getStartOffset();
+        long numGroups = ATypeHierarchy.getLongValue(funId.getName(), 0, bytes, offset);
+        if (numGroups > partitionLength || numGroups <= 0) {
+            groupSize = partitionLength;
+            groupRemainder = 0;
+        } else {
+            groupSize = partitionLength / numGroups;
+            groupRemainder = partitionLength % numGroups;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/PercentRankRunningAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/PercentRankRunningAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/PercentRankRunningAggregateDescriptor.java
new file mode 100644
index 0000000..180ca99
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/PercentRankRunningAggregateDescriptor.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.runningaggregates.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.runningaggregates.base.AbstractRunningAggregateFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Descriptor {@code percent_rank()} window function
+ */
+public class PercentRankRunningAggregateDescriptor extends AbstractRunningAggregateFunctionDynamicDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    public static final IFunctionDescriptorFactory FACTORY = PercentRankRunningAggregateDescriptor::new;
+
+    @Override
+    public IRunningAggregateEvaluatorFactory createRunningAggregateEvaluatorFactory(IScalarEvaluatorFactory[] args) {
+        return new IRunningAggregateEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IRunningAggregateEvaluator createRunningAggregateEvaluator(IHyracksTaskContext ctx)
+                    throws HyracksDataException {
+                IScalarEvaluator[] evals = new IScalarEvaluator[args.length];
+                for (int i = 0; i < args.length; i++) {
+                    evals[i] = args[i].createScalarEvaluator(ctx);
+                }
+                return new PercentRankRunningAggregateEvaluator(evals, sourceLoc);
+            }
+        };
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.PERCENT_RANK_IMPL;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/PercentRankRunningAggregateEvaluator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/PercentRankRunningAggregateEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/PercentRankRunningAggregateEvaluator.java
new file mode 100644
index 0000000..c73d9fd
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/PercentRankRunningAggregateEvaluator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.runningaggregates.std;
+
+import java.io.DataOutput;
+
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.om.base.ADouble;
+import org.apache.asterix.om.base.AMutableDouble;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+/**
+ * Evaluator {@code percent_rank()} window function
+ */
+class PercentRankRunningAggregateEvaluator extends AbstractRankRunningAggregateEvaluator {
+
+    private final AMutableDouble aDouble = new AMutableDouble(0);
+
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<ADouble> serde =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
+
+    private double divisor;
+
+    PercentRankRunningAggregateEvaluator(IScalarEvaluator[] args, SourceLocation sourceLoc) {
+        super(args, false, sourceLoc);
+    }
+
+    @Override
+    public void initPartition(long partitionLength) {
+        super.initPartition(partitionLength);
+        divisor = (double) partitionLength - 1;
+    }
+
+    @Override
+    protected void writeResult(long rank, DataOutput out) throws HyracksDataException {
+        double percentRank = (rank - 1) / divisor;
+        aDouble.setValue(percentRank);
+        serde.serialize(aDouble, out);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RankRunningAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RankRunningAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RankRunningAggregateDescriptor.java
new file mode 100644
index 0000000..d5db134
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RankRunningAggregateDescriptor.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.runningaggregates.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.runningaggregates.base.AbstractRunningAggregateFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Descriptor {@code rank()} window function
+ */
+public class RankRunningAggregateDescriptor extends AbstractRunningAggregateFunctionDynamicDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    public static final IFunctionDescriptorFactory FACTORY = RankRunningAggregateDescriptor::new;
+
+    @Override
+    public IRunningAggregateEvaluatorFactory createRunningAggregateEvaluatorFactory(IScalarEvaluatorFactory[] args) {
+        return new IRunningAggregateEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IRunningAggregateEvaluator createRunningAggregateEvaluator(IHyracksTaskContext ctx)
+                    throws HyracksDataException {
+                IScalarEvaluator[] evals = new IScalarEvaluator[args.length];
+                for (int i = 0; i < args.length; i++) {
+                    evals[i] = args[i].createScalarEvaluator(ctx);
+                }
+                return new RankRunningAggregateEvaluator(evals, false, sourceLoc);
+            }
+        };
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.RANK_IMPL;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RankRunningAggregateEvaluator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RankRunningAggregateEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RankRunningAggregateEvaluator.java
new file mode 100644
index 0000000..56ab299
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RankRunningAggregateEvaluator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.runningaggregates.std;
+
+import java.io.DataOutput;
+
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.base.AMutableInt64;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+/**
+ * Evaluator for {@code rank()} and {@code dense_rank()} window functions
+ */
+class RankRunningAggregateEvaluator extends AbstractRankRunningAggregateEvaluator {
+
+    private final AMutableInt64 aInt64 = new AMutableInt64(0);
+
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<AInt64> serde =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
+
+    RankRunningAggregateEvaluator(IScalarEvaluator[] args, boolean dense, SourceLocation sourceLoc) {
+        super(args, dense, sourceLoc);
+    }
+
+    protected void writeResult(long rank, DataOutput out) throws HyracksDataException {
+        aInt64.setValue(rank);
+        serde.serialize(aInt64, out);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RowNumberRunningAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RowNumberRunningAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RowNumberRunningAggregateDescriptor.java
new file mode 100644
index 0000000..7464751
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RowNumberRunningAggregateDescriptor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.runningaggregates.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.runningaggregates.base.AbstractRunningAggregateFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Descriptor {@code row_number()} window function
+ */
+public class RowNumberRunningAggregateDescriptor extends AbstractRunningAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final IFunctionDescriptorFactory FACTORY = RowNumberRunningAggregateDescriptor::new;
+
+    @Override
+    public IRunningAggregateEvaluatorFactory createRunningAggregateEvaluatorFactory(IScalarEvaluatorFactory[] args) {
+        return new IRunningAggregateEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IRunningAggregateEvaluator createRunningAggregateEvaluator(IHyracksTaskContext ctx) {
+                return new RowNumberRunningAggregateEvaluator();
+            }
+        };
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.ROW_NUMBER_IMPL;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RowNumberRunningAggregateEvaluator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RowNumberRunningAggregateEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RowNumberRunningAggregateEvaluator.java
new file mode 100644
index 0000000..75fface
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RowNumberRunningAggregateEvaluator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.runningaggregates.std;
+
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.base.AMutableInt64;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.algebricks.runtime.base.IWindowAggregateEvaluator;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * Evaluator for {@code row_number()} window function
+ */
+class RowNumberRunningAggregateEvaluator implements IWindowAggregateEvaluator {
+
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<AInt64> serde =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
+
+    private final ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+
+    private final AMutableInt64 aInt64 = new AMutableInt64(0);
+
+    private long resultValue;
+
+    @Override
+    public void init() {
+        // nothing to do
+    }
+
+    @Override
+    public void initPartition(long partitionLength) {
+        resultValue = 0;
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
+        resultValue++;
+
+        resultStorage.reset();
+        aInt64.setValue(resultValue);
+        serde.serialize(aInt64, resultStorage.getDataOutput());
+        result.set(resultStorage);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/utils/Pair.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/utils/Pair.java b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/utils/Pair.java
index 2dd71cd..a0d5880 100644
--- a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/utils/Pair.java
+++ b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/utils/Pair.java
@@ -33,6 +33,22 @@ public class Pair<T1, T2> implements Serializable {
         this.second = second;
     }
 
+    public void setFirst(T1 value) {
+        first = value;
+    }
+
+    public T1 getFirst() {
+        return first;
+    }
+
+    public void setSecond(T2 value) {
+        second = value;
+    }
+
+    public T2 getSecond() {
+        return second;
+    }
+
     @Override
     public String toString() {
         return first + "," + second;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
index 6bd0d02..d996caf 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
@@ -89,7 +89,7 @@ public interface ILogicalOperator {
      */
 
     public PhysicalRequirements getRequiredPhysicalPropertiesForChildren(IPhysicalPropertiesVector requiredProperties,
-            IOptimizationContext context);
+            IOptimizationContext context) throws AlgebricksException;
 
     /**
      * @return the physical properties that this operator delivers, based on

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
index 2a92aba..a88ec64 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
@@ -40,7 +40,7 @@ public interface IPhysicalOperator {
      * @return for each child, one vector of required physical properties
      */
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context);
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException;
 
     /**
      * @return the physical properties that this operator delivers, based on

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index 3794328..4466408 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -56,4 +56,5 @@ public enum LogicalOperatorTag {
     WRITE,
     WRITE_RESULT,
     INTERSECT,
+    WINDOW
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index ac1de5a..5d19134 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -79,4 +79,5 @@ public enum PhysicalOperatorTag {
     UPDATE,
     WRITE_RESULT,
     INTERSECT,
+    WINDOW
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractAssignOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractAssignOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractAssignOperator.java
index 51040fd..e15ed92 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractAssignOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractAssignOperator.java
@@ -26,6 +26,7 @@ import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
 
 /**
@@ -71,4 +72,17 @@ public abstract class AbstractAssignOperator extends AbstractLogicalOperator {
         return modif;
     }
 
+    protected VariablePropagationPolicy createVariablePropagationPolicy(boolean propagateInputVars) {
+        return new VariablePropagationPolicy() {
+            @Override
+            public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources) {
+                if (propagateInputVars) {
+                    target.addAllVariables(sources[0]);
+                }
+                for (LogicalVariable v : variables) {
+                    target.addVariable(v);
+                }
+            }
+        };
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
index 1dbf15e..d4a9d37 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
@@ -115,7 +115,7 @@ public abstract class AbstractLogicalOperator implements ILogicalOperator {
 
     @Override
     public final PhysicalRequirements getRequiredPhysicalPropertiesForChildren(
-            IPhysicalPropertiesVector requiredProperties, IOptimizationContext context) {
+            IPhysicalPropertiesVector requiredProperties, IOptimizationContext context) throws AlgebricksException {
         return physicalOperator.getRequiredPropertiesForChildren(this, requiredProperties, context);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
index b4a59a8..35cb087 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
@@ -57,16 +57,7 @@ public class AggregateOperator extends AbstractAssignOperator {
 
     @Override
     public VariablePropagationPolicy getVariablePropagationPolicy() {
-        return new VariablePropagationPolicy() {
-
-            @Override
-            public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
-                    throws AlgebricksException {
-                for (LogicalVariable v : variables) {
-                    target.addVariable(v);
-                }
-            }
-        };
+        return createVariablePropagationPolicy(false);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java
index 861d74c..202c291 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java
@@ -65,18 +65,7 @@ public class AssignOperator extends AbstractAssignOperator {
 
     @Override
     public VariablePropagationPolicy getVariablePropagationPolicy() {
-        return new VariablePropagationPolicy() {
-
-            @Override
-            public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
-                    throws AlgebricksException {
-                target.addAllVariables(sources[0]);
-                for (LogicalVariable v : variables) {
-                    target.addVariable(v);
-                }
-            }
-        };
-
+        return createVariablePropagationPolicy(true);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RunningAggregateOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RunningAggregateOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RunningAggregateOperator.java
index ef16613..8a710cc 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RunningAggregateOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RunningAggregateOperator.java
@@ -49,17 +49,7 @@ public class RunningAggregateOperator extends AbstractAssignOperator {
 
     @Override
     public VariablePropagationPolicy getVariablePropagationPolicy() {
-        return new VariablePropagationPolicy() {
-
-            @Override
-            public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
-                    throws AlgebricksException {
-                target.addAllVariables(sources[0]);
-                for (LogicalVariable v : variables) {
-                    target.addVariable(v);
-                }
-            }
-        };
+        return createVariablePropagationPolicy(true);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
new file mode 100644
index 0000000..aa1791f
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
+import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+/**
+ * Window operator evaluates window functions. It has the following components:
+ * <ul>
+ * <li>{@link #partitionExpressions} - define how input data must be partitioned</li>
+ * <li>{@link #orderExpressions} - define how data inside these partitions must be ordered</li>
+ * <li>{@link #expressions} - window function expressions (running aggregates)</li>
+ * <li>{@link #variables} - output variables containing return values of these functions</li>
+ * </ul>
+ *
+ * Window operator does not change cardinality of the input stream.
+ */
+public class WindowOperator extends AbstractAssignOperator {
+
+    private final List<Mutable<ILogicalExpression>> partitionExpressions;
+
+    private final List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions;
+
+    public WindowOperator(List<Mutable<ILogicalExpression>> partitionExpressions,
+            List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions,
+            List<LogicalVariable> variables, List<Mutable<ILogicalExpression>> expressions) {
+        super(variables, expressions);
+        this.partitionExpressions = new ArrayList<>();
+        if (partitionExpressions != null) {
+            this.partitionExpressions.addAll(partitionExpressions);
+        }
+        this.orderExpressions = new ArrayList<>();
+        if (orderExpressions != null) {
+            this.orderExpressions.addAll(orderExpressions);
+        }
+    }
+
+    @Override
+    public LogicalOperatorTag getOperatorTag() {
+        return LogicalOperatorTag.WINDOW;
+    }
+
+    public List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> getOrderExpressions() {
+        return orderExpressions;
+    }
+
+    public List<Mutable<ILogicalExpression>> getPartitionExpressions() {
+        return partitionExpressions;
+    }
+
+    @Override
+    public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
+        return visitor.visitWindowOperator(this, arg);
+    }
+
+    @Override
+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
+        boolean mod = super.acceptExpressionTransform(visitor);
+        for (Mutable<ILogicalExpression> expr : partitionExpressions) {
+            mod |= visitor.transform(expr);
+        }
+        for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : orderExpressions) {
+            mod |= visitor.transform(p.second);
+        }
+        return mod;
+    }
+
+    @Override
+    public VariablePropagationPolicy getVariablePropagationPolicy() {
+        return createVariablePropagationPolicy(true);
+    }
+
+    @Override
+    public boolean isMap() {
+        return false;
+    }
+
+    @Override
+    public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
+        IVariableTypeEnvironment env = createPropagatingAllInputsTypeEnvironment(ctx);
+        int n = variables.size();
+        for (int i = 0; i < n; i++) {
+            env.setVarType(variables.get(i), ctx.getExpressionTypeComputer().getType(expressions.get(i).getValue(),
+                    ctx.getMetadataProvider(), env));
+        }
+        return env;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
index 9d853eb..8535204 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
@@ -61,6 +61,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
@@ -160,6 +161,11 @@ public class CardinalityInferenceVisitor implements ILogicalOperatorVisitor<Long
     }
 
     @Override
+    public Long visitWindowOperator(WindowOperator op, Void arg) throws AlgebricksException {
+        return op.getInputs().get(0).getValue().accept(this, arg);
+    }
+
+    @Override
     public Long visitSelectOperator(SelectOperator op, Void arg) throws AlgebricksException {
         return adjustCardinalityForTupleReductionOperator(op.getInputs().get(0).getValue().accept(this, arg));
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index 16fc1ed..7042794 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -78,6 +78,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
@@ -812,4 +813,9 @@ public class FDsAndEquivClassesVisitor implements ILogicalOperatorVisitor<Void,
         return null;
     }
 
+    @Override
+    public Void visitWindowOperator(WindowOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        propagateFDsAndEquivClasses(op, ctx);
+        return null;
+    }
 }