You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by dl...@apache.org on 2018/11/01 03:54:40 UTC
[2/5] asterixdb git commit: [ASTERIXDB-2466][FUN] Implement window
functions
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index 2b5e569..d91a255 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -69,6 +69,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
@@ -609,7 +610,32 @@ public class IsomorphismOperatorVisitor implements ILogicalOperatorVisitor<Boole
@Override
public Boolean visitSinkOperator(SinkOperator op, ILogicalOperator arg) throws AlgebricksException {
- return true;
+ AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+ if (aop.getOperatorTag() != LogicalOperatorTag.SINK) { // arg must itself be tagged SINK to match
+ return Boolean.FALSE;
+ }
+ return Boolean.TRUE; // nothing beyond the operator tag is compared here
+ }
+
+ @Override
+ public Boolean visitWindowOperator(WindowOperator op, ILogicalOperator arg) throws AlgebricksException {
+ AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+ if (aop.getOperatorTag() != LogicalOperatorTag.WINDOW) { // only another WINDOW operator can match
+ return Boolean.FALSE;
+ }
+ WindowOperator windowOpArg = (WindowOperator) copyAndSubstituteVar(op, arg); // all checks below use this result, not arg itself
+ if (!VariableUtilities.varListEqualUnordered(op.getPartitionExpressions(), // NOTE(review): order-insensitive per the helper's name -- confirm
+ windowOpArg.getPartitionExpressions())) {
+ return Boolean.FALSE;
+ }
+ if (!compareIOrderAndExpressions(op.getOrderExpressions(), windowOpArg.getOrderExpressions())) { // order-by pairs go through a separate helper
+ return Boolean.FALSE;
+ }
+ if (!VariableUtilities.varListEqualUnordered(getPairList(op.getVariables(), op.getExpressions()), // variables and expressions are compared as pairs
+ getPairList(windowOpArg.getVariables(), windowOpArg.getExpressions()))) {
+ return Boolean.FALSE;
+ }
+ return Boolean.TRUE; // tag, partition, order and variable/expression checks all passed
}
private Boolean compareExpressions(List<Mutable<ILogicalExpression>> opExprs,
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index 742d485..d0aec16 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -68,6 +68,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
@@ -96,6 +97,13 @@ public class IsomorphismVariableMappingVisitor implements ILogicalOperatorVisito
}
@Override
+ public Void visitWindowOperator(WindowOperator op, ILogicalOperator arg) throws AlgebricksException {
+ mapChildren(op, arg); // children are handled before this operator's own variables
+ mapVariablesForAbstractAssign(op, arg); // NOTE(review): helper is defined outside this hunk; presumably pairs op's variables with arg's -- confirm
+ return null;
+ }
+
+ @Override
public Void visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, ILogicalOperator arg)
throws AlgebricksException {
mapVariablesStandard(op, arg);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index 0196db6..198ffdc 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -66,6 +66,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
@@ -611,6 +612,20 @@ public class LogicalOperatorDeepCopyWithNewVariablesVisitor
return opCopy;
}
+ @Override
+ public ILogicalOperator visitWindowOperator(WindowOperator op, ILogicalOperator arg) throws AlgebricksException {
+ List<Mutable<ILogicalExpression>> partitionExprCopy =
+ exprDeepCopyVisitor.deepCopyExpressionReferenceList(op.getPartitionExpressions());
+ List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExprCopy =
+ deepCopyOrderExpressionReferencePairList(op.getOrderExpressions());
+ List<LogicalVariable> varCopy = deepCopyVariableList(op.getVariables()); // variables go through deepCopyVariableList rather than being reused directly
+ List<Mutable<ILogicalExpression>> exprCopy =
+ exprDeepCopyVisitor.deepCopyExpressionReferenceList(op.getExpressions());
+ WindowOperator opCopy = new WindowOperator(partitionExprCopy, orderExprCopy, varCopy, exprCopy); // the copy is built only from the copied lists
+ deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy); // NOTE(review): helper not shown in this hunk; per its name it fills in inputs, annotations and execution mode
+ return opCopy;
+ }
+
public LinkedHashMap<LogicalVariable, LogicalVariable> getInputToOutputVariableMapping() {
return inputVarToOutputVarMapping;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 7d3d676..0aaa529 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -60,6 +60,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector;
@@ -205,6 +206,12 @@ public class LogicalPropertiesVisitor implements ILogicalOperatorVisitor<Void, I
}
@Override
+ public Void visitWindowOperator(WindowOperator op, IOptimizationContext context) throws AlgebricksException {
+ visitAssignment(op, context); // delegates to the visitAssignment helper (defined outside this hunk)
+ return null;
+ }
+
+ @Override
public Void visitScriptOperator(ScriptOperator op, IOptimizationContext arg) throws AlgebricksException {
// TODO Auto-generated method stub
return null;
@@ -367,5 +374,4 @@ public class LogicalPropertiesVisitor implements ILogicalOperatorVisitor<Void, I
throws AlgebricksException {
return null;
}
-
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index c6f0c14..e5ca646 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -66,6 +66,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
@@ -406,4 +407,19 @@ public class OperatorDeepCopyVisitor implements ILogicalOperatorVisitor<ILogical
return new LeftOuterUnnestOperator(op.getVariable(), deepCopyExpressionRef(op.getExpressionRef()),
op.getPositionalVariable(), op.getPositionalVariableType(), op.getPositionWriter());
}
+
+ @Override
+ public ILogicalOperator visitWindowOperator(WindowOperator op, Void arg) throws AlgebricksException {
+ // Copies op's partition, order and window expressions into fresh lists; the LogicalVariable objects are reused.
+ List<Mutable<ILogicalExpression>> newPartitionExprs = new ArrayList<>();
+ // Argument order is (destination, source), the same as in the getExpressions() copy below.
+ deepCopyExpressionRefs(newPartitionExprs, op.getPartitionExpressions());
+ List<Pair<IOrder, Mutable<ILogicalExpression>>> newOrderExprs =
+ deepCopyOrderAndExpression(op.getOrderExpressions());
+ ArrayList<LogicalVariable> newList = new ArrayList<>();
+ ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<>();
+ newList.addAll(op.getVariables());
+ deepCopyExpressionRefs(newExpressions, op.getExpressions());
+ return new WindowOperator(newPartitionExprs, newOrderExprs, newList, newExpressions);
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
index f36f604..eb90288 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
@@ -62,6 +62,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
@@ -289,4 +290,8 @@ public class PrimaryKeyVariablesVisitor implements ILogicalOperatorVisitor<Void,
return null;
}
+ @Override
+ public Void visitWindowOperator(WindowOperator op, IOptimizationContext arg) throws AlgebricksException {
+ return null; // no-op: this visitor does nothing for window operators
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index 5d0ef6a..43b7c80 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -66,6 +66,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
@@ -91,6 +92,12 @@ public class ProducedVariableVisitor implements ILogicalOperatorVisitor<Void, Vo
}
@Override
+ public Void visitWindowOperator(WindowOperator op, Void arg) throws AlgebricksException {
+ producedVariables.addAll(op.getVariables()); // every variable in op.getVariables() is recorded as produced
+ return null;
+ }
+
+ @Override
public Void visitDataScanOperator(DataSourceScanOperator op, Void arg) throws AlgebricksException {
producedVariables.addAll(op.getVariables());
return null;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index 70ccf6d..69b17ed 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -64,6 +64,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
@@ -343,4 +344,9 @@ public class SchemaVariableVisitor implements ILogicalOperatorVisitor<Void, Void
return null;
}
+ @Override
+ public Void visitWindowOperator(WindowOperator op, Void arg) throws AlgebricksException {
+ standardLayout(op); // window operators use this visitor's standardLayout helper (defined outside this hunk)
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index c62f555..99d3488 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -65,6 +65,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
@@ -85,15 +86,7 @@ public class SubstituteVariableVisitor
@Override
public Void visitAggregateOperator(AggregateOperator op, Pair<LogicalVariable, LogicalVariable> pair)
throws AlgebricksException {
- List<LogicalVariable> variables = op.getVariables();
- int n = variables.size();
- for (int i = 0; i < n; i++) {
- if (variables.get(i).equals(pair.first)) {
- variables.set(i, pair.second);
- } else {
- op.getExpressions().get(i).getValue().substituteVar(pair.first, pair.second);
- }
- }
+ substAssignVariables(op.getVariables(), op.getExpressions(), pair); // shared helper replacing the inline loop removed above
substVarTypes(op, pair);
return null;
}
@@ -101,15 +94,7 @@ public class SubstituteVariableVisitor
@Override
public Void visitAssignOperator(AssignOperator op, Pair<LogicalVariable, LogicalVariable> pair)
throws AlgebricksException {
- List<LogicalVariable> variables = op.getVariables();
- int n = variables.size();
- for (int i = 0; i < n; i++) {
- if (variables.get(i).equals(pair.first)) {
- variables.set(i, pair.second);
- } else {
- op.getExpressions().get(i).getValue().substituteVar(pair.first, pair.second);
- }
- }
+ substAssignVariables(op.getVariables(), op.getExpressions(), pair);
// Substitute variables stored in ordering property
if (op.getExplicitOrderingProperty() != null) {
List<OrderColumn> orderColumns = op.getExplicitOrderingProperty().getOrderColumns();
@@ -134,10 +119,10 @@ public class SubstituteVariableVisitor
return null;
}
}
- substVarTypes(op, pair);
if (op.getSelectCondition() != null) {
op.getSelectCondition().getValue().substituteVar(pair.first, pair.second);
}
+ substVarTypes(op, pair);
return null;
}
@@ -240,15 +225,7 @@ public class SubstituteVariableVisitor
@Override
public Void visitRunningAggregateOperator(RunningAggregateOperator op, Pair<LogicalVariable, LogicalVariable> pair)
throws AlgebricksException {
- List<LogicalVariable> variables = op.getVariables();
- int n = variables.size();
- for (int i = 0; i < n; i++) {
- if (variables.get(i).equals(pair.first)) {
- variables.set(i, pair.second);
- } else {
- op.getExpressions().get(i).getValue().substituteVar(pair.first, pair.second);
- }
- }
+ substAssignVariables(op.getVariables(), op.getExpressions(), pair); // shared helper replacing the inline loop removed above
substVarTypes(op, pair);
return null;
}
@@ -403,6 +380,18 @@ public class SubstituteVariableVisitor
}
}
+ private void substAssignVariables(List<LogicalVariable> variables, List<Mutable<ILogicalExpression>> expressions,
+ Pair<LogicalVariable, LogicalVariable> pair) {
+ int n = variables.size();
+ for (int i = 0; i < n; i++) { // variables.get(i) is paired with expressions.get(i)
+ if (variables.get(i).equals(pair.first)) {
+ variables.set(i, pair.second); // replace pair.first with pair.second in the variable list, in place
+ } else {
+ expressions.get(i).getValue().substituteVar(pair.first, pair.second); // the expression is rewritten only when its variable was not replaced
+ }
+ }
+ }
+
@Override
public Void visitReplicateOperator(ReplicateOperator op, Pair<LogicalVariable, LogicalVariable> arg)
throws AlgebricksException {
@@ -510,4 +499,18 @@ public class SubstituteVariableVisitor
substVarTypes(op, pair);
return null;
}
+
+ @Override
+ public Void visitWindowOperator(WindowOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+ throws AlgebricksException {
+ for (Mutable<ILogicalExpression> pe : op.getPartitionExpressions()) { // substitute pair.first -> pair.second in each partition expression
+ pe.getValue().substituteVar(pair.first, pair.second);
+ }
+ for (Pair<IOrder, Mutable<ILogicalExpression>> oe : op.getOrderExpressions()) { // only the expression half of each order pair is touched
+ oe.second.getValue().substituteVar(pair.first, pair.second);
+ }
+ substAssignVariables(op.getVariables(), op.getExpressions(), pair); // same variable/expression handling as the aggregate and assign visitors
+ substVarTypes(op, pair);
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 2c68697..b4bea84 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -67,6 +67,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
@@ -471,4 +472,17 @@ public class UsedVariableVisitor implements ILogicalOperatorVisitor<Void, Void>
return null;
}
+ @Override
+ public Void visitWindowOperator(WindowOperator op, Void arg) {
+ for (Mutable<ILogicalExpression> exprRef : op.getPartitionExpressions()) { // variables used by partition expressions
+ exprRef.getValue().getUsedVariables(usedVariables);
+ }
+ for (Pair<IOrder, Mutable<ILogicalExpression>> oe : op.getOrderExpressions()) { // variables used by order expressions
+ oe.second.getValue().getUsedVariables(usedVariables);
+ }
+ for (Mutable<ILogicalExpression> exprRef : op.getExpressions()) { // variables used by the window expressions themselves
+ exprRef.getValue().getUsedVariables(usedVariables);
+ }
+ return null; // results are accumulated in usedVariables
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
index d68be20..09ee358 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
@@ -40,7 +40,7 @@ import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertie
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.operators.std.RunningAggregateRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.aggrun.RunningAggregateRuntimeFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
public class RunningAggregatePOperator extends AbstractPhysicalOperator {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
new file mode 100644
index 0000000..7853524
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.ListSet;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.aggrun.WindowRuntimeFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+
+/**
+ * Physical operator for {@link WindowOperator}.
+ * <p>
+ * It asks its input for a local order made of the partition columns followed by the order columns and,
+ * at job generation time, contributes a {@link WindowRuntimeFactory} as a micro operator.
+ */
+public class WindowPOperator extends AbstractPhysicalOperator {
+
+    private final List<LogicalVariable> partitionColumns;
+
+    private final boolean partitionMaterialization;
+
+    private final List<OrderColumn> orderColumns;
+
+    /**
+     * @param partitionColumns variables used for the required partitioning and as leading sort columns
+     * @param partitionMaterialization flag passed unchanged to the {@link WindowRuntimeFactory}
+     * @param orderColumns order columns (variable and order kind) of the window specification
+     */
+    public WindowPOperator(List<LogicalVariable> partitionColumns, boolean partitionMaterialization,
+            List<OrderColumn> orderColumns) {
+        this.partitionColumns = partitionColumns;
+        this.partitionMaterialization = partitionMaterialization;
+        this.orderColumns = orderColumns;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.WINDOW;
+    }
+
+    /**
+     * Builds the requirements for the single input: a partitioning property chosen by the operator's
+     * execution mode plus one local order property.
+     *
+     * @throws IllegalStateException if the execution mode is none of PARTITIONED, UNPARTITIONED, LOCAL
+     * @throws AlgebricksException with {@code UNSUPPORTED_WINDOW_SPEC} if the order columns cannot be
+     *         arranged behind the partition columns
+     */
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
+        IPartitioningProperty requiredPartitioning = computeRequiredPartitioning(op, context);
+        List<OrderColumn> requiredOrder = computeRequiredLocalOrder(op);
+        List<ILocalStructuralProperty> localProps = Collections.singletonList(new LocalOrderProperty(requiredOrder));
+
+        StructuralPropertiesVector inputRequirement = new StructuralPropertiesVector(requiredPartitioning, localProps);
+        return new PhysicalRequirements(new StructuralPropertiesVector[] { inputRequirement },
+                IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    /**
+     * Maps the execution mode to the partitioning property required from the input; LOCAL mode yields
+     * {@code null}.
+     */
+    private IPartitioningProperty computeRequiredPartitioning(ILogicalOperator op, IOptimizationContext context)
+            throws AlgebricksException {
+        switch (op.getExecutionMode()) {
+            case PARTITIONED:
+                return new UnorderedPartitionedProperty(new ListSet<>(partitionColumns),
+                        context.getComputationNodeDomain());
+            case UNPARTITIONED:
+                return IPartitioningProperty.UNPARTITIONED;
+            case LOCAL:
+                return null;
+            default:
+                throw new IllegalStateException(op.getExecutionMode().name());
+        }
+    }
+
+    /**
+     * Computes the required local order [pc1, ... pcN, oc1, ... ocN], accounting for cases where there is
+     * an overlap between order and partition columns: a partition column that also appears among the
+     * order columns keeps its place (and order kind) in the order list, the remaining partition columns
+     * are placed in front as ASC.
+     */
+    private List<OrderColumn> computeRequiredLocalOrder(ILogicalOperator op) throws AlgebricksException {
+        // TODO replace with required local grouping on partition columns + local order on order columns
+        ListSet<LogicalVariable> pendingPartitionVars = new ListSet<>();
+        pendingPartitionVars.addAll(partitionColumns);
+
+        List<OrderColumn> orderPart = new ArrayList<>();
+        int nextIdx = 0;
+        for (OrderColumn orderColumn : orderColumns) {
+            nextIdx++;
+            boolean wasPartitionVar = pendingPartitionVars.remove(orderColumn.getColumn());
+            // a non-partition column must not be followed by a partition column that is still pending
+            if (!wasPartitionVar && containsAny(orderColumns, nextIdx, pendingPartitionVars)) {
+                throw new AlgebricksException(ErrorCode.HYRACKS, ErrorCode.UNSUPPORTED_WINDOW_SPEC,
+                        op.getSourceLocation(), String.valueOf(partitionColumns), String.valueOf(orderColumns));
+            }
+            orderPart.add(new OrderColumn(orderColumn.getColumn(), orderColumn.getOrder()));
+        }
+
+        List<OrderColumn> localOrder = new ArrayList<>();
+        for (LogicalVariable partitionVar : pendingPartitionVars) {
+            localOrder.add(new OrderColumn(partitionVar, OrderOperator.IOrder.OrderKind.ASC));
+        }
+        localOrder.addAll(orderPart);
+        return localOrder;
+    }
+
+    /**
+     * Delivers a clone of the physical properties delivered by the first input.
+     */
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        ILogicalOperator input = op.getInputs().get(0).getValue();
+        deliveredProperties = ((AbstractLogicalOperator) input).getDeliveredPhysicalProperties().clone();
+    }
+
+    /**
+     * Creates the {@link WindowRuntimeFactory} for the given {@link WindowOperator}, registers it with the
+     * job builder as a micro operator and connects it to its first input.
+     */
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        WindowOperator winOp = (WindowOperator) op;
+        int[] outputColumns = JobGenHelper.projectVariables(opSchema, winOp.getVariables());
+        IRunningAggregateEvaluatorFactory[] windowFunctions =
+                createWindowFunctionFactories(winOp, inputSchemas, context);
+
+        // TODO push projections into the operator
+        int[] projection = JobGenHelper.projectAllVariables(opSchema);
+
+        int[] partitionFields = JobGenHelper.projectVariables(inputSchemas[0], partitionColumns);
+
+        IBinaryComparatorFactory[] partitionComparators = JobGenHelper
+                .variablesToAscBinaryComparatorFactories(partitionColumns, context.getTypeEnvironment(op), context);
+
+        //TODO not all functions need order comparators
+        IBinaryComparatorFactory[] orderComparators = JobGenHelper
+                .variablesToBinaryComparatorFactories(orderColumns, context.getTypeEnvironment(op), context);
+
+        WindowRuntimeFactory windowRuntime = new WindowRuntimeFactory(outputColumns, windowFunctions, projection,
+                partitionFields, partitionComparators, partitionMaterialization, orderComparators);
+        windowRuntime.setSourceLocation(winOp.getSourceLocation());
+
+        // contribute the runtime as a micro operator ...
+        RecordDescriptor outputDesc =
+                JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
+        builder.contributeMicroOperator(winOp, windowRuntime, outputDesc);
+        // ... and one edge from its child
+        ILogicalOperator child = winOp.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(child, 0, winOp, 0);
+    }
+
+    /**
+     * Creates one evaluator factory per window expression, in expression order. Every expression is
+     * expected to be a {@link StatefulFunctionCallExpression}; factories are created against the type
+     * environment of the operator's first input.
+     */
+    private IRunningAggregateEvaluatorFactory[] createWindowFunctionFactories(WindowOperator winOp,
+            IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException {
+        List<Mutable<ILogicalExpression>> exprRefs = winOp.getExpressions();
+        IRunningAggregateEvaluatorFactory[] factories = new IRunningAggregateEvaluatorFactory[exprRefs.size()];
+        IExpressionRuntimeProvider runtimeProvider = context.getExpressionRuntimeProvider();
+        int slot = 0;
+        for (Mutable<ILogicalExpression> exprRef : exprRefs) {
+            StatefulFunctionCallExpression call = (StatefulFunctionCallExpression) exprRef.getValue();
+            factories[slot++] = runtimeProvider.createRunningAggregateFunctionFactory(call,
+                    context.getTypeEnvironment(winOp.getInputs().get(0).getValue()), inputSchemas, context);
+        }
+        return factories;
+    }
+
+    /** Always {@code true}: the runtime is contributed through {@code contributeMicroOperator}. */
+    @Override
+    public boolean isMicroOperator() {
+        return true;
+    }
+
+    /** Always {@code true} for this operator. */
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return true;
+    }
+
+    /** Returns the partition materialization flag given at construction time. */
+    public boolean isPartitionMaterialization() {
+        return partitionMaterialization;
+    }
+
+    /**
+     * Tells whether any order column at position {@code fromIdx} or later refers to a variable in
+     * {@code vars}.
+     */
+    private static boolean containsAny(List<OrderColumn> columns, int fromIdx, Set<LogicalVariable> vars) {
+        return columns.subList(fromIdx, columns.size()).stream().map(OrderColumn::getColumn)
+                .anyMatch(vars::contains);
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 77f052e..ad45614 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -66,6 +66,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
@@ -170,13 +171,10 @@ public class LogicalOperatorPrettyPrintVisitor extends AbstractLogicalOperatorPr
@Override
public Void visitOrderOperator(OrderOperator op, Integer indent) throws AlgebricksException {
addIndent(indent).append("order ");
- for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : op.getOrderExpressions()) {
- if (op.getTopK() != -1) {
- buffer.append("(topK: " + op.getTopK() + ") ");
- }
- String fst = getOrderString(p.first);
- buffer.append("(" + fst + ", " + p.second.getValue().accept(exprVisitor, indent) + ") ");
+ if (op.getTopK() != -1) {
+ buffer.append("(topK: " + op.getTopK() + ") ");
}
+ pprintOrderList(op.getOrderExpressions(), indent);
return null;
}
@@ -484,6 +482,17 @@ public class LogicalOperatorPrettyPrintVisitor extends AbstractLogicalOperatorPr
return null;
}
+ /**
+  * Prints a window operator on one line: the keyword "window", the produced variables, " <- " and the
+  * window expressions, then " partition " with the partition expressions and " order " with the
+  * order list.
+  *
+  * @param op the window operator to print
+  * @param indent indentation passed to addIndent and to the expression printers
+  * @return always {@code null}; the text is appended to the shared buffer
+  */
+ @Override
+ public Void visitWindowOperator(WindowOperator op, Integer indent) throws AlgebricksException {
+ addIndent(indent).append("window ").append(str(op.getVariables())).append(" <- ");
+ pprintExprList(op.getExpressions(), indent);
+ // the " partition " and " order " keywords are printed even when the corresponding list is empty
+ buffer.append(" partition ");
+ pprintExprList(op.getPartitionExpressions(), indent);
+ buffer.append(" order ");
+ pprintOrderList(op.getOrderExpressions(), indent);
+ return null;
+ }
+
protected void printNestedPlans(AbstractOperatorWithNestedPlans op, Integer indent) throws AlgebricksException {
boolean first = true;
if (op.getNestedPlans().isEmpty()) {
@@ -537,4 +546,12 @@ public class LogicalOperatorPrettyPrintVisitor extends AbstractLogicalOperatorPr
}
buffer.append("]");
}
+
+ /**
+  * Appends every order entry to the buffer as "(kind, expression) ", in list order. The kind text
+  * comes from getOrderString and the expression text from the expression visitor.
+  *
+  * @param orderList pairs of order kind and ordering expression
+  * @param indent indentation handed to the expression visitor
+  */
+ protected void pprintOrderList(List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderList,
+         Integer indent) throws AlgebricksException {
+     for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> entry : orderList) {
+         StringBuilder item = new StringBuilder("(");
+         item.append(getOrderString(entry.first));
+         item.append(", ");
+         item.append(entry.second.getValue().accept(exprVisitor, indent));
+         item.append(") ");
+         buffer.append(item.toString());
+     }
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
index 4a17cc6..4c810ab 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
@@ -70,6 +70,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
@@ -251,30 +252,12 @@ public class LogicalOperatorPrettyPrintVisitorJson extends AbstractLogicalOperat
@Override
public Void visitOrderOperator(OrderOperator op, Integer indent) throws AlgebricksException {
addIndent(indent).append("\"operator\": \"order\"");
- for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : op.getOrderExpressions()) {
- buffer.append(",\n");
- if (op.getTopK() != -1) {
- addIndent(indent).append("\"topK\": \"" + op.getTopK() + "\",\n");
- }
- String fst = getOrderString(p.first);
- addIndent(indent).append("\"first\": " + fst + ",\n");
- addIndent(indent).append(
- "\"second\": \"" + p.second.getValue().accept(exprVisitor, indent).replace('"', ' ') + "\"");
- }
+ int topK = op.getTopK();
+ List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions = op.getOrderExpressions();
+ pprintOrderExprList(orderExpressions, topK, indent);
return null;
}
- private String getOrderString(OrderOperator.IOrder first) {
- switch (first.getKind()) {
- case ASC:
- return "\"ASC\"";
- case DESC:
- return "\"DESC\"";
- default:
- return first.getExpressionRef().toString();
- }
- }
-
@Override
public Void visitAssignOperator(AssignOperator op, Integer indent) throws AlgebricksException {
addIndent(indent).append("\"operator\": \"assign\"");
@@ -667,6 +650,23 @@ public class LogicalOperatorPrettyPrintVisitorJson extends AbstractLogicalOperat
return null;
}
+ /**
+  * Prints a window operator as JSON members: the operator name, its variables, its window
+  * expressions, a "partition by" member (only when partition expressions exist) and an "order by"
+  * member.
+  * <p>
+  * The "order by" value is written as an array holding one {"first": ..., "second": ...} object per
+  * order expression (an empty array when there are none). It is deliberately not delegated to
+  * pprintOrderExprList: that helper starts every entry with ",\n", which would leave the
+  * "order by" key without a value.
+  * NOTE(review): the value following "partition by" is produced by pprintExprList, whose output
+  * format is not visible here - confirm that it forms a valid member value.
+  *
+  * @param op the window operator to print
+  * @param indent indentation passed to addIndent and to the expression printers
+  * @return always {@code null}; the text is appended to the shared buffer
+  */
+ @Override
+ public Void visitWindowOperator(WindowOperator op, Integer indent) throws AlgebricksException {
+     addIndent(indent).append("\"operator\": \"window\"");
+     variablePrintHelper(op.getVariables(), indent);
+     addIndent(0).append(",\n");
+     pprintExprList(op.getExpressions(), indent);
+     if (!op.getPartitionExpressions().isEmpty()) {
+         buffer.append(",\n");
+         addIndent(indent).append("\"partition by\": ");
+         pprintExprList(op.getPartitionExpressions(), indent);
+     }
+     buffer.append(",\n");
+     addIndent(indent).append("\"order by\": [");
+     boolean firstEntry = true;
+     for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : op.getOrderExpressions()) {
+         if (!firstEntry) {
+             buffer.append(", ");
+         }
+         firstEntry = false;
+         String kind = getOrderString(p.first);
+         // double quotes inside the expression text are blanked, as pprintOrderExprList does
+         String expr = p.second.getValue().accept(exprVisitor, indent).replace('"', ' ');
+         buffer.append("{\"first\": " + kind + ", \"second\": \"" + expr + "\"}");
+     }
+     buffer.append("]");
+     return null;
+ }
+
protected void printNestedPlans(AbstractOperatorWithNestedPlans op, Integer indent) throws AlgebricksException {
idCounter.nextPrefix();
buffer.append("[\n");
@@ -718,4 +718,29 @@ public class LogicalOperatorPrettyPrintVisitorJson extends AbstractLogicalOperat
}
buffer.append("]");
}
+
+ /**
+  * Appends the order expressions as JSON members. Every entry starts with ",\n" (so the caller must
+  * already have written the preceding member), optionally followed by a "topK" member, and then a
+  * "first" member (order kind) and a "second" member (expression text with double quotes replaced
+  * by spaces).
+  *
+  * @param orderExpressions pairs of order kind and ordering expression
+  * @param topK value printed as "topK" with every entry, or -1 to omit it
+  * @param indent indentation passed to addIndent and to the expression visitor
+  */
+ private void pprintOrderExprList(List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions,
+         int topK, Integer indent) throws AlgebricksException {
+     final boolean hasTopK = topK != -1;
+     for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> orderExpr : orderExpressions) {
+         buffer.append(",\n");
+         if (hasTopK) {
+             addIndent(indent).append("\"topK\": \"" + topK + "\",\n");
+         }
+         String orderKind = getOrderString(orderExpr.first);
+         addIndent(indent).append("\"first\": " + orderKind + ",\n");
+         addIndent(indent).append("\"second\": \""
+                 + orderExpr.second.getValue().accept(exprVisitor, indent).replace('"', ' ') + "\"");
+     }
+ }
+
+ /**
+  * Returns the JSON text for an order kind: the quoted literals "ASC" or "DESC", or, for any other
+  * kind, the unquoted string form of the order's expression reference.
+  */
+ private String getOrderString(OrderOperator.IOrder first) {
+     final String text;
+     switch (first.getKind()) {
+         case ASC:
+             text = "\"ASC\"";
+             break;
+         case DESC:
+             text = "\"DESC\"";
+             break;
+         default:
+             text = first.getExpressionRef().toString();
+             break;
+     }
+     return text;
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index 67199b9..ea914fa 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -43,6 +43,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOpera
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.OperatorDeepCopyVisitor;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
@@ -134,6 +135,13 @@ public class OperatorManipulationUtil {
forceUnpartitioned = true;
}
}
+ if (op.getOperatorTag() == LogicalOperatorTag.WINDOW) {
+ WindowOperator winOp = (WindowOperator) op;
+ if (winOp.getPartitionExpressions().isEmpty()) {
+ op.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+ forceUnpartitioned = true;
+ }
+ }
for (Mutable<ILogicalOperator> i : op.getInputs()) {
boolean exit = false;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index 548a29f..9d5cdeb 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -52,6 +52,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
@@ -126,4 +127,6 @@ public interface ILogicalOperatorVisitor<R, T> {
public R visitTokenizeOperator(TokenizeOperator op, T arg) throws AlgebricksException;
public R visitForwardOperator(ForwardOperator op, T arg) throws AlgebricksException;
+
+ public R visitWindowOperator(WindowOperator op, T arg) throws AlgebricksException;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java
index b204bcb..5142ce7 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java
@@ -25,6 +25,8 @@ import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
import org.apache.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
import org.apache.hyracks.algebricks.data.IBinaryHashFunctionFamilyProvider;
@@ -144,6 +146,20 @@ public final class JobGenHelper {
return compFactories;
}
+ /**
+  * Creates one binary comparator factory per order column, in iteration order. The factory is looked
+  * up by the type of the column's variable in {@code env}; it is ascending when the column's order
+  * kind is ASC and descending otherwise.
+  *
+  * @param orderColumns the columns to create comparators for
+  * @param env type environment used to resolve the variable types
+  * @param context job generation context supplying the comparator factory provider
+  * @return the comparator factories, one per order column
+  */
+ public static IBinaryComparatorFactory[] variablesToBinaryComparatorFactories(Collection<OrderColumn> orderColumns,
+         IVariableTypeEnvironment env, JobGenContext context) throws AlgebricksException {
+     IBinaryComparatorFactory[] factories = new IBinaryComparatorFactory[orderColumns.size()];
+     IBinaryComparatorFactoryProvider provider = context.getBinaryComparatorFactoryProvider();
+     int next = 0;
+     for (OrderColumn column : orderColumns) {
+         LogicalVariable variable = column.getColumn();
+         boolean isAscending = OrderOperator.IOrder.OrderKind.ASC == column.getOrder();
+         Object variableType = env.getVarType(variable);
+         factories[next] = provider.getBinaryComparatorFactory(variableType, isAscending);
+         next++;
+     }
+     return factories;
+ }
+
public static INormalizedKeyComputerFactory variablesToAscNormalizedKeyComputerFactory(
Collection<LogicalVariable> varLogical, IVariableTypeEnvironment env, JobGenContext context)
throws AlgebricksException {
@@ -181,12 +197,20 @@ public final class JobGenHelper {
}
public static int[] projectAllVariables(IOperatorSchema opSchema) {
- int[] projectionList = new int[opSchema.getSize()];
+ return projectVariablesImpl(opSchema, opSchema, opSchema.getSize());
+ }
+
+ /**
+  * Returns, for each of the given variables in list order, the position reported by
+  * {@code opSchema.findVariable}.
+  */
+ public static int[] projectVariables(IOperatorSchema opSchema, List<LogicalVariable> variables) {
+     final int variableCount = variables.size();
+     return projectVariablesImpl(opSchema, variables, variableCount);
+ }
+
+ private static int[] projectVariablesImpl(IOperatorSchema opSchema, Iterable<LogicalVariable> variables,
+ int variableCount) {
+ int[] projectionList = new int[variableCount];
int k = 0;
- for (LogicalVariable v : opSchema) {
+ for (LogicalVariable v : variables) {
projectionList[k++] = opSchema.findVariable(v);
}
return projectionList;
}
-
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
index 113d205..f10c3a4 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
@@ -67,6 +67,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
@@ -192,17 +193,7 @@ public class LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String
stringBuilder.append("(topK: ").append(op.getTopK()).append(") ");
}
stringBuilder.append("(");
- switch (p.first.getKind()) {
- case ASC:
- stringBuilder.append("ASC");
- break;
- case DESC:
- stringBuilder.append("DESC");
- break;
- default:
- final Mutable<ILogicalExpression> expressionRef = p.first.getExpressionRef();
- stringBuilder.append(expressionRef == null ? "null" : expressionRef.toString());
- }
+ appendOrder(p.first);
stringBuilder.append(", ").append(p.second.getValue().toString()).append(") ");
}
appendSchema(op, showDetails);
@@ -211,6 +202,20 @@ public class LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String
return stringBuilder.toString();
}
+ /**
+  * Appends the label of an order kind to the string builder: "ASC", "DESC", or, for any other kind,
+  * the string form of the order's expression reference ("null" when there is none).
+  */
+ private void appendOrder(OrderOperator.IOrder order) {
+     final String label;
+     switch (order.getKind()) {
+         case ASC:
+             label = "ASC";
+             break;
+         case DESC:
+             label = "DESC";
+             break;
+         default:
+             final Mutable<ILogicalExpression> exprRef = order.getExpressionRef();
+             label = exprRef == null ? "null" : exprRef.toString();
+             break;
+     }
+     stringBuilder.append(label);
+ }
+
@Override
public String visitAssignOperator(AssignOperator op, Boolean showDetails) throws AlgebricksException {
stringBuilder.setLength(0);
@@ -595,6 +600,26 @@ public class LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String
return stringBuilder.toString();
}
+ /**
+  * Builds the label of a window operator: "window (vars <- expressions) partition by (...) order by
+  * (...)", followed by the schema, annotation and physical operator details.
+  *
+  * @param op the window operator to describe
+  * @param showDetails passed on to the schema, annotation and physical operator appenders
+  * @return the content of the string builder, which is reset at the start of the call
+  */
+ @Override
+ public String visitWindowOperator(WindowOperator op, Boolean showDetails) throws AlgebricksException {
+     stringBuilder.setLength(0);
+     stringBuilder.append("window (");
+     stringBuilder.append(str(op.getVariables()));
+     stringBuilder.append(" <- ");
+     printExprList(op.getExpressions());
+     stringBuilder.append(") partition by (");
+     printExprList(op.getPartitionExpressions());
+     stringBuilder.append(") order by (");
+     for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> orderExpr : op.getOrderExpressions()) {
+         // each entry is rendered as "(kind, expression) "
+         stringBuilder.append("(");
+         appendOrder(orderExpr.first);
+         stringBuilder.append(", ");
+         stringBuilder.append(orderExpr.second.getValue().toString());
+         stringBuilder.append(") ");
+     }
+     stringBuilder.append(")");
+     appendSchema(op, showDetails);
+     appendAnnotations(op, showDetails);
+     appendPhysicalOperatorInfo(op, showDetails);
+     return stringBuilder.toString();
+ }
+
private void printExprList(List<Mutable<ILogicalExpression>> expressions) {
stringBuilder.append("[");
expressions.forEach(exprRef -> stringBuilder.append(exprRef.getValue().toString()).append(", "));
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractExtractExprRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractExtractExprRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractExtractExprRule.java
index 036b3e1..7adc732 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractExtractExprRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractExtractExprRule.java
@@ -18,26 +18,30 @@
*/
package org.apache.hyracks.algebricks.rewriter.rules;
+import java.util.List;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
-
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
public abstract class AbstractExtractExprRule implements IAlgebraicRewriteRule {
- protected LogicalVariable extractExprIntoAssignOpRef(ILogicalExpression gExpr, Mutable<ILogicalOperator> opRef2,
- IOptimizationContext context) throws AlgebricksException {
+ protected static LogicalVariable extractExprIntoAssignOpRef(ILogicalExpression gExpr,
+ Mutable<ILogicalOperator> opRef2, IOptimizationContext context) throws AlgebricksException {
LogicalVariable v = context.newVar();
- AssignOperator a = new AssignOperator(v, new MutableObject<ILogicalExpression>(gExpr));
+ AssignOperator a = new AssignOperator(v, new MutableObject<>(gExpr));
a.setSourceLocation(gExpr.getSourceLocation());
- a.getInputs().add(new MutableObject<ILogicalOperator>(opRef2.getValue()));
+ a.getInputs().add(new MutableObject<>(opRef2.getValue()));
opRef2.setValue(a);
if (gExpr.getExpressionTag() == LogicalExpressionTag.CONSTANT) {
context.addNotToBeInlinedVar(v);
@@ -46,4 +50,37 @@ public abstract class AbstractExtractExprRule implements IAlgebraicRewriteRule {
return v;
}
+ protected static <T> boolean extractComplexExpressions(ILogicalOperator op, List<T> exprList,
+ Function<T, Mutable<ILogicalExpression>> exprGetter, Predicate<ILogicalExpression> retainPredicate,
+ IOptimizationContext context) throws AlgebricksException { // Moves non-variable expressions of exprList into new assign operators under op; returns true if any expression was replaced.
+ if (!hasComplexExpressions(exprList, exprGetter)) { // only variable references present: op is left untouched
+ return false;
+ }
+ boolean rewritten = false;
+ Mutable<ILogicalOperator> inputOpRef = op.getInputs().get(0); // each new assign is inserted between op and its current first input
+ for (T item : exprList) {
+ Mutable<ILogicalExpression> exprMutable = exprGetter.apply(item); // exprGetter locates the expression slot inside the list item
+ ILogicalExpression expr = exprMutable.getValue();
+ if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE && !retainPredicate.test(expr)) { // skip variable references and anything retainPredicate accepts
+ LogicalVariable v = extractExprIntoAssignOpRef(expr, inputOpRef, context);
+ VariableReferenceExpression vRef = new VariableReferenceExpression(v);
+ vRef.setSourceLocation(expr.getSourceLocation()); // keep the original expression's source location on the reference
+ exprMutable.setValue(vRef); // the item now refers to the variable produced by the new assign
+ rewritten = true;
+ }
+ }
+ context.computeAndSetTypeEnvironmentForOperator(op); // executed even when retainPredicate kept every complex expression
+ return rewritten;
+ }
+
+ private static <T> boolean hasComplexExpressions(List<T> exprList,
+ Function<T, Mutable<ILogicalExpression>> exprGetter) { // Tells whether any item's expression is something other than a variable reference.
+ for (T item : exprList) {
+ Mutable<ILogicalExpression> exprMutable = exprGetter.apply(item);
+ if (exprMutable.getValue().getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+ return true; // a single non-variable expression is enough
+ }
+ }
+ return false; // also the answer for an empty list
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index cdab2f4..52b3f59 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -515,7 +515,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
case LOCAL_GROUPING_PROPERTY: {
LocalGroupingProperty g = (LocalGroupingProperty) prop;
Collection<LogicalVariable> vars =
- (g.getPreferredOrderEnforcer() != null) ? g.getPreferredOrderEnforcer() : g.getColumnSet();
+ !g.getPreferredOrderEnforcer().isEmpty() ? g.getPreferredOrderEnforcer() : g.getColumnSet();
List<OrderColumn> orderColumns = new ArrayList<>();
for (LogicalVariable v : vars) {
OrderColumn oc = new OrderColumn(v, OrderKind.ASC);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractGbyExpressionsRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractGbyExpressionsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractGbyExpressionsRule.java
index eb2bee6..06b2e16 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractGbyExpressionsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractGbyExpressionsRule.java
@@ -18,17 +18,17 @@
*/
package org.apache.hyracks.algebricks.rewriter.rules;
-import org.apache.commons.lang3.mutable.Mutable;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
@@ -55,8 +55,8 @@ public class ExtractGbyExpressionsRule extends AbstractExtractExprRule {
}
context.addToDontApplySet(this, op1);
GroupByOperator g = (GroupByOperator) op1;
- boolean r1 = gbyExprWasRewritten(g, context);
- boolean r2 = decorExprWasRewritten(g, context);
+ boolean r1 = extractComplexExpressions(g, g.getGroupByList(), context);
+ boolean r2 = extractComplexExpressions(g, g.getDecorList(), context);
boolean fired = r1 || r2;
if (fired) {
context.computeAndSetTypeEnvironmentForOperator(g);
@@ -64,56 +64,15 @@ public class ExtractGbyExpressionsRule extends AbstractExtractExprRule {
return fired;
}
- private boolean gbyExprWasRewritten(GroupByOperator g, IOptimizationContext context) throws AlgebricksException {
- if (!gbyHasComplexExpr(g)) {
- return false;
- }
- Mutable<ILogicalOperator> opRef2 = g.getInputs().get(0);
- for (Pair<LogicalVariable, Mutable<ILogicalExpression>> gbyPair : g.getGroupByList()) {
- ILogicalExpression expr = gbyPair.second.getValue();
- if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
- LogicalVariable v = extractExprIntoAssignOpRef(expr, opRef2, context);
- VariableReferenceExpression vRef = new VariableReferenceExpression(v);
- vRef.setSourceLocation(expr.getSourceLocation());
- gbyPair.second.setValue(vRef);
- }
- }
- return true;
- }
-
- private boolean decorExprWasRewritten(GroupByOperator g, IOptimizationContext context) throws AlgebricksException {
- if (!decorHasComplexExpr(g)) {
- return false;
- }
- Mutable<ILogicalOperator> opRef2 = g.getInputs().get(0);
- for (Pair<LogicalVariable, Mutable<ILogicalExpression>> decorPair : g.getDecorList()) {
- ILogicalExpression expr = decorPair.second.getValue();
- if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
- LogicalVariable v = extractExprIntoAssignOpRef(expr, opRef2, context);
- VariableReferenceExpression vRef = new VariableReferenceExpression(v);
- vRef.setSourceLocation(expr.getSourceLocation());
- decorPair.second.setValue(vRef);
- }
- }
- return true;
- }
-
- private boolean gbyHasComplexExpr(GroupByOperator g) {
- for (Pair<LogicalVariable, Mutable<ILogicalExpression>> gbyPair : g.getGroupByList()) {
- if (gbyPair.second.getValue().getExpressionTag() != LogicalExpressionTag.VARIABLE) {
- return true;
- }
- }
- return false;
+ private static boolean extractComplexExpressions(ILogicalOperator op,
+ List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> exprList, IOptimizationContext context)
+ throws AlgebricksException { // Convenience overload for (variable, expression) pair lists.
+ return extractComplexExpressions(op, exprList, Pair::getSecond, context); // the expression slot is the pair's second component
}
- private boolean decorHasComplexExpr(GroupByOperator g) {
- for (Pair<LogicalVariable, Mutable<ILogicalExpression>> gbyPair : g.getDecorList()) {
- if (gbyPair.second.getValue().getExpressionTag() != LogicalExpressionTag.VARIABLE) {
- return true;
- }
- }
- return false;
+ public static <T> boolean extractComplexExpressions(ILogicalOperator op, List<T> exprList,
+ Function<T, Mutable<ILogicalExpression>> exprGetter, IOptimizationContext context)
+ throws AlgebricksException { // Overload that retains nothing: delegates with an always-false retain predicate.
+ return extractComplexExpressions(op, exprList, exprGetter, t -> false, context); // t -> false means every non-variable expression is extracted
}
-
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushNestedOrderByUnderPreSortedGroupByRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushNestedOrderByUnderPreSortedGroupByRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushNestedOrderByUnderPreSortedGroupByRule.java
index aa58985..e6f86be 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushNestedOrderByUnderPreSortedGroupByRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushNestedOrderByUnderPreSortedGroupByRule.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.algebricks.rewriter.rules;
+import java.util.EnumSet;
import java.util.HashSet;
import java.util.Set;
@@ -39,6 +40,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperato
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPhysicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -82,6 +84,10 @@ public class PushNestedOrderByUnderPreSortedGroupByRule implements IAlgebraicRew
if (!isIndependentFromChildren(order1)) {
return false;
}
+ if (OperatorManipulationUtil.ancestorOfOperators(order1.getInputs().get(0).getValue(),
+ EnumSet.of(LogicalOperatorTag.ORDER))) {
+ return false;
+ }
AbstractPhysicalOperator pOrder1 = (AbstractPhysicalOperator) op2.getPhysicalOperator();
if (pOrder1.getOperatorTag() != PhysicalOperatorTag.STABLE_SORT
&& pOrder1.getOperatorTag() != PhysicalOperatorTag.IN_MEMORY_STABLE_SORT) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
index 6b09894..1388ccb 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
@@ -60,6 +60,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
import org.apache.hyracks.algebricks.core.algebra.visitors.IQueryOperatorVisitor;
@@ -256,6 +257,11 @@ class ReplaceNtsWithSubplanInputOperatorVisitor implements IQueryOperatorVisitor
return visit(op);
}
+ @Override
+ public ILogicalOperator visitWindowOperator(WindowOperator op, Void arg) throws AlgebricksException { // arg is not used
+ return visit(op); // window operators get the same handling as the private visit(ILogicalOperator) helper gives any operator
+ }
+
private ILogicalOperator visit(ILogicalOperator op) throws AlgebricksException {
List<Map<LogicalVariable, LogicalVariable>> varMapSnapshots = new ArrayList<>();
for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml b/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
index bcc537a..c8f7cbf 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
@@ -73,6 +73,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-storage-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IWindowAggregateEvaluator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IWindowAggregateEvaluator.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IWindowAggregateEvaluator.java
new file mode 100644
index 0000000..b4030f7
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IWindowAggregateEvaluator.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.base;
+
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IWindowAggregateEvaluator extends IRunningAggregateEvaluator { // Running-aggregate evaluator extended with window-specific hooks.
+ default void configure(IBinaryComparator[] orderComparators) { // no-op by default; override to receive the order comparators
+ }
+
+ void initPartition(long partitionLength) throws HyracksDataException; // NOTE(review): presumably invoked once per partition with its tuple count — confirm against the calling runtime
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fdedf626/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java
new file mode 100644
index 0000000..27354cb
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.aggrun;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+public abstract class AbstractRunningAggregatePushRuntime<T extends IRunningAggregateEvaluator>
+ extends AbstractOneInputOneOutputOneFramePushRuntime { // Base push runtime that builds output tuples from projected input fields and running-aggregate results.
+ protected final IHyracksTaskContext ctx;
+ private final IRunningAggregateEvaluatorFactory[] aggFactories;
+ private final Class<T> aggEvalClass; // used to cast factory-created evaluators to T
+ protected final List<T> aggEvals; // empty until init() runs on the first open()
+ private final int[] projectionList;
+ private final int[] projectionToOutColumns; // per projected column: index into outColumns/aggEvals, or negative when it is not an aggregate output
+ private final IPointable p = VoidPointable.FACTORY.createPointable(); // reused to receive each evaluator result
+ private final ArrayTupleBuilder tupleBuilder; // reused for every produced tuple
+ private boolean first; // true until the first open() has created the evaluators
+
+ public AbstractRunningAggregatePushRuntime(int[] outColumns, IRunningAggregateEvaluatorFactory[] aggFactories,
+ int[] projectionList, IHyracksTaskContext ctx, Class<T> aggEvalClass) {
+ this.ctx = ctx;
+ this.projectionList = projectionList;
+ this.aggFactories = aggFactories;
+ this.aggEvalClass = aggEvalClass;
+ aggEvals = new ArrayList<>(aggFactories.length);
+ tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+ projectionToOutColumns = new int[projectionList.length];
+
+ for (int j = 0; j < projectionList.length; j++) {
+ projectionToOutColumns[j] = Arrays.binarySearch(outColumns, projectionList[j]); // NOTE(review): binarySearch requires outColumns to be sorted ascending — confirm callers guarantee this
+ }
+ first = true;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ super.open();
+ if (first) { // evaluators are created only once, on the first open()
+ first = false;
+ init();
+ }
+ for (T aggEval : aggEvals) {
+ aggEval.init(); // every open() re-initializes all evaluators
+ }
+ }
+
+ protected void init() throws HyracksDataException { // one-time setup: frame access/append state plus one evaluator per factory
+ initAccessAppendRef(ctx);
+ for (IRunningAggregateEvaluatorFactory aggFactory : aggFactories) {
+ IRunningAggregateEvaluator aggEval = aggFactory.createRunningAggregateEvaluator(ctx);
+ aggEvals.add(aggEvalClass.cast(aggEval)); // Class.cast throws ClassCastException if the factory produced an evaluator that is not a T
+ }
+ }
+
+ protected void produceTuples(IFrameTupleAccessor accessor, int beginIdx, int endIdx) throws HyracksDataException {
+ for (int t = beginIdx; t <= endIdx; t++) { // endIdx is inclusive
+ tRef.reset(accessor, t);
+ produceTuple(tupleBuilder, accessor, t, tRef);
+ appendToFrameFromTupleBuilder(tupleBuilder);
+ }
+ }
+
+ private void produceTuple(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+ FrameTupleReference tupleRef) throws HyracksDataException { // fills tb with one output tuple, in projectionList order
+ tb.reset();
+ for (int f = 0; f < projectionList.length; f++) {
+ int k = projectionToOutColumns[f];
+ if (k >= 0) { // column is an aggregate output: step evaluator k on the current tuple
+ aggEvals.get(k).step(tupleRef, p);
+ tb.addField(p.getByteArray(), p.getStartOffset(), p.getLength()); // add the bytes p points at after the step
+ } else { // not an aggregate output: take field projectionList[f] of the current input tuple
+ tb.addField(accessor, tIndex, projectionList[f]);
+ }
+ }
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ appender.flush(writer); // appender and writer are not declared in this class
+ }
+}
\ No newline at end of file