You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by dl...@apache.org on 2018/11/01 03:54:41 UTC
[3/5] asterixdb git commit: [ASTERIXDB-2466][FUN] Implement window
functions
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java
index b754533..3fb3507 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java
@@ -62,6 +62,7 @@ import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation;
import org.apache.asterix.lang.sqlpp.clause.UnnestClause;
import org.apache.asterix.lang.sqlpp.expression.CaseExpression;
import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
+import org.apache.asterix.lang.sqlpp.expression.WindowExpression;
import org.apache.asterix.lang.sqlpp.struct.SetOperationRight;
import org.apache.asterix.lang.sqlpp.util.SqlppVariableUtil;
import org.apache.asterix.lang.sqlpp.visitor.base.AbstractSqlppQueryExpressionVisitor;
@@ -424,6 +425,16 @@ public class FreeVariableVisitor extends AbstractSqlppQueryExpressionVisitor<Voi
return null;
}
+ /**
+ * Visits the parts of a window expression, handing the same {@code freeVars} collection to
+ * each sub-visit: first the wrapped expression, then the partition expressions (only when
+ * {@code hasPartitionList()} reports that there are some), and finally the order-by expressions.
+ *
+ * @param winExpr the window expression to traverse
+ * @param freeVars collection passed through to every sub-visit
+ * @return always {@code null}
+ * @throws CompilationException if one of the sub-visits fails
+ */
+ @Override
+ public Void visit(WindowExpression winExpr, Collection<VariableExpr> freeVars) throws CompilationException {
+ winExpr.getExpr().accept(this, freeVars);
+ if (winExpr.hasPartitionList()) {
+ visit(winExpr.getPartitionList(), freeVars);
+ }
+ visit(winExpr.getOrderbyList(), freeVars);
+ return null;
+ }
+
private void visitLetClauses(List<LetClause> letClauses, Collection<VariableExpr> freeVars)
throws CompilationException {
if (letClauses == null || letClauses.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppAstPrintVisitor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppAstPrintVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppAstPrintVisitor.java
index 0973bec..e4d2a27 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppAstPrintVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppAstPrintVisitor.java
@@ -27,6 +27,7 @@ import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.clause.GroupbyClause;
import org.apache.asterix.lang.common.clause.LetClause;
+import org.apache.asterix.lang.common.clause.OrderbyClause;
import org.apache.asterix.lang.common.expression.CallExpr;
import org.apache.asterix.lang.common.expression.GbyVariableExpressionPair;
import org.apache.asterix.lang.common.expression.VariableExpr;
@@ -47,6 +48,7 @@ import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation;
import org.apache.asterix.lang.sqlpp.clause.UnnestClause;
import org.apache.asterix.lang.sqlpp.expression.CaseExpression;
import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
+import org.apache.asterix.lang.sqlpp.expression.WindowExpression;
import org.apache.asterix.lang.sqlpp.struct.SetOperationRight;
import org.apache.asterix.lang.sqlpp.util.FunctionMapUtil;
import org.apache.asterix.lang.sqlpp.visitor.base.ISqlppVisitor;
@@ -331,4 +333,28 @@ public class SqlppAstPrintVisitor extends QueryPrintVisitor implements ISqlppVis
return null;
}
+    /**
+     * Prints the AST form of a window expression: the {@code WINDOW} marker followed by the
+     * wrapped expression, then an {@code OVER (} ... {@code )} section listing the optional
+     * {@code PARTITION BY} expressions and the {@code ORDER BY} expressions, each order-by
+     * expression being followed by its modifier.
+     *
+     * @param winExpr the window expression to print
+     * @param step the current indentation level
+     * @return always {@code null}
+     * @throws CompilationException if printing a sub-expression fails
+     */
+    @Override
+    public Void visit(WindowExpression winExpr, Integer step) throws CompilationException {
+        final int clauseLevel = step + 1;
+        final int exprLevel = step + 2;
+
+        // Header: the marker and the wrapped expression share one output line.
+        out.print(skip(step) + "WINDOW");
+        winExpr.getExpr().accept(this, clauseLevel);
+        out.println();
+
+        out.println(skip(step) + "OVER (");
+
+        // Optional partitioning section, one expression per line.
+        if (winExpr.hasPartitionList()) {
+            out.println(skip(clauseLevel) + "PARTITION BY");
+            List<Expression> partitionKeys = winExpr.getPartitionList();
+            for (int idx = 0; idx < partitionKeys.size(); idx++) {
+                partitionKeys.get(idx).accept(this, exprLevel);
+                out.println();
+            }
+        }
+
+        // Ordering section: expression and modifier are read from two parallel lists.
+        out.println(skip(clauseLevel) + "ORDER BY");
+        List<Expression> orderKeys = winExpr.getOrderbyList();
+        List<OrderbyClause.OrderModifier> orderModifiers = winExpr.getOrderbyModifierList();
+        int position = 0;
+        for (Expression orderKey : orderKeys) {
+            orderKey.accept(this, exprLevel);
+            out.println(" " + orderModifiers.get(position));
+            position++;
+        }
+
+        out.println(skip(step) + ")");
+        return null;
+    }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppCloneAndSubstituteVariablesVisitor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppCloneAndSubstituteVariablesVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppCloneAndSubstituteVariablesVisitor.java
index e00a3bd..0c70572 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppCloneAndSubstituteVariablesVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppCloneAndSubstituteVariablesVisitor.java
@@ -50,6 +50,7 @@ import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation;
import org.apache.asterix.lang.sqlpp.clause.UnnestClause;
import org.apache.asterix.lang.sqlpp.expression.CaseExpression;
import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
+import org.apache.asterix.lang.sqlpp.expression.WindowExpression;
import org.apache.asterix.lang.sqlpp.struct.SetOperationInput;
import org.apache.asterix.lang.sqlpp.struct.SetOperationRight;
import org.apache.asterix.lang.sqlpp.visitor.base.ISqlppVisitor;
@@ -411,4 +412,20 @@ public class SqlppCloneAndSubstituteVariablesVisitor extends CloneAndSubstituteV
newCaseExpr.setSourceLocation(caseExpr.getSourceLocation());
return new Pair<>(newCaseExpr, env);
}
+
+    /**
+     * Builds a new {@link WindowExpression} from the results of visiting the wrapped expression,
+     * the partition expressions (when present) and the order-by expressions with this visitor.
+     * The order modifiers are copied into a fresh list; source location and hints are carried
+     * over from the original.
+     *
+     * @param winExpr the window expression to copy
+     * @param env the substitution environment handed to every sub-visit
+     * @return a pair of the new window expression and the unchanged {@code env}
+     * @throws CompilationException if visiting a sub-expression fails
+     */
+    @Override
+    public Pair<ILangExpression, VariableSubstitutionEnvironment> visit(WindowExpression winExpr,
+            VariableSubstitutionEnvironment env) throws CompilationException {
+        Expression clonedCall = (Expression) winExpr.getExpr().accept(this, env).first;
+
+        // The partition list stays null when the original has none.
+        List<Expression> clonedPartitionKeys = null;
+        if (winExpr.hasPartitionList()) {
+            clonedPartitionKeys =
+                    VariableCloneAndSubstitutionUtil.visitAndCloneExprList(winExpr.getPartitionList(), env, this);
+        }
+
+        List<Expression> clonedOrderKeys =
+                VariableCloneAndSubstitutionUtil.visitAndCloneExprList(winExpr.getOrderbyList(), env, this);
+        List<OrderbyClause.OrderModifier> copiedModifiers = new ArrayList<>(winExpr.getOrderbyModifierList());
+
+        WindowExpression copy =
+                new WindowExpression(clonedCall, clonedPartitionKeys, clonedOrderKeys, copiedModifiers);
+        copy.setSourceLocation(winExpr.getSourceLocation());
+        copy.addHints(winExpr.getHints());
+        return new Pair<>(copy, env);
+    }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppFormatPrintVisitor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppFormatPrintVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppFormatPrintVisitor.java
index 755cc69..99368f8 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppFormatPrintVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppFormatPrintVisitor.java
@@ -44,6 +44,7 @@ import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation;
import org.apache.asterix.lang.sqlpp.clause.UnnestClause;
import org.apache.asterix.lang.sqlpp.expression.CaseExpression;
import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
+import org.apache.asterix.lang.sqlpp.expression.WindowExpression;
import org.apache.asterix.lang.sqlpp.struct.SetOperationRight;
import org.apache.asterix.lang.sqlpp.util.SqlppVariableUtil;
import org.apache.asterix.lang.sqlpp.visitor.base.ISqlppVisitor;
@@ -330,4 +331,25 @@ public class SqlppFormatPrintVisitor extends FormatPrintVisitor implements ISqlp
out.print(SqlppVariableUtil.toUserDefinedName(v.getVar().getValue()));
return null;
}
+
+    /**
+     * Prints a window expression as query text:
+     * {@code window <expr> over (partition by <e1>, <e2> order by <o1> <modifier>, ...)}.
+     * The {@code partition by} section is emitted only when the expression has a partition list.
+     *
+     * @param windowExpr the window expression to print
+     * @param step the current indentation level
+     * @return always {@code null}
+     * @throws CompilationException if printing a sub-expression fails
+     */
+    @Override
+    public Void visit(WindowExpression windowExpr, Integer step) throws CompilationException {
+        out.print(skip(step) + "window ");
+        windowExpr.getExpr().accept(this, step + 2);
+        out.print(skip(step) + " over (");
+        if (windowExpr.hasPartitionList()) {
+            // The SQL++ grammar introduces the partition list with PARTITION BY; without the
+            // keyword the partition expressions would run straight into the ORDER BY clause.
+            out.print("partition by ");
+            List<Expression> partitionList = windowExpr.getPartitionList();
+            for (int i = 0, ln = partitionList.size(); i < ln; i++) {
+                if (i > 0) {
+                    out.print(COMMA);
+                }
+                partitionList.get(i).accept(this, step + 2);
+            }
+        }
+        out.print(" order by ");
+        printDelimitedObyExpressions(windowExpr.getOrderbyList(), windowExpr.getOrderbyModifierList(), step + 2);
+        out.println(skip(step) + ")");
+        return null;
+    }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppAstVisitor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppAstVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppAstVisitor.java
index 5396768..92ebd60 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppAstVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppAstVisitor.java
@@ -34,6 +34,7 @@ import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation;
import org.apache.asterix.lang.sqlpp.clause.UnnestClause;
import org.apache.asterix.lang.sqlpp.expression.CaseExpression;
import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
+import org.apache.asterix.lang.sqlpp.expression.WindowExpression;
/**
* A dummy abstract visitor to allow an implementation to only fill in necessary stuff.
@@ -110,4 +111,8 @@ public abstract class AbstractSqlppAstVisitor<R, T> extends AbstractAstVisitor<R
return null;
}
+ /**
+ * Default no-op handler for window expressions; subclasses override it when they need to
+ * process them.
+ *
+ * @return always {@code null}
+ */
+ @Override
+ public R visit(WindowExpression winExpr, T arg) throws CompilationException {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppSimpleExpressionVisitor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppSimpleExpressionVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppSimpleExpressionVisitor.java
index 4d35914..a3bb592 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppSimpleExpressionVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppSimpleExpressionVisitor.java
@@ -61,6 +61,7 @@ import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation;
import org.apache.asterix.lang.sqlpp.clause.UnnestClause;
import org.apache.asterix.lang.sqlpp.expression.CaseExpression;
import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
+import org.apache.asterix.lang.sqlpp.expression.WindowExpression;
import org.apache.asterix.lang.sqlpp.struct.SetOperationRight;
public class AbstractSqlppSimpleExpressionVisitor
@@ -321,6 +322,16 @@ public class AbstractSqlppSimpleExpressionVisitor
}
+ /**
+ * Rewrites a window expression in place: the wrapped expression, the partition list (when
+ * present) and the order-by list are each replaced by the result of visiting them.
+ *
+ * @param winExpr the window expression to rewrite; it is also the returned object
+ * @param arg the argument this visit was invoked with
+ * @return {@code winExpr} itself, after its parts have been replaced
+ * @throws CompilationException if visiting a sub-expression fails
+ */
@Override
+ public Expression visit(WindowExpression winExpr, ILangExpression arg) throws CompilationException {
+ // NOTE(review): this call forwards `arg`, while the two list visits below pass `winExpr` --
+ // confirm whether the difference is intentional.
+ winExpr.setExpr(visit(winExpr.getExpr(), arg));
+ if (winExpr.hasPartitionList()) {
+ winExpr.setPartitionList(visit(winExpr.getPartitionList(), winExpr));
+ }
+ winExpr.setOrderbyList(visit(winExpr.getOrderbyList(), winExpr));
+ return winExpr;
+ }
+
+ @Override
public Expression visit(FieldAccessor fa, ILangExpression arg) throws CompilationException {
fa.setExpr(visit(fa.getExpr(), fa));
return fa;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/ISqlppVisitor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/ISqlppVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/ISqlppVisitor.java
index 02d9142..13addaf 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/ISqlppVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/ISqlppVisitor.java
@@ -34,6 +34,7 @@ import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation;
import org.apache.asterix.lang.sqlpp.clause.UnnestClause;
import org.apache.asterix.lang.sqlpp.expression.CaseExpression;
import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
+import org.apache.asterix.lang.sqlpp.expression.WindowExpression;
public interface ISqlppVisitor<R, T> extends ILangVisitor<R, T> {
@@ -64,4 +65,6 @@ public interface ISqlppVisitor<R, T> extends ILangVisitor<R, T> {
R visit(HavingClause havingClause, T arg) throws CompilationException;
R visit(CaseExpression caseExpression, T arg) throws CompilationException;
+
+ R visit(WindowExpression windowExpression, T arg) throws CompilationException;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index 13d1f8d..e19ee7a 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -166,6 +166,7 @@ import org.apache.asterix.lang.sqlpp.clause.UnnestClause;
import org.apache.asterix.lang.common.clause.WhereClause;
import org.apache.asterix.lang.sqlpp.expression.CaseExpression;
import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
+import org.apache.asterix.lang.sqlpp.expression.WindowExpression;
import org.apache.asterix.lang.sqlpp.optype.JoinType;
import org.apache.asterix.lang.sqlpp.optype.SetOpType;
import org.apache.asterix.lang.sqlpp.struct.SetOperationInput;
@@ -2588,7 +2589,7 @@ FieldBinding FieldBinding() throws ParseException:
Expression FunctionCallExpr() throws ParseException:
{
- CallExpr callExpr;
+ Expression resultExpr;
List<Expression> argList = new ArrayList<Expression>();
Expression tmp = null;
int arity = 0;
@@ -2596,6 +2597,10 @@ Expression FunctionCallExpr() throws ParseException:
String hint = null;
boolean star = false;
boolean distinct = false;
+ Token overToken = null;
+ Expression partitionExpr = null;
+ List<Expression> partitionExprs = new ArrayList<Expression>();
+ OrderbyClause orderByClause = null;
}
{
funcName = FunctionName()
@@ -2634,7 +2639,7 @@ Expression FunctionCallExpr() throws ParseException:
if (signature == null) {
signature = new FunctionSignature(funcName.dataverse, fqFunctionName, arity);
}
- callExpr = FunctionMapUtil.normalizedListInputFunctions(new CallExpr(signature,argList));
+ CallExpr callExpr = FunctionMapUtil.normalizedListInputFunctions(new CallExpr(signature,argList));
if (hint != null) {
if (hint.startsWith(INDEXED_NESTED_LOOP_JOIN_HINT)) {
callExpr.addHint(IndexedNLJoinExpressionAnnotation.INSTANCE);
@@ -2643,8 +2648,29 @@ Expression FunctionCallExpr() throws ParseException:
}
}
callExpr.setSourceLocation(funcName.sourceLoc);
- return callExpr;
+ resultExpr = callExpr;
}
+
+ (
+ <OVER> { overToken = token; }
+ <LEFTPAREN>
+ (
+ <PARTITION> <BY>
+ partitionExpr = Expression() { partitionExprs.add(partitionExpr); }
+ ( <COMMA> partitionExpr = Expression() { partitionExprs.add(partitionExpr); } )*
+ )?
+ orderByClause = OrderbyClause()
+ <RIGHTPAREN>
+ {
+ WindowExpression winExp = new WindowExpression(callExpr, partitionExprs, orderByClause.getOrderbyList(),
+ orderByClause.getModifierList());
+ resultExpr = addSourceLocation(winExp, overToken);
+ }
+ )?
+
+ {
+ return resultExpr;
+ }
}
Expression ParenthesizedExpression() throws ParseException:
@@ -3393,7 +3419,9 @@ TOKEN [IGNORE_CASE]:
| <ORDER : "order">
| <OUTER : "outer">
| <OUTPUT : "output">
+ | <OVER: "over">
| <PATH : "path">
+ | <PARTITION : "partition">
| <POLICY : "policy">
| <PRESORTED : "pre-sorted">
| <PRIMARY : "primary">
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index 3fb03a1..ccb124a 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -132,10 +132,13 @@ import org.apache.asterix.om.typecomputer.impl.UnorderedListConstructorTypeCompu
import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
+import org.apache.hyracks.algebricks.core.algebra.properties.UnpartitionedPropertyComputer;
public class BuiltinFunctions {
@@ -162,6 +165,9 @@ public class BuiltinFunctions {
private static final Map<IFunctionInfo, IFunctionInfo> scalarToAggregateFunctionMap = new HashMap<>();
private static final Map<IFunctionInfo, IFunctionInfo> distinctToRegularScalarAggregateFunctionMap =
new HashMap<>();
+ private static final Map<IFunctionInfo, IFunctionInfo> builtinWindowFunctions = new HashMap<>();
+ private static final Set<IFunctionInfo> builtinWindowFunctionsWithOrderArgs = new HashSet<>();
+ private static final Set<IFunctionInfo> builtinWindowFunctionsWithMaterialization = new HashSet<>();
private static final Map<IFunctionInfo, SpatialFilterKind> spatialFilterFunctions = new HashMap<>();
@@ -809,6 +815,26 @@ public class BuiltinFunctions {
public static final FunctionIdentifier SCALAR_SQL_VAR_POP_DISTINCT =
new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "sql-var_pop-distinct", 1);
+ // window functions
+ public static final FunctionIdentifier ROW_NUMBER =
+ new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "row-number", 0);
+ public static final FunctionIdentifier ROW_NUMBER_IMPL =
+ new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "row-number-impl", 0);
+ public static final FunctionIdentifier RANK = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "rank", 0);
+ public static final FunctionIdentifier RANK_IMPL =
+ new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "rank-impl", FunctionIdentifier.VARARGS);
+ public static final FunctionIdentifier DENSE_RANK =
+ new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "dense-rank", 0);
+ public static final FunctionIdentifier DENSE_RANK_IMPL =
+ new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "dense-rank-impl", FunctionIdentifier.VARARGS);
+ public static final FunctionIdentifier PERCENT_RANK =
+ new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "percent-rank", 0);
+ public static final FunctionIdentifier PERCENT_RANK_IMPL =
+ new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "percent-rank-impl", FunctionIdentifier.VARARGS);
+ public static final FunctionIdentifier NTILE = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "ntile", 1);
+ public static final FunctionIdentifier NTILE_IMPL =
+ new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "ntile-impl", FunctionIdentifier.VARARGS);
+
// unnesting functions
public static final FunctionIdentifier SCAN_COLLECTION =
new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "scan-collection", 1);
@@ -1727,6 +1753,19 @@ public class BuiltinFunctions {
addFunction(SQL_VAR_POP_DISTINCT, NullableDoubleTypeComputer.INSTANCE, true);
addFunction(SCALAR_SQL_VAR_POP_DISTINCT, NullableDoubleTypeComputer.INSTANCE, true);
+ // Window functions
+
+ addFunction(ROW_NUMBER, AInt64TypeComputer.INSTANCE, true);
+ addPrivateFunction(ROW_NUMBER_IMPL, AInt64TypeComputer.INSTANCE, true);
+ addFunction(RANK, AInt64TypeComputer.INSTANCE, true);
+ addPrivateFunction(RANK_IMPL, AInt64TypeComputer.INSTANCE, true);
+ addFunction(DENSE_RANK, AInt64TypeComputer.INSTANCE, true);
+ addPrivateFunction(DENSE_RANK_IMPL, AInt64TypeComputer.INSTANCE, true);
+ addFunction(PERCENT_RANK, ADoubleTypeComputer.INSTANCE, true);
+ addPrivateFunction(PERCENT_RANK_IMPL, ADoubleTypeComputer.INSTANCE, true);
+ addFunction(NTILE, AInt64TypeComputer.INSTANCE, true);
+ addPrivateFunction(NTILE_IMPL, AInt64TypeComputer.INSTANCE, true);
+
// Similarity functions
addFunction(EDIT_DISTANCE_CONTAINS, OrderedListOfAnyTypeComputer.INSTANCE, true);
addFunction(SIMILARITY_JACCARD, AFloatTypeComputer.INSTANCE, true);
@@ -2487,6 +2526,15 @@ public class BuiltinFunctions {
}
static {
+ // Window functions
+ addWindowFunction(ROW_NUMBER, ROW_NUMBER_IMPL, false, false);
+ addWindowFunction(RANK, RANK_IMPL, true, false);
+ addWindowFunction(DENSE_RANK, DENSE_RANK_IMPL, true, false);
+ addWindowFunction(PERCENT_RANK, PERCENT_RANK_IMPL, true, true);
+ addWindowFunction(NTILE, NTILE_IMPL, false, true);
+ }
+
+ static {
addUnnestFun(RANGE, true);
addUnnestFun(SCAN_COLLECTION, false);
addUnnestFun(SUBSET_COLLECTION, false);
@@ -2667,6 +2715,40 @@ public class BuiltinFunctions {
getAsterixFunctionInfo(regularscalarfi));
}
+    /**
+     * Registers a window function together with the function that implements it.
+     *
+     * @param fi identifier of the window function
+     * @param implfi identifier of the implementing function
+     * @param requiresOrderArgs whether the implementing function is recorded in the
+     *            "with order args" set
+     * @param requiresMaterialization whether the implementing function is recorded in the
+     *            "with materialization" set
+     */
+    public static void addWindowFunction(FunctionIdentifier fi, FunctionIdentifier implfi, boolean requiresOrderArgs,
+            boolean requiresMaterialization) {
+        final IFunctionInfo implInfo = getAsterixFunctionInfo(implfi);
+        final IFunctionInfo declaredInfo = getAsterixFunctionInfo(fi);
+        builtinWindowFunctions.put(declaredInfo, implInfo);
+
+        // Both property sets are keyed by the implementing function, not the declared one.
+        if (requiresOrderArgs) {
+            builtinWindowFunctionsWithOrderArgs.add(implInfo);
+        }
+        if (requiresMaterialization) {
+            builtinWindowFunctionsWithMaterialization.add(implInfo);
+        }
+    }
+
+    /**
+     * Tells whether a function has been registered as a window function.
+     *
+     * @param fi identifier of the function to look up
+     * @return {@code true} if the function is a key of the window function map
+     */
+    public static boolean isBuiltinWindowFunction(FunctionIdentifier fi) {
+        IFunctionInfo declaredInfo = getAsterixFunctionInfo(fi);
+        return builtinWindowFunctions.containsKey(declaredInfo);
+    }
+
+    /**
+     * Tells whether an implementing window function was registered as requiring order arguments.
+     *
+     * @param implfi identifier of the implementing function
+     * @return {@code true} if the function is contained in the "with order args" set
+     */
+    public static boolean windowFunctionRequiresOrderArgs(FunctionIdentifier implfi) {
+        IFunctionInfo implInfo = getAsterixFunctionInfo(implfi);
+        return builtinWindowFunctionsWithOrderArgs.contains(implInfo);
+    }
+
+    /**
+     * Tells whether an implementing window function was registered as requiring materialization.
+     *
+     * @param implfi identifier of the implementing function
+     * @return {@code true} if the function is contained in the "with materialization" set
+     */
+    public static boolean windowFunctionRequiresMaterialization(FunctionIdentifier implfi) {
+        IFunctionInfo implInfo = getAsterixFunctionInfo(implfi);
+        return builtinWindowFunctionsWithMaterialization.contains(implInfo);
+    }
+
+    /**
+     * Creates the call expression for a window function by looking up the function registered
+     * as its implementation and wrapping it, with the given arguments, in a
+     * {@link StatefulFunctionCallExpression}.
+     *
+     * @param scalarfi identifier of the window function
+     * @param args arguments of the resulting call expression
+     * @return a call expression targeting the implementing function
+     * @throws IllegalStateException if no implementation is registered for {@code scalarfi}
+     */
+    public static AbstractFunctionCallExpression makeWindowFunctionExpression(FunctionIdentifier scalarfi,
+            List<Mutable<ILogicalExpression>> args) {
+        final IFunctionInfo declaredInfo = getAsterixFunctionInfo(scalarfi);
+        final IFunctionInfo implInfo = builtinWindowFunctions.get(declaredInfo);
+        if (implInfo != null) {
+            return new StatefulFunctionCallExpression(implInfo, UnpartitionedPropertyComputer.INSTANCE, args);
+        }
+        throw new IllegalStateException("no implementation for window function " + declaredInfo);
+    }
+
static {
spatialFilterFunctions.put(getAsterixFunctionInfo(BuiltinFunctions.SPATIAL_INTERSECT), SpatialFilterKind.SI);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
index 19c33db..1b0c7b6 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
@@ -439,6 +439,11 @@ import org.apache.asterix.runtime.evaluators.functions.temporal.UnixTimeFromDate
import org.apache.asterix.runtime.evaluators.functions.temporal.UnixTimeFromTimeInMsDescriptor;
import org.apache.asterix.runtime.evaluators.functions.temporal.YearMonthDurationGreaterThanComparatorDescriptor;
import org.apache.asterix.runtime.evaluators.functions.temporal.YearMonthDurationLessThanComparatorDescriptor;
+import org.apache.asterix.runtime.runningaggregates.std.DenseRankRunningAggregateDescriptor;
+import org.apache.asterix.runtime.runningaggregates.std.NtileRunningAggregateDescriptor;
+import org.apache.asterix.runtime.runningaggregates.std.PercentRankRunningAggregateDescriptor;
+import org.apache.asterix.runtime.runningaggregates.std.RankRunningAggregateDescriptor;
+import org.apache.asterix.runtime.runningaggregates.std.RowNumberRunningAggregateDescriptor;
import org.apache.asterix.runtime.runningaggregates.std.TidRunningAggregateDescriptor;
import org.apache.asterix.runtime.unnestingfunctions.std.RangeDescriptor;
import org.apache.asterix.runtime.unnestingfunctions.std.ScanCollectionDescriptor;
@@ -632,6 +637,13 @@ public final class FunctionCollection implements IFunctionCollection {
fc.add(ScalarSqlVarAggregateDescriptor.FACTORY);
fc.add(ScalarSqlVarPopAggregateDescriptor.FACTORY);
+ // window functions
+ fc.add(RowNumberRunningAggregateDescriptor.FACTORY);
+ fc.add(RankRunningAggregateDescriptor.FACTORY);
+ fc.add(DenseRankRunningAggregateDescriptor.FACTORY);
+ fc.add(PercentRankRunningAggregateDescriptor.FACTORY);
+ fc.add(NtileRunningAggregateDescriptor.FACTORY);
+
// boolean functions
fc.add(AndDescriptor.FACTORY);
fc.add(OrDescriptor.FACTORY);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/AbstractRankRunningAggregateEvaluator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/AbstractRankRunningAggregateEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/AbstractRankRunningAggregateEvaluator.java
new file mode 100644
index 0000000..54bcc1a
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/AbstractRankRunningAggregateEvaluator.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.runningaggregates.std;
+
+import java.io.DataOutput;
+
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IWindowAggregateEvaluator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * Shared evaluator logic for the ranking window functions {@code rank()},
+ * {@code dense_rank()} and {@code percent_rank()}.
+ * <p>
+ * For each tuple the argument evaluators are run and their values are compared with the
+ * values kept from the previous tuple. The running rank is updated from that comparison and
+ * passed to {@link #writeResult(long, DataOutput)}, which subclasses implement to serialize it.
+ */
+public abstract class AbstractRankRunningAggregateEvaluator implements IWindowAggregateEvaluator {
+
+    /** Evaluators whose values decide whether two consecutive tuples belong to the same group. */
+    private final IScalarEvaluator[] args;
+
+    /** Copies of the argument values of the previously processed tuple. */
+    private final ArrayBackedValueStorage[] argPrevValues;
+
+    /** Argument values of the tuple currently being processed. */
+    private final IPointable[] argCurrValues;
+
+    /** When {@code true} a new group advances the rank by one, otherwise by the previous group's size. */
+    private final boolean dense;
+
+    protected final SourceLocation sourceLoc;
+
+    /** Buffer the result is written into before being exposed through the result pointable. */
+    private final ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+
+    /** Comparators received through {@link #configure(IBinaryComparator[])}, indexed like {@link #args}. */
+    private IBinaryComparator[] argComparators;
+
+    /** Set at the start of a partition; cleared once its first tuple has been ranked. */
+    private boolean first;
+
+    /** Rank assigned to the most recently processed tuple. */
+    private long rank;
+
+    /** Number of tuples seen so far in the current group of equal argument values. */
+    private long groupSize;
+
+    AbstractRankRunningAggregateEvaluator(IScalarEvaluator[] args, boolean dense, SourceLocation sourceLoc) {
+        this.args = args;
+        this.dense = dense;
+        this.sourceLoc = sourceLoc;
+        final int argCount = args.length;
+        argPrevValues = new ArrayBackedValueStorage[argCount];
+        argCurrValues = new IPointable[argCount];
+        for (int idx = 0; idx < argCount; idx++) {
+            argPrevValues[idx] = new ArrayBackedValueStorage();
+            argCurrValues[idx] = VoidPointable.FACTORY.createPointable();
+        }
+    }
+
+    @Override
+    public void configure(IBinaryComparator[] orderComparators) {
+        argComparators = orderComparators;
+    }
+
+    @Override
+    public void init() throws HyracksDataException {
+        // nothing to prepare
+    }
+
+    @Override
+    public void initPartition(long partitionLength) {
+        // Only the "first tuple" flag is reset; rank and groupSize are re-initialized
+        // when the first tuple of the partition is processed.
+        first = true;
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
+        evaluateArguments(tuple);
+        computeRank();
+        rememberCurrentValues();
+
+        resultStorage.reset();
+        writeResult(rank, resultStorage.getDataOutput());
+        result.set(resultStorage);
+    }
+
+    /**
+     * Serializes the result for the current tuple.
+     *
+     * @param rank the rank computed for the current tuple
+     * @param out the output to write the result to
+     * @throws HyracksDataException if the result cannot be written
+     */
+    protected abstract void writeResult(long rank, DataOutput out) throws HyracksDataException;
+
+    /** Runs every argument evaluator on the tuple, filling {@link #argCurrValues}. */
+    private void evaluateArguments(IFrameTupleReference tuple) throws HyracksDataException {
+        for (int idx = 0; idx < args.length; idx++) {
+            args[idx].evaluate(tuple, argCurrValues[idx]);
+        }
+    }
+
+    /** Copies the current argument values so the next tuple can be compared against them. */
+    private void rememberCurrentValues() throws HyracksDataException {
+        for (int idx = 0; idx < args.length; idx++) {
+            argPrevValues[idx].assign(argCurrValues[idx]);
+        }
+    }
+
+    /** Updates {@link #rank} and {@link #groupSize} for the tuple whose arguments were just evaluated. */
+    private void computeRank() throws HyracksDataException {
+        if (first) {
+            first = false;
+            rank = 1;
+            groupSize = 1;
+            return;
+        }
+        if (sameGroup()) {
+            // Same argument values as the previous tuple: the rank stays, the group grows.
+            groupSize++;
+            return;
+        }
+        long increment = dense ? 1 : groupSize;
+        rank += increment;
+        groupSize = 1;
+    }
+
+    /** Returns {@code true} when every argument compares equal to its value from the previous tuple. */
+    private boolean sameGroup() throws HyracksDataException {
+        for (int idx = 0; idx < args.length; idx++) {
+            IPointable previous = argPrevValues[idx];
+            IPointable current = argCurrValues[idx];
+            int order = argComparators[idx].compare(previous.getByteArray(), previous.getStartOffset(),
+                    previous.getLength(), current.getByteArray(), current.getStartOffset(), current.getLength());
+            if (order != 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/DenseRankRunningAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/DenseRankRunningAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/DenseRankRunningAggregateDescriptor.java
new file mode 100644
index 0000000..6e51559
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/DenseRankRunningAggregateDescriptor.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.runningaggregates.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.runningaggregates.base.AbstractRunningAggregateFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Descriptor {@code dense_rank()} window function
+ */
+public class DenseRankRunningAggregateDescriptor extends AbstractRunningAggregateFunctionDynamicDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ public static final IFunctionDescriptorFactory FACTORY = DenseRankRunningAggregateDescriptor::new;
+
+ @Override
+ public IRunningAggregateEvaluatorFactory createRunningAggregateEvaluatorFactory(IScalarEvaluatorFactory[] args) {
+ return new IRunningAggregateEvaluatorFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IRunningAggregateEvaluator createRunningAggregateEvaluator(IHyracksTaskContext ctx)
+ throws HyracksDataException {
+ IScalarEvaluator[] evals = new IScalarEvaluator[args.length];
+ for (int i = 0; i < args.length; i++) {
+ evals[i] = args[i].createScalarEvaluator(ctx);
+ }
+ return new RankRunningAggregateEvaluator(evals, true, sourceLoc);
+ }
+ };
+ }
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return BuiltinFunctions.DENSE_RANK_IMPL;
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/NtileRunningAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/NtileRunningAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/NtileRunningAggregateDescriptor.java
new file mode 100644
index 0000000..5157451
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/NtileRunningAggregateDescriptor.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.runningaggregates.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.runningaggregates.base.AbstractRunningAggregateFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Descriptor {@code ntile()} window function
+ */
+public class NtileRunningAggregateDescriptor extends AbstractRunningAggregateFunctionDynamicDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ public static final IFunctionDescriptorFactory FACTORY = NtileRunningAggregateDescriptor::new;
+
+ @Override
+ public IRunningAggregateEvaluatorFactory createRunningAggregateEvaluatorFactory(IScalarEvaluatorFactory[] args) {
+ return new IRunningAggregateEvaluatorFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IRunningAggregateEvaluator createRunningAggregateEvaluator(IHyracksTaskContext ctx)
+ throws HyracksDataException {
+ return new NtileRunningAggregateEvaluator(args[0].createScalarEvaluator(ctx), getIdentifier());
+ }
+ };
+ }
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return BuiltinFunctions.NTILE_IMPL;
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/NtileRunningAggregateEvaluator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/NtileRunningAggregateEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/NtileRunningAggregateEvaluator.java
new file mode 100644
index 0000000..9bd306e
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/NtileRunningAggregateEvaluator.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.runningaggregates.std;
+
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.base.AMutableInt64;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IWindowAggregateEvaluator;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * Evaluator {@code ntile()} window function
+ */
+public class NtileRunningAggregateEvaluator implements IWindowAggregateEvaluator {
+
+ private final IScalarEvaluator evalNumGroups;
+
+ private final VoidPointable argNumGroups = VoidPointable.FACTORY.createPointable();
+
+ private final FunctionIdentifier funId;
+
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<AInt64> serde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
+
+ private final ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+
+ private final AMutableInt64 aInt64 = new AMutableInt64(0);
+
+ private long partitionLength;
+
+ private long groupSize;
+
+ private long groupRemainder;
+
+ private long resultValue;
+
+ private long count;
+
+ NtileRunningAggregateEvaluator(IScalarEvaluator evalNumGroups, FunctionIdentifier funId) {
+ this.evalNumGroups = evalNumGroups;
+ this.funId = funId;
+ }
+
+ @Override
+ public void init() throws HyracksDataException {
+ }
+
+ @Override
+ public void initPartition(long partitionLength) {
+ this.partitionLength = partitionLength;
+ resultValue = 0;
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
+ if (resultValue == 0) {
+ evaluateGroupSize(tuple);
+ resultValue = count = 1;
+ } else if (count < groupSize) {
+ count++;
+ } else if (count == groupSize && groupRemainder > 0) {
+ groupRemainder--;
+ count++;
+ } else {
+ resultValue++;
+ count = 1;
+ }
+
+ resultStorage.reset();
+ aInt64.setValue(resultValue);
+ serde.serialize(aInt64, resultStorage.getDataOutput());
+ result.set(resultStorage);
+ }
+
+ private void evaluateGroupSize(IFrameTupleReference tuple) throws HyracksDataException {
+ evalNumGroups.evaluate(tuple, argNumGroups);
+ byte[] bytes = argNumGroups.getByteArray();
+ int offset = argNumGroups.getStartOffset();
+ long numGroups = ATypeHierarchy.getLongValue(funId.getName(), 0, bytes, offset);
+ if (numGroups > partitionLength || numGroups <= 0) {
+ groupSize = partitionLength;
+ groupRemainder = 0;
+ } else {
+ groupSize = partitionLength / numGroups;
+ groupRemainder = partitionLength % numGroups;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/PercentRankRunningAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/PercentRankRunningAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/PercentRankRunningAggregateDescriptor.java
new file mode 100644
index 0000000..180ca99
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/PercentRankRunningAggregateDescriptor.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.runningaggregates.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.runningaggregates.base.AbstractRunningAggregateFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Descriptor {@code percent_rank()} window function
+ */
+public class PercentRankRunningAggregateDescriptor extends AbstractRunningAggregateFunctionDynamicDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ public static final IFunctionDescriptorFactory FACTORY = PercentRankRunningAggregateDescriptor::new;
+
+ @Override
+ public IRunningAggregateEvaluatorFactory createRunningAggregateEvaluatorFactory(IScalarEvaluatorFactory[] args) {
+ return new IRunningAggregateEvaluatorFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IRunningAggregateEvaluator createRunningAggregateEvaluator(IHyracksTaskContext ctx)
+ throws HyracksDataException {
+ IScalarEvaluator[] evals = new IScalarEvaluator[args.length];
+ for (int i = 0; i < args.length; i++) {
+ evals[i] = args[i].createScalarEvaluator(ctx);
+ }
+ return new PercentRankRunningAggregateEvaluator(evals, sourceLoc);
+ }
+ };
+ }
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return BuiltinFunctions.PERCENT_RANK_IMPL;
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/PercentRankRunningAggregateEvaluator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/PercentRankRunningAggregateEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/PercentRankRunningAggregateEvaluator.java
new file mode 100644
index 0000000..c73d9fd
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/PercentRankRunningAggregateEvaluator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.runningaggregates.std;
+
+import java.io.DataOutput;
+
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.om.base.ADouble;
+import org.apache.asterix.om.base.AMutableDouble;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+/**
+ * Evaluator {@code percent_rank()} window function
+ */
+class PercentRankRunningAggregateEvaluator extends AbstractRankRunningAggregateEvaluator {
+
+ private final AMutableDouble aDouble = new AMutableDouble(0);
+
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<ADouble> serde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
+
+ private double divisor;
+
+ PercentRankRunningAggregateEvaluator(IScalarEvaluator[] args, SourceLocation sourceLoc) {
+ super(args, false, sourceLoc);
+ }
+
+ @Override
+ public void initPartition(long partitionLength) {
+ super.initPartition(partitionLength);
+ divisor = (double) partitionLength - 1;
+ }
+
+ @Override
+ protected void writeResult(long rank, DataOutput out) throws HyracksDataException {
+ double percentRank = (rank - 1) / divisor;
+ aDouble.setValue(percentRank);
+ serde.serialize(aDouble, out);
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RankRunningAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RankRunningAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RankRunningAggregateDescriptor.java
new file mode 100644
index 0000000..d5db134
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RankRunningAggregateDescriptor.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.runningaggregates.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.runningaggregates.base.AbstractRunningAggregateFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Descriptor {@code rank()} window function
+ */
+public class RankRunningAggregateDescriptor extends AbstractRunningAggregateFunctionDynamicDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ public static final IFunctionDescriptorFactory FACTORY = RankRunningAggregateDescriptor::new;
+
+ @Override
+ public IRunningAggregateEvaluatorFactory createRunningAggregateEvaluatorFactory(IScalarEvaluatorFactory[] args) {
+ return new IRunningAggregateEvaluatorFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IRunningAggregateEvaluator createRunningAggregateEvaluator(IHyracksTaskContext ctx)
+ throws HyracksDataException {
+ IScalarEvaluator[] evals = new IScalarEvaluator[args.length];
+ for (int i = 0; i < args.length; i++) {
+ evals[i] = args[i].createScalarEvaluator(ctx);
+ }
+ return new RankRunningAggregateEvaluator(evals, false, sourceLoc);
+ }
+ };
+ }
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return BuiltinFunctions.RANK_IMPL;
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RankRunningAggregateEvaluator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RankRunningAggregateEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RankRunningAggregateEvaluator.java
new file mode 100644
index 0000000..56ab299
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RankRunningAggregateEvaluator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.runningaggregates.std;
+
+import java.io.DataOutput;
+
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.base.AMutableInt64;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+/**
+ * Evaluator for {@code rank()} and {@code dense_rank()} window functions
+ */
+class RankRunningAggregateEvaluator extends AbstractRankRunningAggregateEvaluator {
+
+ private final AMutableInt64 aInt64 = new AMutableInt64(0);
+
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<AInt64> serde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
+
+ RankRunningAggregateEvaluator(IScalarEvaluator[] args, boolean dense, SourceLocation sourceLoc) {
+ super(args, dense, sourceLoc);
+ }
+
+ protected void writeResult(long rank, DataOutput out) throws HyracksDataException {
+ aInt64.setValue(rank);
+ serde.serialize(aInt64, out);
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RowNumberRunningAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RowNumberRunningAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RowNumberRunningAggregateDescriptor.java
new file mode 100644
index 0000000..7464751
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RowNumberRunningAggregateDescriptor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.runningaggregates.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.runningaggregates.base.AbstractRunningAggregateFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Descriptor {@code row_number()} window function
+ */
+public class RowNumberRunningAggregateDescriptor extends AbstractRunningAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final IFunctionDescriptorFactory FACTORY = RowNumberRunningAggregateDescriptor::new;
+
+ @Override
+ public IRunningAggregateEvaluatorFactory createRunningAggregateEvaluatorFactory(IScalarEvaluatorFactory[] args) {
+ return new IRunningAggregateEvaluatorFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IRunningAggregateEvaluator createRunningAggregateEvaluator(IHyracksTaskContext ctx) {
+ return new RowNumberRunningAggregateEvaluator();
+ }
+ };
+ }
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return BuiltinFunctions.ROW_NUMBER_IMPL;
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RowNumberRunningAggregateEvaluator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RowNumberRunningAggregateEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RowNumberRunningAggregateEvaluator.java
new file mode 100644
index 0000000..75fface
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/runningaggregates/std/RowNumberRunningAggregateEvaluator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.runningaggregates.std;
+
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.base.AMutableInt64;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.algebricks.runtime.base.IWindowAggregateEvaluator;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * Evaluator for {@code row_number()} window function
+ */
+class RowNumberRunningAggregateEvaluator implements IWindowAggregateEvaluator {
+
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<AInt64> serde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
+
+ private final ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+
+ private final AMutableInt64 aInt64 = new AMutableInt64(0);
+
+ private long resultValue;
+
+ @Override
+ public void init() {
+ // nothing to do
+ }
+
+ @Override
+ public void initPartition(long partitionLength) {
+ resultValue = 0;
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
+ resultValue++;
+
+ resultStorage.reset();
+ aInt64.setValue(resultValue);
+ serde.serialize(aInt64, resultStorage.getDataOutput());
+ result.set(resultStorage);
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/utils/Pair.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/utils/Pair.java b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/utils/Pair.java
index 2dd71cd..a0d5880 100644
--- a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/utils/Pair.java
+++ b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/utils/Pair.java
@@ -33,6 +33,22 @@ public class Pair<T1, T2> implements Serializable {
this.second = second;
}
+ /** Replaces the first component of this pair. */
+ public void setFirst(T1 value) {
+ this.first = value;
+ }
+
+ /** Returns the first component of this pair. */
+ public T1 getFirst() {
+ return this.first;
+ }
+
+ /** Replaces the second component of this pair. */
+ public void setSecond(T2 value) {
+ this.second = value;
+ }
+
+ /** Returns the second component of this pair. */
+ public T2 getSecond() {
+ return this.second;
+ }
+
@Override
public String toString() {
return first + "," + second;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
index 6bd0d02..d996caf 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
@@ -89,7 +89,7 @@ public interface ILogicalOperator {
*/
public PhysicalRequirements getRequiredPhysicalPropertiesForChildren(IPhysicalPropertiesVector requiredProperties,
- IOptimizationContext context);
+ IOptimizationContext context) throws AlgebricksException;
/**
* @return the physical properties that this operator delivers, based on
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
index 2a92aba..a88ec64 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
@@ -40,7 +40,7 @@ public interface IPhysicalOperator {
* @return for each child, one vector of required physical properties
*/
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context);
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException;
/**
* @return the physical properties that this operator delivers, based on
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index 3794328..4466408 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -56,4 +56,5 @@ public enum LogicalOperatorTag {
WRITE,
WRITE_RESULT,
INTERSECT,
+ WINDOW
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index ac1de5a..5d19134 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -79,4 +79,5 @@ public enum PhysicalOperatorTag {
UPDATE,
WRITE_RESULT,
INTERSECT,
+ WINDOW
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractAssignOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractAssignOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractAssignOperator.java
index 51040fd..e15ed92 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractAssignOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractAssignOperator.java
@@ -26,6 +26,7 @@ import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
/**
@@ -71,4 +72,17 @@ public abstract class AbstractAssignOperator extends AbstractLogicalOperator {
return modif;
}
+    /**
+     * Creates the variable propagation policy shared by assign-like operators.
+     * <p>
+     * The returned policy optionally copies every variable of the first source schema into the
+     * target schema and then appends this operator's own {@code variables}, in list order.
+     *
+     * @param propagateInputVars
+     *            {@code true} to add all variables of {@code sources[0]} to the target before the
+     *            operator's own variables; {@code false} to add only the operator's own variables
+     * @return a new policy instance bound to this operator's variable list
+     */
+    protected VariablePropagationPolicy createVariablePropagationPolicy(boolean propagateInputVars) {
+        return new VariablePropagationPolicy() {
+            @Override
+            public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources) {
+                if (propagateInputVars) {
+                    // Only the first source schema is consulted.
+                    IOperatorSchema inputSchema = sources[0];
+                    target.addAllVariables(inputSchema);
+                }
+                // Variables produced by this operator always come after the propagated ones.
+                for (int idx = 0; idx < variables.size(); idx++) {
+                    target.addVariable(variables.get(idx));
+                }
+            }
+        };
+    }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
index 1dbf15e..d4a9d37 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
@@ -115,7 +115,7 @@ public abstract class AbstractLogicalOperator implements ILogicalOperator {
@Override
public final PhysicalRequirements getRequiredPhysicalPropertiesForChildren(
- IPhysicalPropertiesVector requiredProperties, IOptimizationContext context) {
+ IPhysicalPropertiesVector requiredProperties, IOptimizationContext context) throws AlgebricksException {
return physicalOperator.getRequiredPropertiesForChildren(this, requiredProperties, context);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
index b4a59a8..35cb087 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
@@ -57,16 +57,7 @@ public class AggregateOperator extends AbstractAssignOperator {
@Override
public VariablePropagationPolicy getVariablePropagationPolicy() {
- return new VariablePropagationPolicy() {
-
- @Override
- public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
- throws AlgebricksException {
- for (LogicalVariable v : variables) {
- target.addVariable(v);
- }
- }
- };
+ return createVariablePropagationPolicy(false);
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java
index 861d74c..202c291 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java
@@ -65,18 +65,7 @@ public class AssignOperator extends AbstractAssignOperator {
@Override
public VariablePropagationPolicy getVariablePropagationPolicy() {
- return new VariablePropagationPolicy() {
-
- @Override
- public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
- throws AlgebricksException {
- target.addAllVariables(sources[0]);
- for (LogicalVariable v : variables) {
- target.addVariable(v);
- }
- }
- };
-
+ return createVariablePropagationPolicy(true);
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RunningAggregateOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RunningAggregateOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RunningAggregateOperator.java
index ef16613..8a710cc 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RunningAggregateOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RunningAggregateOperator.java
@@ -49,17 +49,7 @@ public class RunningAggregateOperator extends AbstractAssignOperator {
@Override
public VariablePropagationPolicy getVariablePropagationPolicy() {
- return new VariablePropagationPolicy() {
-
- @Override
- public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
- throws AlgebricksException {
- target.addAllVariables(sources[0]);
- for (LogicalVariable v : variables) {
- target.addVariable(v);
- }
- }
- };
+ return createVariablePropagationPolicy(true);
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
new file mode 100644
index 0000000..aa1791f
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
+import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+/**
+ * Logical operator that evaluates window functions over its input.
+ * <p>
+ * The operator is described by the following components:
+ * <ul>
+ * <li>{@link #partitionExpressions} - how the input data must be partitioned</li>
+ * <li>{@link #orderExpressions} - how the data inside each partition must be ordered</li>
+ * <li>{@link #expressions} - the window function expressions (running aggregates)</li>
+ * <li>{@link #variables} - the output variables that receive the values returned by these functions</li>
+ * </ul>
+ *
+ * The cardinality of the input stream is not changed by this operator.
+ */
+public class WindowOperator extends AbstractAssignOperator {
+
+    /** Expressions that define the partitioning; never {@code null}, possibly empty. */
+    private final List<Mutable<ILogicalExpression>> partitionExpressions;
+
+    /** Ordering (direction, expression) pairs; never {@code null}, possibly empty. */
+    private final List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions;
+
+    /**
+     * Creates a window operator.
+     *
+     * @param partitionExpressions
+     *            partitioning expressions; copied into a new list, {@code null} is treated as empty
+     * @param orderExpressions
+     *            ordering pairs; copied into a new list, {@code null} is treated as empty
+     * @param variables
+     *            output variables, passed to the superclass constructor
+     * @param expressions
+     *            window function expressions, passed to the superclass constructor
+     */
+    public WindowOperator(List<Mutable<ILogicalExpression>> partitionExpressions,
+            List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions,
+            List<LogicalVariable> variables, List<Mutable<ILogicalExpression>> expressions) {
+        super(variables, expressions);
+        this.partitionExpressions = mutableCopy(partitionExpressions);
+        this.orderExpressions = mutableCopy(orderExpressions);
+    }
+
+    /** Returns a new mutable list holding the elements of {@code source}, or an empty one if it is null. */
+    private static <T> List<T> mutableCopy(List<T> source) {
+        if (source == null) {
+            return new ArrayList<T>();
+        }
+        return new ArrayList<T>(source);
+    }
+
+    @Override
+    public LogicalOperatorTag getOperatorTag() {
+        return LogicalOperatorTag.WINDOW;
+    }
+
+    /** @return the operator's own (live, mutable) list of partitioning expressions */
+    public List<Mutable<ILogicalExpression>> getPartitionExpressions() {
+        return partitionExpressions;
+    }
+
+    /** @return the operator's own (live, mutable) list of ordering pairs */
+    public List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> getOrderExpressions() {
+        return orderExpressions;
+    }
+
+    @Override
+    public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
+        return visitor.visitWindowOperator(this, arg);
+    }
+
+    /**
+     * Applies the transform to the expressions handled by the superclass, then to every partition
+     * expression and every order expression.
+     *
+     * @return {@code true} if any of the transform calls reported a modification
+     */
+    @Override
+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
+        boolean changed = super.acceptExpressionTransform(visitor);
+        // Every expression must be offered to the transform, even once a change has been seen.
+        for (Mutable<ILogicalExpression> partitionExprRef : partitionExpressions) {
+            if (visitor.transform(partitionExprRef)) {
+                changed = true;
+            }
+        }
+        for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> orderPair : orderExpressions) {
+            if (visitor.transform(orderPair.second)) {
+                changed = true;
+            }
+        }
+        return changed;
+    }
+
+    /** Input variables are propagated, followed by this operator's output variables. */
+    @Override
+    public VariablePropagationPolicy getVariablePropagationPolicy() {
+        return createVariablePropagationPolicy(true);
+    }
+
+    @Override
+    public boolean isMap() {
+        return false;
+    }
+
+    /**
+     * Builds the output type environment: starts from the environment that propagates all inputs
+     * and registers, for each output variable, the type computed for its window function expression.
+     */
+    @Override
+    public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
+        IVariableTypeEnvironment typeEnv = createPropagatingAllInputsTypeEnvironment(ctx);
+        int outputCount = variables.size();
+        for (int pos = 0; pos < outputCount; pos++) {
+            LogicalVariable outputVar = variables.get(pos);
+            ILogicalExpression windowExpr = expressions.get(pos).getValue();
+            typeEnv.setVarType(outputVar,
+                    ctx.getExpressionTypeComputer().getType(windowExpr, ctx.getMetadataProvider(), typeEnv));
+        }
+        return typeEnv;
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
index 9d853eb..8535204 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
@@ -61,6 +61,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
@@ -160,6 +161,11 @@ public class CardinalityInferenceVisitor implements ILogicalOperatorVisitor<Long
}
@Override
+    public Long visitWindowOperator(WindowOperator op, Void arg) throws AlgebricksException {
+        // The result is whatever this visitor infers for the operator's first input.
+        final Long inputCardinality = op.getInputs().get(0).getValue().accept(this, arg);
+        return inputCardinality;
+    }
+
+ @Override
public Long visitSelectOperator(SelectOperator op, Void arg) throws AlgebricksException {
return adjustCardinalityForTupleReductionOperator(op.getInputs().get(0).getValue().accept(this, arg));
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index 16fc1ed..7042794 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -78,6 +78,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
@@ -812,4 +813,9 @@ public class FDsAndEquivClassesVisitor implements ILogicalOperatorVisitor<Void,
return null;
}
+ /**
+  * Handles a window operator by delegating to {@code propagateFDsAndEquivClasses}.
+  * NOTE(review): presumably this carries the input's functional dependencies and equivalence
+  * classes through unchanged - confirm against the helper, which is outside this hunk.
+  *
+  * @return always {@code null}
+  */
+ @Override
+ public Void visitWindowOperator(WindowOperator op, IOptimizationContext ctx) throws AlgebricksException {
+ propagateFDsAndEquivClasses(op, ctx);
+ return null;
+ }
}