You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by dl...@apache.org on 2019/05/01 19:27:49 UTC
[asterixdb] branch master updated: [NO ISSUE][COMP] Refactor
physical window operator
This is an automated email from the ASF dual-hosted git repository.
dlych pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1d75da1 [NO ISSUE][COMP] Refactor physical window operator
1d75da1 is described below
commit 1d75da1b93e1dd8ab7851d65b23f5b4644dcc74f
Author: Dmitry Lychagin <dm...@couchbase.com>
AuthorDate: Tue Apr 30 13:29:22 2019 -0700
[NO ISSUE][COMP] Refactor physical window operator
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Create a new physical operator (WindowStreamPOperator)
for window operators that do not require partition materialization
- Create AbstractWindowPOperator which is now a base
class for both physical window operators
- Rename WindowSimpleRuntime* to WindowStreamRuntime*
Change-Id: I3863fa3d298aef53d4098be9fc17b0451eb2c23e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3369
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <al...@gmail.com>
---
.../rules/SetAsterixPhysicalOperatorsRule.java | 33 ++-
.../app/resource/OperatorResourcesComputer.java | 5 +-
.../app/resource/RequiredCapacityVisitor.java | 2 +-
.../results/window/win_misc/win_misc_02.plan | 2 +-
.../results/window/win_opt_01/win_opt_01_6.plan | 2 +-
.../results/window/win_opt_01/win_opt_01_7.plan | 2 +-
.../core/algebra/base/PhysicalOperatorTag.java | 3 +-
...POperator.java => AbstractWindowPOperator.java} | 108 ++-----
.../operators/physical/WindowPOperator.java | 328 +++------------------
.../operators/physical/WindowStreamPOperator.java | 62 ++++
.../rules/SetAlgebricksPhysicalOperatorsRule.java | 5 +-
...shRuntime.java => WindowStreamPushRuntime.java} | 4 +-
...actory.java => WindowStreamRuntimeFactory.java} | 13 +-
13 files changed, 176 insertions(+), 393 deletions(-)
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index b26eaca..69eecfd 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -56,8 +56,10 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOpera
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractWindowPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowStreamPOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
import org.apache.hyracks.algebricks.rewriter.rules.SetAlgebricksPhysicalOperatorsRule;
@@ -241,19 +243,24 @@ public final class SetAsterixPhysicalOperatorsRule extends SetAlgebricksPhysical
}
@Override
- public WindowPOperator createWindowPOperator(WindowOperator winOp) throws AlgebricksException {
- boolean partitionMaterialization = winOp.hasNestedPlans() || AnalysisUtil.hasFunctionWithProperty(winOp,
- BuiltinFunctions.WindowFunctionProperty.MATERIALIZE_PARTITION);
- boolean frameStartIsMonotonic = AnalysisUtil
- .isWindowFrameBoundaryMonotonic(winOp.getFrameStartExpressions(), winOp.getFrameValueExpressions());
- boolean frameEndIsMonotonic = AnalysisUtil.isWindowFrameBoundaryMonotonic(winOp.getFrameEndExpressions(),
- winOp.getFrameValueExpressions());
- boolean nestedTrivialAggregates = winOp.hasNestedPlans()
- && winOp.getNestedPlans().stream().allMatch(AnalysisUtil::isTrivialAggregateSubplan);
-
- return new WindowPOperator(winOp.getPartitionVarList(), partitionMaterialization,
- winOp.getOrderColumnList(), frameStartIsMonotonic, frameEndIsMonotonic, nestedTrivialAggregates,
- context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
+ public AbstractWindowPOperator createWindowPOperator(WindowOperator winOp) throws AlgebricksException {
+ if (winOp.hasNestedPlans()) {
+ boolean frameStartIsMonotonic = AnalysisUtil.isWindowFrameBoundaryMonotonic(
+ winOp.getFrameStartExpressions(), winOp.getFrameValueExpressions());
+ boolean frameEndIsMonotonic = AnalysisUtil.isWindowFrameBoundaryMonotonic(
+ winOp.getFrameEndExpressions(), winOp.getFrameValueExpressions());
+ boolean nestedTrivialAggregates =
+ winOp.getNestedPlans().stream().allMatch(AnalysisUtil::isTrivialAggregateSubplan);
+ return new WindowPOperator(winOp.getPartitionVarList(), winOp.getOrderColumnList(),
+ frameStartIsMonotonic, frameEndIsMonotonic, nestedTrivialAggregates,
+ context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
+ } else if (AnalysisUtil.hasFunctionWithProperty(winOp,
+ BuiltinFunctions.WindowFunctionProperty.MATERIALIZE_PARTITION)) {
+ return new WindowPOperator(winOp.getPartitionVarList(), winOp.getOrderColumnList(), false, false, false,
+ context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
+ } else {
+ return new WindowStreamPOperator(winOp.getPartitionVarList(), winOp.getOrderColumnList());
+ }
}
}
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
index 6cba1b1..a2c1c33 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
@@ -25,7 +25,6 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogi
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowPOperator;
public class OperatorResourcesComputer {
@@ -146,10 +145,10 @@ public class OperatorResourcesComputer {
}
private long getWindowRequiredMemory(WindowOperator op) {
- WindowPOperator physOp = (WindowPOperator) op.getPhysicalOperator();
// memory budget configuration only applies to window operators that materialize partitions (non-streaming)
// streaming window operators only need 2 frames: output + (conservative estimate) last frame partition columns
- long memorySize = physOp.isPartitionMaterialization() ? windowMemorySize : 2 * frameSize;
+ long memorySize = op.getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.WINDOW_STREAM ? 2 * frameSize
+ : windowMemorySize;
return getOperatorRequiredMemory(op, memorySize);
}
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java
index c0fca94..024a13e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java
@@ -318,7 +318,7 @@ public class RequiredCapacityVisitor implements ILogicalOperatorVisitor<Void, Vo
WindowPOperator physOp = (WindowPOperator) op.getPhysicalOperator();
visitInternal(op, true);
addOutputBuffer(op); // + previous frame
- if (physOp.isPartitionMaterialization()) {
+ if (physOp.getOperatorTag() == PhysicalOperatorTag.WINDOW) {
addOutputBuffer(op); // + run frame
}
return null;
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_misc/win_misc_02.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_misc/win_misc_02.plan
index e452d03..c12faf5 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_misc/win_misc_02.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_misc/win_misc_02.plan
@@ -11,7 +11,7 @@
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
- -- WINDOW |PARTITIONED|
+ -- WINDOW_STREAM |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STABLE_SORT [$$34(ASC), $$48(ASC)] |PARTITIONED|
-- HASH_PARTITION_EXCHANGE [$$34] |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_6.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_6.plan
index ab78ecc..a1e04ad 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_6.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_6.plan
@@ -3,7 +3,7 @@
-- STREAM_PROJECT |LOCAL|
-- ASSIGN |LOCAL|
-- WINDOW |LOCAL|
- -- WINDOW |LOCAL|
+ -- WINDOW_STREAM |LOCAL|
-- ONE_TO_ONE_EXCHANGE |LOCAL|
-- STABLE_SORT [$$m(ASC), $$t(ASC)] |LOCAL|
-- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_7.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_7.plan
index 5b3d480..b111336 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_7.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_7.plan
@@ -8,7 +8,7 @@
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
- -- WINDOW |LOCAL|
+ -- WINDOW_STREAM |LOCAL|
-- ONE_TO_ONE_EXCHANGE |LOCAL|
-- STABLE_SORT [$$m(ASC), $$t(ASC)] |LOCAL|
-- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index 8e1f77f..84d19c1 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -78,5 +78,6 @@ public enum PhysicalOperatorTag {
UPDATE,
WRITE_RESULT,
INTERSECT,
- WINDOW
+ WINDOW,
+ WINDOW_STREAM
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
similarity index 72%
copy from hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
copy to hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
index 8bd4610..7065b70 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
@@ -34,7 +34,6 @@ import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
@@ -59,47 +58,19 @@ import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFact
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.operators.win.AbstractWindowRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.win.WindowAggregatorDescriptorFactory;
-import org.apache.hyracks.algebricks.runtime.operators.win.WindowMaterializingRuntimeFactory;
-import org.apache.hyracks.algebricks.runtime.operators.win.WindowNestedPlansRunningRuntimeFactory;
-import org.apache.hyracks.algebricks.runtime.operators.win.WindowNestedPlansRuntimeFactory;
-import org.apache.hyracks.algebricks.runtime.operators.win.WindowNestedPlansUnboundedRuntimeFactory;
-import org.apache.hyracks.algebricks.runtime.operators.win.WindowSimpleRuntimeFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.ErrorCode;
-public class WindowPOperator extends AbstractPhysicalOperator {
+public abstract class AbstractWindowPOperator extends AbstractPhysicalOperator {
private final List<LogicalVariable> partitionColumns;
- private final boolean partitionMaterialization;
-
private final List<OrderColumn> orderColumns;
- private final boolean frameStartIsMonotonic;
-
- private final boolean frameEndIsMonotonic;
-
- private final boolean nestedTrivialAggregates;
-
- // The maximum number of in-memory frames that this operator can use.
- private final int memSizeInFrames;
-
- public WindowPOperator(List<LogicalVariable> partitionColumns, boolean partitionMaterialization,
- List<OrderColumn> orderColumns, boolean frameStartIsMonotonic, boolean frameEndIsMonotonic,
- boolean nestedTrivialAggregates, int memSizeInFrames) {
+ AbstractWindowPOperator(List<LogicalVariable> partitionColumns, List<OrderColumn> orderColumns) {
this.partitionColumns = partitionColumns;
- this.partitionMaterialization = partitionMaterialization;
this.orderColumns = orderColumns;
- this.frameStartIsMonotonic = frameStartIsMonotonic;
- this.frameEndIsMonotonic = frameEndIsMonotonic;
- this.nestedTrivialAggregates = nestedTrivialAggregates;
- this.memSizeInFrames = memSizeInFrames;
- }
-
- @Override
- public PhysicalOperatorTag getOperatorTag() {
- return PhysicalOperatorTag.WINDOW;
}
@Override
@@ -222,58 +193,22 @@ public class WindowPOperator extends AbstractPhysicalOperator {
inputSchemas, context);
}
- AbstractWindowRuntimeFactory runtime = null;
+ int nestedAggOutSchemaSize = 0;
+ WindowAggregatorDescriptorFactory nestedAggFactory = null;
if (winOp.hasNestedPlans()) {
int opSchemaSizePreSubplans = opSchema.getSize();
AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], winOp, opSchema, context);
- int aggregatorOutputSchemaSize = opSchema.getSize() - opSchemaSizePreSubplans;
- WindowAggregatorDescriptorFactory nestedAggFactory = new WindowAggregatorDescriptorFactory(subplans);
+ nestedAggOutSchemaSize = opSchema.getSize() - opSchemaSizePreSubplans;
+ nestedAggFactory = new WindowAggregatorDescriptorFactory(subplans);
nestedAggFactory.setSourceLocation(winOp.getSourceLocation());
-
- int frameMaxObjects = winOp.getFrameMaxObjects();
-
- // special cases
- if (frameStartExprList.isEmpty() && frameExcludeExprList.isEmpty() && frameOffsetExpr == null) {
- if (frameEndExprList.isEmpty()) {
- // special case #1: frame == whole partition, no exclusions, no offset
- runtime = new WindowNestedPlansUnboundedRuntimeFactory(partitionColumnsList,
- partitionComparatorFactories, orderComparatorFactories, frameMaxObjects,
- projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories,
- aggregatorOutputSchemaSize, nestedAggFactory, memSizeInFrames);
- } else if (frameEndIsMonotonic && nestedTrivialAggregates) {
- // special case #2: accumulating frame from beginning of the partition, no exclusions, no offset,
- // trivial aggregate subplan ( aggregate + nts )
- nestedAggFactory.setPartialOutputEnabled(true);
- runtime = new WindowNestedPlansRunningRuntimeFactory(partitionColumnsList,
- partitionComparatorFactories, orderComparatorFactories,
- frameValueExprEvalsAndComparators.first, frameValueExprEvalsAndComparators.second,
- frameEndExprEvals, frameEndValidationExprEvals, frameMaxObjects,
- context.getBinaryBooleanInspectorFactory(), projectionColumnsExcludingSubplans,
- runningAggOutColumns, runningAggFactories, aggregatorOutputSchemaSize, nestedAggFactory,
- memSizeInFrames);
- }
- }
- // default case
- if (runtime == null) {
- runtime = new WindowNestedPlansRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
- orderComparatorFactories, frameValueExprEvalsAndComparators.first,
- frameValueExprEvalsAndComparators.second, frameStartExprEvals, frameStartValidationExprEvals,
- frameStartIsMonotonic, frameEndExprEvals, frameEndValidationExprEvals,
- frameExcludeExprEvalsAndComparators.first, winOp.getFrameExcludeNegationStartIdx(),
- frameExcludeExprEvalsAndComparators.second, frameOffsetExprEval, frameMaxObjects,
- context.getBinaryBooleanInspectorFactory(), context.getBinaryIntegerInspectorFactory(),
- projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories,
- aggregatorOutputSchemaSize, nestedAggFactory, memSizeInFrames);
- }
- } else if (partitionMaterialization) {
- runtime = new WindowMaterializingRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
- orderComparatorFactories, projectionColumnsExcludingSubplans, runningAggOutColumns,
- runningAggFactories, memSizeInFrames);
- } else {
- runtime = new WindowSimpleRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
- orderComparatorFactories, projectionColumnsExcludingSubplans, runningAggOutColumns,
- runningAggFactories);
}
+
+ AbstractWindowRuntimeFactory runtime = createRuntimeFactory(winOp, partitionColumnsList,
+ partitionComparatorFactories, orderComparatorFactories, frameValueExprEvalsAndComparators.first,
+ frameValueExprEvalsAndComparators.second, frameStartExprEvals, frameStartValidationExprEvals,
+ frameEndExprEvals, frameEndValidationExprEvals, frameExcludeExprEvalsAndComparators.first,
+ frameExcludeExprEvalsAndComparators.second, frameOffsetExprEval, projectionColumnsExcludingSubplans,
+ runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, context);
runtime.setSourceLocation(winOp.getSourceLocation());
// contribute one Asterix framewriter
@@ -284,6 +219,17 @@ public class WindowPOperator extends AbstractPhysicalOperator {
builder.contributeGraphEdge(src, 0, winOp, 0);
}
+ protected abstract AbstractWindowRuntimeFactory createRuntimeFactory(WindowOperator winOp,
+ int[] partitionColumnsList, IBinaryComparatorFactory[] partitionComparatorFactories,
+ IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueExprEvals,
+ IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameStartExprEvals,
+ IScalarEvaluatorFactory[] frameStartValidationExprEvals, IScalarEvaluatorFactory[] frameEndExprEvals,
+ IScalarEvaluatorFactory[] frameEndValidationExprEvals, IScalarEvaluatorFactory[] frameExcludeExprEvals,
+ IBinaryComparatorFactory[] frameExcludeComparatorFactories, IScalarEvaluatorFactory frameOffsetExprEval,
+ int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns,
+ IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
+ WindowAggregatorDescriptorFactory nestedAggFactory, JobGenContext context);
+
@Override
public boolean isMicroOperator() {
return true;
@@ -294,10 +240,6 @@ public class WindowPOperator extends AbstractPhysicalOperator {
return true;
}
- public boolean isPartitionMaterialization() {
- return partitionMaterialization;
- }
-
private IScalarEvaluatorFactory[] createEvaluatorFactories(List<Mutable<ILogicalExpression>> exprList,
IOperatorSchema[] inputSchemas, IVariableTypeEnvironment inputTypeEnv,
IExpressionRuntimeProvider exprRuntimeProvider, JobGenContext context) throws AlgebricksException {
@@ -336,7 +278,7 @@ public class WindowPOperator extends AbstractPhysicalOperator {
return new Pair<>(evals, comparators);
}
- private boolean containsAny(List<OrderColumn> ocList, int startIdx, Set<LogicalVariable> varSet) {
+ private static boolean containsAny(List<OrderColumn> ocList, int startIdx, Set<LogicalVariable> varSet) {
for (int i = startIdx, ln = ocList.size(); i < ln; i++) {
if (varSet.contains(ocList.get(i).getColumn())) {
return true;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
index 8bd4610..23853e8 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
@@ -19,42 +19,13 @@
package org.apache.hyracks.algebricks.core.algebra.operators.physical;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
-import java.util.Set;
-import java.util.function.Function;
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.ListSet;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
-import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
-import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
-import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.operators.win.AbstractWindowRuntimeFactory;
@@ -63,18 +34,9 @@ import org.apache.hyracks.algebricks.runtime.operators.win.WindowMaterializingRu
import org.apache.hyracks.algebricks.runtime.operators.win.WindowNestedPlansRunningRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.win.WindowNestedPlansRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.win.WindowNestedPlansUnboundedRuntimeFactory;
-import org.apache.hyracks.algebricks.runtime.operators.win.WindowSimpleRuntimeFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.ErrorCode;
-public class WindowPOperator extends AbstractPhysicalOperator {
-
- private final List<LogicalVariable> partitionColumns;
-
- private final boolean partitionMaterialization;
-
- private final List<OrderColumn> orderColumns;
+public final class WindowPOperator extends AbstractWindowPOperator {
private final boolean frameStartIsMonotonic;
@@ -85,12 +47,10 @@ public class WindowPOperator extends AbstractPhysicalOperator {
// The maximum number of in-memory frames that this operator can use.
private final int memSizeInFrames;
- public WindowPOperator(List<LogicalVariable> partitionColumns, boolean partitionMaterialization,
- List<OrderColumn> orderColumns, boolean frameStartIsMonotonic, boolean frameEndIsMonotonic,
- boolean nestedTrivialAggregates, int memSizeInFrames) {
- this.partitionColumns = partitionColumns;
- this.partitionMaterialization = partitionMaterialization;
- this.orderColumns = orderColumns;
+ public WindowPOperator(List<LogicalVariable> partitionColumns, List<OrderColumn> orderColumns,
+ boolean frameStartIsMonotonic, boolean frameEndIsMonotonic, boolean nestedTrivialAggregates,
+ int memSizeInFrames) {
+ super(partitionColumns, orderColumns);
this.frameStartIsMonotonic = frameStartIsMonotonic;
this.frameEndIsMonotonic = frameEndIsMonotonic;
this.nestedTrivialAggregates = nestedTrivialAggregates;
@@ -103,245 +63,55 @@ public class WindowPOperator extends AbstractPhysicalOperator {
}
@Override
- public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
- IPartitioningProperty pp;
- switch (op.getExecutionMode()) {
- case PARTITIONED:
- pp = new UnorderedPartitionedProperty(new ListSet<>(partitionColumns),
- context.getComputationNodeDomain());
- break;
- case UNPARTITIONED:
- pp = IPartitioningProperty.UNPARTITIONED;
- break;
- case LOCAL:
- pp = null;
- break;
- default:
- throw new IllegalStateException(op.getExecutionMode().name());
- }
-
- // require local order property [pc1, ... pcN, oc1, ... ocN]
- // accounting for cases where there's an overlap between order and partition columns
- // TODO replace with required local grouping on partition columns + local order on order columns
- List<OrderColumn> lopColumns = new ArrayList<>();
- ListSet<LogicalVariable> pcVars = new ListSet<>();
- pcVars.addAll(partitionColumns);
- for (int oIdx = 0, ln = orderColumns.size(); oIdx < ln; oIdx++) {
- OrderColumn oc = orderColumns.get(oIdx);
- LogicalVariable ocVar = oc.getColumn();
- if (!pcVars.remove(ocVar) && containsAny(orderColumns, oIdx + 1, pcVars)) {
- throw new AlgebricksException(ErrorCode.HYRACKS, ErrorCode.UNSUPPORTED_WINDOW_SPEC,
- op.getSourceLocation(), String.valueOf(partitionColumns), String.valueOf(orderColumns));
- }
- lopColumns.add(new OrderColumn(oc.getColumn(), oc.getOrder()));
- }
- int pIdx = 0;
- for (LogicalVariable pColumn : pcVars) {
- lopColumns.add(pIdx++, new OrderColumn(pColumn, OrderOperator.IOrder.OrderKind.ASC));
- }
- List<ILocalStructuralProperty> localProps =
- lopColumns.isEmpty() ? null : Collections.singletonList(new LocalOrderProperty(lopColumns));
-
- return new PhysicalRequirements(
- new StructuralPropertiesVector[] { new StructuralPropertiesVector(pp, localProps) },
- IPartitioningRequirementsCoordinator.NO_COORDINATION);
- }
-
- @Override
- public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
- AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
- deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
- }
-
- @Override
- public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
- IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
- throws AlgebricksException {
- WindowOperator winOp = (WindowOperator) op;
-
- int[] partitionColumnsList = JobGenHelper.projectVariables(inputSchemas[0], partitionColumns);
-
- IVariableTypeEnvironment opTypeEnv = context.getTypeEnvironment(op);
- IBinaryComparatorFactory[] partitionComparatorFactories =
- JobGenHelper.variablesToAscBinaryComparatorFactories(partitionColumns, opTypeEnv, context);
-
- //TODO not all functions need order comparators
- IBinaryComparatorFactory[] orderComparatorFactories =
- JobGenHelper.variablesToBinaryComparatorFactories(orderColumns, opTypeEnv, context);
-
- IVariableTypeEnvironment inputTypeEnv = context.getTypeEnvironment(op.getInputs().get(0).getValue());
- IExpressionRuntimeProvider exprRuntimeProvider = context.getExpressionRuntimeProvider();
- IBinaryComparatorFactoryProvider binaryComparatorFactoryProvider = context.getBinaryComparatorFactoryProvider();
-
- List<Mutable<ILogicalExpression>> frameStartExprList = winOp.getFrameStartExpressions();
- IScalarEvaluatorFactory[] frameStartExprEvals =
- createEvaluatorFactories(frameStartExprList, inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
-
- List<Mutable<ILogicalExpression>> frameStartValidationExprList = winOp.getFrameStartValidationExpressions();
- IScalarEvaluatorFactory[] frameStartValidationExprEvals = createEvaluatorFactories(frameStartValidationExprList,
- inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
-
- List<Mutable<ILogicalExpression>> frameEndExprList = winOp.getFrameEndExpressions();
- IScalarEvaluatorFactory[] frameEndExprEvals =
- createEvaluatorFactories(frameEndExprList, inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
-
- List<Mutable<ILogicalExpression>> frameEndValidationExprList = winOp.getFrameEndValidationExpressions();
- IScalarEvaluatorFactory[] frameEndValidationExprEvals = createEvaluatorFactories(frameEndValidationExprList,
- inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
-
- List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> frameValueExprList =
- winOp.getFrameValueExpressions();
- Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> frameValueExprEvalsAndComparators =
- createEvaluatorAndComparatorFactories(frameValueExprList, Pair::getSecond, Pair::getFirst, inputSchemas,
- inputTypeEnv, exprRuntimeProvider, binaryComparatorFactoryProvider, context);
-
- List<Mutable<ILogicalExpression>> frameExcludeExprList = winOp.getFrameExcludeExpressions();
- Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> frameExcludeExprEvalsAndComparators =
- createEvaluatorAndComparatorFactories(frameExcludeExprList, v -> v, v -> OrderOperator.ASC_ORDER,
- inputSchemas, inputTypeEnv, exprRuntimeProvider, binaryComparatorFactoryProvider, context);
-
- IScalarEvaluatorFactory frameOffsetExprEval = null;
- ILogicalExpression frameOffsetExpr = winOp.getFrameOffset().getValue();
- if (frameOffsetExpr != null) {
- frameOffsetExprEval =
- exprRuntimeProvider.createEvaluatorFactory(frameOffsetExpr, inputTypeEnv, inputSchemas, context);
- }
-
- int[] projectionColumnsExcludingSubplans = JobGenHelper.projectAllVariables(opSchema);
-
- int[] runningAggOutColumns = JobGenHelper.projectVariables(opSchema, winOp.getVariables());
-
- List<Mutable<ILogicalExpression>> runningAggExprs = winOp.getExpressions();
- int runningAggExprCount = runningAggExprs.size();
- IRunningAggregateEvaluatorFactory[] runningAggFactories =
- new IRunningAggregateEvaluatorFactory[runningAggExprCount];
- for (int i = 0; i < runningAggExprCount; i++) {
- StatefulFunctionCallExpression expr = (StatefulFunctionCallExpression) runningAggExprs.get(i).getValue();
- runningAggFactories[i] = exprRuntimeProvider.createRunningAggregateFunctionFactory(expr, inputTypeEnv,
- inputSchemas, context);
- }
-
- AbstractWindowRuntimeFactory runtime = null;
- if (winOp.hasNestedPlans()) {
- int opSchemaSizePreSubplans = opSchema.getSize();
- AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], winOp, opSchema, context);
- int aggregatorOutputSchemaSize = opSchema.getSize() - opSchemaSizePreSubplans;
- WindowAggregatorDescriptorFactory nestedAggFactory = new WindowAggregatorDescriptorFactory(subplans);
- nestedAggFactory.setSourceLocation(winOp.getSourceLocation());
-
- int frameMaxObjects = winOp.getFrameMaxObjects();
-
- // special cases
- if (frameStartExprList.isEmpty() && frameExcludeExprList.isEmpty() && frameOffsetExpr == null) {
- if (frameEndExprList.isEmpty()) {
- // special case #1: frame == whole partition, no exclusions, no offset
- runtime = new WindowNestedPlansUnboundedRuntimeFactory(partitionColumnsList,
- partitionComparatorFactories, orderComparatorFactories, frameMaxObjects,
- projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories,
- aggregatorOutputSchemaSize, nestedAggFactory, memSizeInFrames);
- } else if (frameEndIsMonotonic && nestedTrivialAggregates) {
- // special case #2: accumulating frame from beginning of the partition, no exclusions, no offset,
- // trivial aggregate subplan ( aggregate + nts )
- nestedAggFactory.setPartialOutputEnabled(true);
- runtime = new WindowNestedPlansRunningRuntimeFactory(partitionColumnsList,
- partitionComparatorFactories, orderComparatorFactories,
- frameValueExprEvalsAndComparators.first, frameValueExprEvalsAndComparators.second,
- frameEndExprEvals, frameEndValidationExprEvals, frameMaxObjects,
- context.getBinaryBooleanInspectorFactory(), projectionColumnsExcludingSubplans,
- runningAggOutColumns, runningAggFactories, aggregatorOutputSchemaSize, nestedAggFactory,
- memSizeInFrames);
- }
- }
- // default case
- if (runtime == null) {
- runtime = new WindowNestedPlansRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
- orderComparatorFactories, frameValueExprEvalsAndComparators.first,
- frameValueExprEvalsAndComparators.second, frameStartExprEvals, frameStartValidationExprEvals,
- frameStartIsMonotonic, frameEndExprEvals, frameEndValidationExprEvals,
- frameExcludeExprEvalsAndComparators.first, winOp.getFrameExcludeNegationStartIdx(),
- frameExcludeExprEvalsAndComparators.second, frameOffsetExprEval, frameMaxObjects,
- context.getBinaryBooleanInspectorFactory(), context.getBinaryIntegerInspectorFactory(),
- projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories,
- aggregatorOutputSchemaSize, nestedAggFactory, memSizeInFrames);
- }
- } else if (partitionMaterialization) {
- runtime = new WindowMaterializingRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
+ protected AbstractWindowRuntimeFactory createRuntimeFactory(WindowOperator winOp, int[] partitionColumnsList,
+ IBinaryComparatorFactory[] partitionComparatorFactories,
+ IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueExprEvals,
+ IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameStartExprEvals,
+ IScalarEvaluatorFactory[] frameStartValidationExprEvals, IScalarEvaluatorFactory[] frameEndExprEvals,
+ IScalarEvaluatorFactory[] frameEndValidationExprEvals, IScalarEvaluatorFactory[] frameExcludeExprEvals,
+ IBinaryComparatorFactory[] frameExcludeComparatorFactories, IScalarEvaluatorFactory frameOffsetExprEval,
+ int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns,
+ IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
+ WindowAggregatorDescriptorFactory nestedAggFactory, JobGenContext context) {
+
+ // special cases
+ if (!winOp.hasNestedPlans()) {
+ return new WindowMaterializingRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
orderComparatorFactories, projectionColumnsExcludingSubplans, runningAggOutColumns,
runningAggFactories, memSizeInFrames);
- } else {
- runtime = new WindowSimpleRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
- orderComparatorFactories, projectionColumnsExcludingSubplans, runningAggOutColumns,
- runningAggFactories);
- }
- runtime.setSourceLocation(winOp.getSourceLocation());
-
- // contribute one Asterix framewriter
- RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(opTypeEnv, opSchema, context);
- builder.contributeMicroOperator(winOp, runtime, recDesc);
- // and contribute one edge from its child
- ILogicalOperator src = winOp.getInputs().get(0).getValue();
- builder.contributeGraphEdge(src, 0, winOp, 0);
- }
-
- @Override
- public boolean isMicroOperator() {
- return true;
- }
-
- @Override
- public boolean expensiveThanMaterialization() {
- return true;
- }
-
- public boolean isPartitionMaterialization() {
- return partitionMaterialization;
- }
-
- private IScalarEvaluatorFactory[] createEvaluatorFactories(List<Mutable<ILogicalExpression>> exprList,
- IOperatorSchema[] inputSchemas, IVariableTypeEnvironment inputTypeEnv,
- IExpressionRuntimeProvider exprRuntimeProvider, JobGenContext context) throws AlgebricksException {
- if (exprList.isEmpty()) {
- return null;
}
- int ln = exprList.size();
- IScalarEvaluatorFactory[] evals = new IScalarEvaluatorFactory[ln];
- for (int i = 0; i < ln; i++) {
- ILogicalExpression expr = exprList.get(i).getValue();
- evals[i] = exprRuntimeProvider.createEvaluatorFactory(expr, inputTypeEnv, inputSchemas, context);
- }
- return evals;
- }
- private <T> Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> createEvaluatorAndComparatorFactories(
- List<T> exprList, Function<T, Mutable<ILogicalExpression>> exprGetter,
- Function<T, OrderOperator.IOrder> orderGetter, IOperatorSchema[] inputSchemas,
- IVariableTypeEnvironment inputTypeEnv, IExpressionRuntimeProvider exprRuntimeProvider,
- IBinaryComparatorFactoryProvider binaryComparatorFactoryProvider, JobGenContext context)
- throws AlgebricksException {
- if (exprList.isEmpty()) {
- return new Pair<>(null, null);
- }
- int ln = exprList.size();
- IScalarEvaluatorFactory[] evals = new IScalarEvaluatorFactory[ln];
- IBinaryComparatorFactory[] comparators = new IBinaryComparatorFactory[ln];
- for (int i = 0; i < ln; i++) {
- T exprObj = exprList.get(i);
- ILogicalExpression expr = exprGetter.apply(exprObj).getValue();
- OrderOperator.IOrder order = orderGetter.apply(exprObj);
- evals[i] = exprRuntimeProvider.createEvaluatorFactory(expr, inputTypeEnv, inputSchemas, context);
- comparators[i] = binaryComparatorFactoryProvider.getBinaryComparatorFactory(inputTypeEnv.getType(expr),
- order.getKind() == OrderOperator.IOrder.OrderKind.ASC);
- }
- return new Pair<>(evals, comparators);
- }
-
- private boolean containsAny(List<OrderColumn> ocList, int startIdx, Set<LogicalVariable> varSet) {
- for (int i = startIdx, ln = ocList.size(); i < ln; i++) {
- if (varSet.contains(ocList.get(i).getColumn())) {
- return true;
+ boolean hasFrameStart = frameStartExprEvals != null && frameStartExprEvals.length > 0;
+ boolean hasFrameEnd = frameEndExprEvals != null && frameEndExprEvals.length > 0;
+ boolean hasFrameExclude = frameExcludeExprEvals != null && frameExcludeExprEvals.length > 0;
+ boolean hasFrameOffset = frameOffsetExprEval != null;
+ if (!hasFrameStart && !hasFrameExclude && !hasFrameOffset) {
+ if (!hasFrameEnd) {
+ // special case #1: frame == whole partition, no exclusions, no offset
+ return new WindowNestedPlansUnboundedRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
+ orderComparatorFactories, winOp.getFrameMaxObjects(), projectionColumnsExcludingSubplans,
+ runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory,
+ memSizeInFrames);
+ } else if (frameEndIsMonotonic && nestedTrivialAggregates) {
+ // special case #2: accumulating frame from beginning of the partition, no exclusions, no offset,
+ // trivial aggregate subplan ( aggregate + nts )
+ nestedAggFactory.setPartialOutputEnabled(true);
+ return new WindowNestedPlansRunningRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
+ orderComparatorFactories, frameValueExprEvals, frameValueComparatorFactories, frameEndExprEvals,
+ frameEndValidationExprEvals, winOp.getFrameMaxObjects(),
+ context.getBinaryBooleanInspectorFactory(), projectionColumnsExcludingSubplans,
+ runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory,
+ memSizeInFrames);
}
}
- return false;
+
+ // default case
+ return new WindowNestedPlansRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
+ orderComparatorFactories, frameValueExprEvals, frameValueComparatorFactories, frameStartExprEvals,
+ frameStartValidationExprEvals, frameStartIsMonotonic, frameEndExprEvals, frameEndValidationExprEvals,
+ frameExcludeExprEvals, winOp.getFrameExcludeNegationStartIdx(), frameExcludeComparatorFactories,
+ frameOffsetExprEval, winOp.getFrameMaxObjects(), context.getBinaryBooleanInspectorFactory(),
+ context.getBinaryIntegerInspectorFactory(), projectionColumnsExcludingSubplans, runningAggOutColumns,
+ runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, memSizeInFrames);
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java
new file mode 100644
index 0000000..33b47ec
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.List;
+
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.win.AbstractWindowRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.win.WindowAggregatorDescriptorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.win.WindowStreamRuntimeFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public final class WindowStreamPOperator extends AbstractWindowPOperator {
+
+ public WindowStreamPOperator(List<LogicalVariable> partitionColumns, List<OrderColumn> orderColumns) {
+ super(partitionColumns, orderColumns);
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.WINDOW_STREAM;
+ }
+
+ @Override
+ protected AbstractWindowRuntimeFactory createRuntimeFactory(WindowOperator winOp, int[] partitionColumnsList,
+ IBinaryComparatorFactory[] partitionComparatorFactories,
+ IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueExprEvals,
+ IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameStartExprEvals,
+ IScalarEvaluatorFactory[] frameStartValidationExprEvals, IScalarEvaluatorFactory[] frameEndExprEvals,
+ IScalarEvaluatorFactory[] frameEndValidationExprEvals, IScalarEvaluatorFactory[] frameExcludeExprEvals,
+ IBinaryComparatorFactory[] frameExcludeComparatorFactories, IScalarEvaluatorFactory frameOffsetExprEval,
+ int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns,
+ IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
+ WindowAggregatorDescriptorFactory nestedAggFactory, JobGenContext context) {
+ return new WindowStreamRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
+ orderComparatorFactories, projectionColumnsExcludingSubplans, runningAggOutColumns,
+ runningAggFactories);
+ }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index f127898..e6cdc28 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -79,6 +79,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperat
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractWindowPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AggregatePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.BulkloadPOperator;
@@ -468,8 +469,8 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
return createWindowPOperator(op);
}
- protected WindowPOperator createWindowPOperator(WindowOperator op) throws AlgebricksException {
- return new WindowPOperator(op.getPartitionVarList(), true, op.getOrderColumnList(), false, false, false,
+ protected AbstractWindowPOperator createWindowPOperator(WindowOperator op) throws AlgebricksException {
+ return new WindowPOperator(op.getPartitionVarList(), op.getOrderColumnList(), false, false, false,
context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamPushRuntime.java
similarity index 94%
rename from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java
rename to hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamPushRuntime.java
index f7f1a25..d23d4e7 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamPushRuntime.java
@@ -30,9 +30,9 @@ import org.apache.hyracks.api.exceptions.SourceLocation;
/**
* Runtime for window operators that evaluates running aggregates without partition materialization.
*/
-class WindowSimplePushRuntime extends AbstractWindowPushRuntime {
+class WindowStreamPushRuntime extends AbstractWindowPushRuntime {
- WindowSimplePushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
+ WindowStreamPushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumns, int[] runningAggOutColumns,
IRunningAggregateEvaluatorFactory[] runningAggFactories, IHyracksTaskContext ctx,
SourceLocation sourceLoc) {
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamRuntimeFactory.java
similarity index 82%
rename from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java
rename to hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamRuntimeFactory.java
index 2d1cdde..be368a9 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamRuntimeFactory.java
@@ -27,13 +27,14 @@ import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
/**
- * Runtime factory for window operators that evaluates running aggregates without partition materialization.
+ * Runtime factory for window operators that evaluates running aggregates in a streaming fashion
+ * (without partition materialization).
*/
-public class WindowSimpleRuntimeFactory extends AbstractWindowRuntimeFactory {
+public class WindowStreamRuntimeFactory extends AbstractWindowRuntimeFactory {
private static final long serialVersionUID = 1L;
- public WindowSimpleRuntimeFactory(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
+ public WindowStreamRuntimeFactory(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumnsExcludingSubplans,
int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories) {
super(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
@@ -42,13 +43,13 @@ public class WindowSimpleRuntimeFactory extends AbstractWindowRuntimeFactory {
@Override
public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
- return new WindowSimplePushRuntime(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
+ return new WindowStreamPushRuntime(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
projectionList, runningAggOutColumns, runningAggFactories, ctx, sourceLoc);
}
@Override
public String toString() {
- return "window (" + Arrays.toString(partitionColumns) + ") " + Arrays.toString(runningAggOutColumns) + " := "
- + Arrays.toString(runningAggFactories);
+ return "window-stream (" + Arrays.toString(partitionColumns) + ") " + Arrays.toString(runningAggOutColumns)
+ + " := " + Arrays.toString(runningAggFactories);
}
}