You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by kw...@apache.org on 2016/09/30 02:15:05 UTC
[48/61] [partial] incubator-impala git commit: IMPALA-3786: Replace
"cloudera" with "apache" (part 1)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/analysis/AnalyticExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/AnalyticExpr.java b/fe/src/main/java/com/cloudera/impala/analysis/AnalyticExpr.java
deleted file mode 100644
index 9abd82d..0000000
--- a/fe/src/main/java/com/cloudera/impala/analysis/AnalyticExpr.java
+++ /dev/null
@@ -1,839 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.analysis;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.analysis.AnalyticWindow.Boundary;
-import com.cloudera.impala.analysis.AnalyticWindow.BoundaryType;
-import com.cloudera.impala.catalog.AggregateFunction;
-import com.cloudera.impala.catalog.Function;
-import com.cloudera.impala.catalog.ScalarType;
-import com.cloudera.impala.catalog.Type;
-import com.cloudera.impala.common.AnalysisException;
-import com.cloudera.impala.common.InternalException;
-import com.cloudera.impala.common.TreeNode;
-import com.cloudera.impala.service.FeSupport;
-import com.cloudera.impala.thrift.TColumnValue;
-import com.cloudera.impala.thrift.TExprNode;
-import com.cloudera.impala.util.TColumnValueUtil;
-import com.google.common.base.Joiner;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * Representation of an analytic function call with OVER clause.
- * All "subexpressions" (such as the actual function call parameters as well as the
- * partition/ordering exprs, etc.) are embedded as children in order to allow expr
- * substitution:
- * function call params: children 0 .. #params - 1
- * partition exprs: children #params .. #params + #partition-exprs - 1
- * ordering exprs:
- * children #params + #partition-exprs ..
- * #params + #partition-exprs + #order-by-elements - 1
- * exprs in windowing clause: remaining children
- *
- * Note that it's wrong to embed the FunctionCallExpr itself as a child,
- * because in 'COUNT(..) OVER (..)' the 'COUNT(..)' is not part of a standard aggregate
- * computation and must not be substituted as such. However, the parameters of the
- * analytic function call might reference the output of an aggregate computation
- * and need to be substituted as such; example: COUNT(COUNT(..)) OVER (..)
- */
-public class AnalyticExpr extends Expr {
- private final static Logger LOG = LoggerFactory.getLogger(AnalyticExpr.class);
-
- private FunctionCallExpr fnCall_;
- private final List<Expr> partitionExprs_;
- // These elements are modified to point to the corresponding child exprs to keep them
- // in sync through expr substitutions.
- private List<OrderByElement> orderByElements_ = Lists.newArrayList();
- private AnalyticWindow window_;
-
- // If set, requires the window to be set to null in resetAnalysisState(). Required for
- // proper substitution/cloning because standardization may set a window that is illegal
- // in SQL, and hence, will fail analysis().
- private boolean resetWindow_ = false;
-
- // SQL string of this AnalyticExpr before standardization. Returned in toSqlImpl().
- private String sqlString_;
-
- // Names of the analytic functions that receive special treatment in this class.
- // They are compared against Function.functionName() / FunctionName.getFunction()
- // and are never reassigned, hence final.
- private static final String LEAD = "lead";
- private static final String LAG = "lag";
- private static final String FIRST_VALUE = "first_value";
- private static final String LAST_VALUE = "last_value";
- private static final String FIRST_VALUE_IGNORE_NULLS = "first_value_ignore_nulls";
- private static final String LAST_VALUE_IGNORE_NULLS = "last_value_ignore_nulls";
- private static final String RANK = "rank";
- private static final String DENSERANK = "dense_rank";
- private static final String ROWNUMBER = "row_number";
- private static final String MIN = "min";
- private static final String MAX = "max";
- private static final String PERCENT_RANK = "percent_rank";
- private static final String CUME_DIST = "cume_dist";
- private static final String NTILE = "ntile";
-
- // Internal function used to implement FIRST_VALUE with a window rewrite and
- // additional null handling in the backend.
- public static final String FIRST_VALUE_REWRITE = "first_value_rewrite";
-
- /**
-  * Creates an analytic expr for 'fnCall' with the given OVER clause components.
-  * 'fnCall' must not be null. A null 'partitionExprs' is treated as an empty list and
-  * a null 'orderByElements' as no ordering; 'window' may be null. A non-null
-  * 'partitionExprs' list is stored as-is (not copied).
-  */
- public AnalyticExpr(FunctionCallExpr fnCall, List<Expr> partitionExprs,
-     List<OrderByElement> orderByElements, AnalyticWindow window) {
-   Preconditions.checkNotNull(fnCall);
-   fnCall_ = fnCall;
-   if (partitionExprs == null) {
-     partitionExprs_ = new ArrayList<Expr>();
-   } else {
-     partitionExprs_ = partitionExprs;
-   }
-   if (orderByElements != null) {
-     orderByElements_.addAll(orderByElements);
-   }
-   window_ = window;
-   // Expose all sub-expressions as children so they take part in substitution.
-   setChildren();
- }
-
- /**
-  * Copy c'tor used by clone(). Clones the function call, the order-by elements, the
-  * partition exprs and the window (if any), copies the remaining state and then
-  * rebuilds the children from the cloned parts.
-  */
- protected AnalyticExpr(AnalyticExpr other) {
-   super(other);
-   fnCall_ = (FunctionCallExpr) other.fnCall_.clone();
-   for (OrderByElement elem: other.orderByElements_) {
-     orderByElements_.add(elem.clone());
-   }
-   partitionExprs_ = Expr.cloneList(other.partitionExprs_);
-   if (other.window_ == null) {
-     window_ = null;
-   } else {
-     window_ = other.window_.clone();
-   }
-   resetWindow_ = other.resetWindow_;
-   sqlString_ = other.sqlString_;
-   setChildren();
- }
-
- // Returns the function call that the OVER clause applies to.
- public FunctionCallExpr getFnCall() { return fnCall_; }
- // Returns the internal list of PARTITION BY exprs (not a copy).
- public List<Expr> getPartitionExprs() { return partitionExprs_; }
- // Returns the internal list of ORDER BY elements (not a copy).
- public List<OrderByElement> getOrderByElements() { return orderByElements_; }
- // Returns the window of this analytic expr; may be null.
- public AnalyticWindow getWindow() { return window_; }
-
- @Override
- public boolean equals(Object obj) {
- if (!super.equals(obj)) return false;
- AnalyticExpr o = (AnalyticExpr)obj;
- if (!fnCall_.equals(o.getFnCall())) return false;
- if ((window_ == null) != (o.window_ == null)) return false;
- if (window_ != null) {
- if (!window_.equals(o.window_)) return false;
- }
- return orderByElements_.equals(o.orderByElements_);
- }
-
- /**
- * Analytic exprs cannot be constant: always returns false, regardless of the
- * function call or its parameters.
- */
- @Override
- public boolean isConstant() { return false; }
-
- // Returns a copy created through the clone() c'tor AnalyticExpr(AnalyticExpr).
- @Override
- public Expr clone() { return new AnalyticExpr(this); }
-
- /**
-  * Returns the SQL of this analytic expr. If a pre-standardization SQL string was
-  * recorded (sqlString_), that string is returned; otherwise the SQL is assembled as
-  * '<fn call> OVER ([PARTITION BY ...] [ORDER BY ...] [<window>])' with the present
-  * clauses separated by single spaces.
-  */
- @Override
- public String toSqlImpl() {
-   if (sqlString_ != null) return sqlString_;
-   StringBuilder sql = new StringBuilder();
-   sql.append(fnCall_.toSql()).append(" OVER (");
-   // Empty before the first clause, a single space afterwards.
-   String separator = "";
-   if (!partitionExprs_.isEmpty()) {
-     sql.append(separator).append("PARTITION BY ").append(Expr.toSql(partitionExprs_));
-     separator = " ";
-   }
-   if (!orderByElements_.isEmpty()) {
-     List<String> orderBySql = Lists.newArrayList();
-     for (OrderByElement elem: orderByElements_) orderBySql.add(elem.toSql());
-     sql.append(separator).append("ORDER BY ")
-         .append(Joiner.on(", ").join(orderBySql));
-     separator = " ";
-   }
-   if (window_ != null) sql.append(separator).append(window_.toSql());
-   return sql.append(")").toString();
- }
-
- // Returns a debug representation consisting of the function call ("fn"), the
- // window ("window") and the superclass' debug string.
- @Override
- public String debugString() {
- return Objects.toStringHelper(this)
- .add("fn", getFnCall())
- .add("window", window_)
- .addValue(super.debugString())
- .toString();
- }
-
- // Intentionally a no-op: nothing is written into 'msg'.
- // NOTE(review): presumably analytic exprs are never serialized to Thrift directly -
- // confirm against the callers of toThrift().
- @Override
- protected void toThrift(TExprNode msg) {
- }
-
- /** Returns true if 'fn' is an AggregateFunction that reports isAnalyticFn(). */
- private static boolean isAnalyticFn(Function fn) {
-   if (!(fn instanceof AggregateFunction)) return false;
-   return ((AggregateFunction) fn).isAnalyticFn();
- }
-
- /** Returns true if 'fn' is an analytic function whose name equals 'fnName'. */
- private static boolean isAnalyticFn(Function fn, String fnName) {
-   if (!isAnalyticFn(fn)) return false;
-   return fn.functionName().equals(fnName);
- }
-
- /** Returns true if 'fn' is an AggregateFunction that reports isAggregateFn(). */
- public static boolean isAggregateFn(Function fn) {
-   if (!(fn instanceof AggregateFunction)) return false;
-   return ((AggregateFunction) fn).isAggregateFn();
- }
-
- /** Returns true if 'fn' is the analytic function percent_rank. */
- public static boolean isPercentRankFn(Function fn) {
-   return isAnalyticFn(fn, PERCENT_RANK);
- }
-
- /** Returns true if 'fn' is the analytic function cume_dist. */
- public static boolean isCumeDistFn(Function fn) {
-   return isAnalyticFn(fn, CUME_DIST);
- }
-
- /** Returns true if 'fn' is the analytic function ntile. */
- public static boolean isNtileFn(Function fn) {
-   return isAnalyticFn(fn, NTILE);
- }
-
- /** Returns true if 'fn' is one of the offset functions lead or lag. */
- private static boolean isOffsetFn(Function fn) {
-   if (isAnalyticFn(fn, LEAD)) return true;
-   return isAnalyticFn(fn, LAG);
- }
-
- /** Returns true if 'fn' is the analytic variant of min or max. */
- private static boolean isMinMax(Function fn) {
-   if (isAnalyticFn(fn, MIN)) return true;
-   return isAnalyticFn(fn, MAX);
- }
-
- /** Returns true if 'fn' is one of rank, dense_rank or row_number. */
- private static boolean isRankingFn(Function fn) {
-   if (isAnalyticFn(fn, RANK)) return true;
-   if (isAnalyticFn(fn, DENSERANK)) return true;
-   return isAnalyticFn(fn, ROWNUMBER);
- }
-
- /**
-  * Rewrites percent_rank(), cume_dist() and ntile() into equivalent expressions
-  * built from other analytic functions.
-  *
-  * Returns the replacement Expr if 'analyticExpr' is one of those functions, or null
-  * if it is not one that we want to rewrite.
-  */
- public static Expr rewrite(AnalyticExpr analyticExpr) {
-   Function fn = analyticExpr.getFnCall().getFn();
-   if (AnalyticExpr.isPercentRankFn(fn)) return createPercentRank(analyticExpr);
-   if (AnalyticExpr.isCumeDistFn(fn)) return createCumeDist(analyticExpr);
-   if (AnalyticExpr.isNtileFn(fn)) return createNtile(analyticExpr);
-   // Not a function that needs rewriting.
-   return null;
- }
-
- /**
-  * Builds the replacement expression for percent_rank():
-  *
-  * percent_rank() over([partition by clause] order by clause)
-  * = (Count == 1) ? 0:(Rank - 1)/(Count - 1)
-  * where,
-  * Rank = rank() over([partition by clause] order by clause)
-  * Count = count() over([partition by clause])
-  *
-  * The conditional is expressed as a call to the 'if' function.
-  */
- private static Expr createPercentRank(AnalyticExpr analyticExpr) {
-   Function fn = analyticExpr.getFnCall().getFn();
-   Preconditions.checkState(AnalyticExpr.isPercentRankFn(fn));
-
-   NumericLiteral zero = new NumericLiteral(BigInteger.valueOf(0), ScalarType.BIGINT);
-   NumericLiteral one = new NumericLiteral(BigInteger.valueOf(1), ScalarType.BIGINT);
-   AnalyticExpr countExpr = create("count", analyticExpr, false, false);
-   AnalyticExpr rankExpr = create("rank", analyticExpr, true, false);
-
-   // (Rank - 1) / (Count - 1)
-   ArithmeticExpr rankMinusOne =
-       new ArithmeticExpr(ArithmeticExpr.Operator.SUBTRACT, rankExpr, one);
-   ArithmeticExpr countMinusOne =
-       new ArithmeticExpr(ArithmeticExpr.Operator.SUBTRACT, countExpr, one);
-   ArithmeticExpr ratio = new ArithmeticExpr(
-       ArithmeticExpr.Operator.DIVIDE, rankMinusOne, countMinusOne);
-
-   // if(1 = Count, 0, ratio)
-   BinaryPredicate isSingleRow =
-       new BinaryPredicate(BinaryPredicate.Operator.EQ, one, countExpr);
-   List<Expr> ifParams = Lists.<Expr>newArrayList(isSingleRow, zero, ratio);
-   return new FunctionCallExpr("if", ifParams);
- }
-
- /**
-  * Builds the replacement expression for cume_dist():
-  *
-  * cume_dist() over([partition by clause] order by clause)
-  * = ((Count - Rank) + 1)/Count
-  * where,
-  * Rank = rank() over([partition by clause] order by clause DESC)
-  * Count = count() over([partition by clause])
-  */
- private static Expr createCumeDist(AnalyticExpr analyticExpr) {
-   Function fn = analyticExpr.getFnCall().getFn();
-   Preconditions.checkState(AnalyticExpr.isCumeDistFn(fn));
-
-   // The rank is computed over the reversed ordering.
-   AnalyticExpr rankExpr = create("rank", analyticExpr, true, true);
-   AnalyticExpr countExpr = create("count", analyticExpr, false, false);
-   NumericLiteral one = new NumericLiteral(BigInteger.valueOf(1), ScalarType.BIGINT);
-
-   ArithmeticExpr countMinusRank =
-       new ArithmeticExpr(ArithmeticExpr.Operator.SUBTRACT, countExpr, rankExpr);
-   ArithmeticExpr numerator =
-       new ArithmeticExpr(ArithmeticExpr.Operator.ADD, countMinusRank, one);
-   return new ArithmeticExpr(ArithmeticExpr.Operator.DIVIDE, numerator, countExpr);
- }
-
- /**
-  * Builds the replacement expression for ntile():
-  *
-  * ntile(B) over([partition by clause] order by clause)
-  * = floor(min(Count, B) * (RowNumber - 1)/Count) + 1
-  * where,
-  * RowNumber = row_number() over([partition by clause] order by clause)
-  * Count = count() over([partition by clause])
-  *
-  * min(Count, B) is expressed as if(B < Count, B, Count) and the floor of the
-  * division as an integer division.
-  */
- private static Expr createNtile(AnalyticExpr analyticExpr) {
-   Function fn = analyticExpr.getFnCall().getFn();
-   Preconditions.checkState(AnalyticExpr.isNtileFn(fn));
-
-   Expr bucketExpr = analyticExpr.getChild(0);
-   AnalyticExpr rowNumExpr = create("row_number", analyticExpr, true, false);
-   AnalyticExpr countExpr = create("count", analyticExpr, false, false);
-
-   // if(B < Count, B, Count)
-   BinaryPredicate bucketLtCount =
-       new BinaryPredicate(BinaryPredicate.Operator.LT, bucketExpr, countExpr);
-   List<Expr> minParams =
-       Lists.<Expr>newArrayList(bucketLtCount, bucketExpr, countExpr);
-
-   NumericLiteral one = new NumericLiteral(BigInteger.valueOf(1), ScalarType.BIGINT);
-   ArithmeticExpr rowNumMinusOne =
-       new ArithmeticExpr(ArithmeticExpr.Operator.SUBTRACT, rowNumExpr, one);
-   FunctionCallExpr minCountBucket = new FunctionCallExpr("if", minParams);
-   ArithmeticExpr product = new ArithmeticExpr(
-       ArithmeticExpr.Operator.MULTIPLY, rowNumMinusOne, minCountBucket);
-   ArithmeticExpr quotient = new ArithmeticExpr(
-       ArithmeticExpr.Operator.INT_DIVIDE, product, countExpr);
-   return new ArithmeticExpr(ArithmeticExpr.Operator.ADD, quotient, one);
- }
-
- /**
-  * Creates a new analytic expr that calls the function 'fnName' (without arguments)
-  * over a clone of the partition exprs of 'referenceExpr'. If 'copyOrderBy' is set,
-  * the order-by elements of 'referenceExpr' are cloned as well, or reversed via
-  * OrderByElement.reverse() if 'reverseOrderBy' is also set; otherwise the new expr
-  * has no ordering. The new expr never has a window.
-  */
- private static AnalyticExpr create(String fnName,
-     AnalyticExpr referenceExpr, boolean copyOrderBy, boolean reverseOrderBy) {
-   FunctionCallExpr fnExpr = new FunctionCallExpr(fnName, new ArrayList<Expr>());
-   fnExpr.setIsAnalyticFnCall(true);
-
-   List<OrderByElement> orderByElements;
-   if (!copyOrderBy) {
-     orderByElements = null;
-   } else if (reverseOrderBy) {
-     orderByElements = OrderByElement.reverse(referenceExpr.getOrderByElements());
-   } else {
-     orderByElements = Lists.newArrayList();
-     for (OrderByElement elem: referenceExpr.getOrderByElements()) {
-       orderByElements.add(elem.clone());
-     }
-   }
-
-   List<Expr> partitionExprs = Expr.cloneList(referenceExpr.getPartitionExprs());
-   return new AnalyticExpr(fnExpr, partitionExprs, orderByElements, null);
- }
-
- /**
-  * Validates the value expr of an offset boundary of a RANGE window: there must be
-  * at most one ORDER BY expr, and the type of the boundary's value expr must be
-  * implicitly castable to the type of that ORDER BY expr. Throws an
-  * AnalysisException otherwise. 'boundary' must be an offset boundary.
-  */
- private void checkRangeOffsetBoundaryExpr(AnalyticWindow.Boundary boundary)
-     throws AnalysisException {
-   Preconditions.checkState(boundary.getType().isOffset());
-   if (orderByElements_.size() > 1) {
-     throw new AnalysisException("Only one ORDER BY expression allowed if used with "
-         + "a RANGE window with PRECEDING/FOLLOWING: " + toSql());
-   }
-   Expr rangeExpr = boundary.getExpr();
-   Type rangeType = rangeExpr.getType();
-   Type orderByType = orderByElements_.get(0).getExpr().getType();
-   if (Type.isImplicitlyCastable(rangeType, orderByType, false)) return;
-   throw new AnalysisException(
-       "The value expression of a PRECEDING/FOLLOWING clause of a RANGE window must "
-       + "be implicitly convertable to the ORDER BY expression's type: "
-       + rangeExpr.toSql() + " cannot be implicitly converted to "
-       + orderByElements_.get(0).getExpr().getType().toSql());
- }
-
- /**
-  * Checks the offset (second argument) of lag()/lead(): it must be a constant expr
-  * that evaluates to a value greater than zero. Throws an AnalysisException if it is
-  * not, or if the constant cannot be evaluated. Must only be called for lead()/lag()
-  * calls with more than one argument.
-  */
- void checkOffset(Analyzer analyzer) throws AnalysisException {
-   Preconditions.checkState(isOffsetFn(getFnCall().getFn()));
-   Preconditions.checkState(getFnCall().getChildren().size() > 1);
-   Expr offset = getFnCall().getChild(1);
-   Preconditions.checkState(offset.getType().isIntegerType());
-
-   boolean isPosConstant = offset.isConstant();
-   if (isPosConstant) {
-     // Only a constant can be evaluated up front to check its sign.
-     try {
-       TColumnValue val = FeSupport.EvalConstExpr(offset, analyzer.getQueryCtx());
-       if (TColumnValueUtil.getNumericVal(val) <= 0) isPosConstant = false;
-     } catch (InternalException exc) {
-       throw new AnalysisException(
-           "Couldn't evaluate LEAD/LAG offset: " + exc.getMessage());
-     }
-   }
-   if (isPosConstant) return;
-   throw new AnalysisException(
-       "The offset parameter of LEAD/LAG must be a constant positive integer: "
-       + getFnCall().toSql());
- }
-
- /**
- * Analyzes the function call and the children of this expr, then validates the
- * composition of the OVER clause (partition/order-by exprs, DISTINCT, IGNORE NULLS,
- * function kind, function-specific argument rules, window, nesting). On success the
- * SQL of the expr is recorded in sqlString_ before standardize() rewrites the
- * function call/window, and finally the children are rebuilt via setChildren().
- * Returns immediately if this expr has already been analyzed.
- * Throws an AnalysisException for any violated rule.
- */
- @Override
- public void analyze(Analyzer analyzer) throws AnalysisException {
- if (isAnalyzed_) return;
- fnCall_.analyze(analyzer);
- super.analyze(analyzer);
- type_ = getFnCall().getType();
-
- // PARTITION BY exprs must be neither constant nor of a complex type.
- for (Expr e: partitionExprs_) {
- if (e.isConstant()) {
- throw new AnalysisException(
- "Expressions in the PARTITION BY clause must not be constant: "
- + e.toSql() + " (in " + toSql() + ")");
- } else if (e.getType().isComplexType()) {
- throw new AnalysisException(String.format("PARTITION BY expression '%s' with " +
- "complex type '%s' is not supported.", e.toSql(),
- e.getType().toSql()));
- }
- }
- // The same restrictions apply to the ORDER BY exprs.
- for (OrderByElement e: orderByElements_) {
- if (e.getExpr().isConstant()) {
- throw new AnalysisException(
- "Expressions in the ORDER BY clause must not be constant: "
- + e.getExpr().toSql() + " (in " + toSql() + ")");
- } else if (e.getExpr().getType().isComplexType()) {
- throw new AnalysisException(String.format("ORDER BY expression '%s' with " +
- "complex type '%s' is not supported.", e.getExpr().toSql(),
- e.getExpr().getType().toSql()));
- }
- }
-
- if (getFnCall().getParams().isDistinct()) {
- throw new AnalysisException(
- "DISTINCT not allowed in analytic function: " + getFnCall().toSql());
- }
-
- // IGNORE NULLS is only accepted for first_value() and last_value().
- if (getFnCall().getParams().isIgnoreNulls()) {
- String fnName = getFnCall().getFnName().getFunction();
- if (!fnName.equals(LAST_VALUE) && !fnName.equals(FIRST_VALUE)) {
- throw new AnalysisException("Function " + fnName.toUpperCase()
- + " does not accept the keyword IGNORE NULLS.");
- }
- }
-
- // check for correct composition of analytic expr
- Function fn = getFnCall().getFn();
- if (!(fn instanceof AggregateFunction)) {
- throw new AnalysisException(
- "OVER clause requires aggregate or analytic function: "
- + getFnCall().toSql());
- }
-
- // check for non-analytic aggregate functions
- if (!isAnalyticFn(fn)) {
- throw new AnalysisException(
- String.format("Aggregate function '%s' not supported with OVER clause.",
- getFnCall().toSql()));
- }
-
- // Rules for functions that are analytic but not also aggregate functions.
- if (isAnalyticFn(fn) && !isAggregateFn(fn)) {
- if (orderByElements_.isEmpty()) {
- throw new AnalysisException(
- "'" + getFnCall().toSql() + "' requires an ORDER BY clause");
- }
- if ((isRankingFn(fn) || isOffsetFn(fn)) && window_ != null) {
- throw new AnalysisException(
- "Windowing clause not allowed with '" + getFnCall().toSql() + "'");
- }
- if (isOffsetFn(fn) && getFnCall().getChildren().size() > 1) {
- checkOffset(analyzer);
- // check the default, which needs to be a constant at the moment
- // TODO: remove this check when the backend can handle non-constants
- if (getFnCall().getChildren().size() > 2) {
- if (!getFnCall().getChild(2).isConstant()) {
- throw new AnalysisException(
- "The default parameter (parameter 3) of LEAD/LAG must be a constant: "
- + getFnCall().toSql());
- }
- }
- }
- if (isNtileFn(fn)) {
- // TODO: IMPALA-2171:Remove this when ntile() can handle a non-constant argument.
- if (!getFnCall().getChild(0).isConstant()) {
- throw new AnalysisException("NTILE() requires a constant argument");
- }
- // Check if argument value is zero or negative and throw an exception if found.
- try {
- TColumnValue bucketValue =
- FeSupport.EvalConstExpr(getFnCall().getChild(0), analyzer.getQueryCtx());
- Long arg = bucketValue.getLong_val();
- if (arg <= 0) {
- throw new AnalysisException("NTILE() requires a positive argument: " + arg);
- }
- } catch (InternalException e) {
- throw new AnalysisException(e.toString());
- }
- }
- }
-
- // An explicit window requires an ORDER BY clause and must itself pass analysis.
- if (window_ != null) {
- if (orderByElements_.isEmpty()) {
- throw new AnalysisException("Windowing clause requires ORDER BY clause: "
- + toSql());
- }
- window_.analyze(analyzer);
-
- if (!orderByElements_.isEmpty()
- && window_.getType() == AnalyticWindow.Type.RANGE) {
- // check that preceding/following ranges match ordering
- if (window_.getLeftBoundary().getType().isOffset()) {
- checkRangeOffsetBoundaryExpr(window_.getLeftBoundary());
- }
- if (window_.getRightBoundary() != null
- && window_.getRightBoundary().getType().isOffset()) {
- checkRangeOffsetBoundaryExpr(window_.getRightBoundary());
- }
- }
- }
-
- // check nesting
- if (TreeNode.contains(getChildren(), AnalyticExpr.class)) {
- throw new AnalysisException(
- "Nesting of analytic expressions is not allowed: " + toSql());
- }
- // Record the SQL before standardize() rewrites the function call and window;
- // toSqlImpl() returns this string from now on.
- sqlString_ = toSql();
-
- standardize(analyzer);
-
- // min/max is not currently supported on sliding windows (i.e. start bound is not
- // unbounded).
- if (window_ != null && isMinMax(fn) &&
- window_.getLeftBoundary().getType() != BoundaryType.UNBOUNDED_PRECEDING) {
- throw new AnalysisException(
- "'" + getFnCall().toSql() + "' is only supported with an "
- + "UNBOUNDED PRECEDING start bound.");
- }
-
- // standardize() may have replaced fnCall_, window_ and orderByElements_, so
- // rebuild the children from them.
- setChildren();
- }
-
- /**
-  * If necessary, rewrites the analytic function, window, and/or order-by elements into
-  * a standard format for the purpose of simpler backend execution, as follows:
-  * 1. row_number():
-  *    Set a window from UNBOUNDED PRECEDING to CURRENT_ROW.
-  * 2. lead()/lag():
-  *    Explicitly set the default arguments for BE simplicity.
-  *    Set a window for lead(): UNBOUNDED PRECEDING to OFFSET FOLLOWING.
-  *    Set a window for lag(): UNBOUNDED PRECEDING to OFFSET PRECEDING.
-  * 3. FIRST_VALUE without UNBOUNDED PRECEDING or IGNORE NULLS gets rewritten to use a
-  *    different window and function. There are a few cases:
-  *     a) Start bound is X FOLLOWING or CURRENT ROW (X=0):
-  *        Use 'last_value' with a window where both bounds are X FOLLOWING (or
-  *        CURRENT ROW). Setting the start bound to X following is necessary because the
-  *        X rows at the end of a partition have no rows in their window. Note that X
-  *        FOLLOWING could be rewritten as lead(X) but that would not work for CURRENT
-  *        ROW.
-  *     b) Start bound is X PRECEDING and end bound is CURRENT ROW or FOLLOWING:
-  *        Use 'first_value_rewrite' and a window with an end bound X PRECEDING. An
-  *        extra parameter '-1' is added to indicate to the backend that NULLs should
-  *        not be added for the first X rows.
-  *     c) Start bound is X PRECEDING and end bound is Y PRECEDING:
-  *        Use 'first_value_rewrite' and a window with an end bound X PRECEDING. The
-  *        first Y rows in a partition have empty windows and should be NULL. An extra
-  *        parameter with the integer constant Y is added to indicate to the backend
-  *        that NULLs should be added for the first Y rows.
-  *    The performance optimization here and in 5. below cannot be applied in the case of
-  *    IGNORE NULLS because they change what values appear in the window, which in the
-  *    IGNORE NULLS case could mean the correct value to return isn't even in the window,
-  *    eg. if all of the values in the rewritten window are NULL but one of the values in
-  *    the original window isn't.
-  * 4. Start bound is not UNBOUNDED PRECEDING and either the end bound is UNBOUNDED
-  *    FOLLOWING or the function is first_value(... ignore nulls):
-  *    Reverse the ordering and window, and flip first_value() and last_value().
-  * 5. first_value() with UNBOUNDED PRECEDING and not IGNORE NULLS:
-  *    Set the end boundary to CURRENT_ROW.
-  * 6. Explicitly set the default window if no window was given but there
-  *    are order-by elements.
-  * 7. first/last_value() with RANGE window:
-  *    Rewrite as a ROWS window.
-  * 8. Rewrite IGNORE NULLS as regular FunctionCallExprs with '_ignore_nulls'
-  *    appended to the function name, because the BE implements them as different
-  *    functions.
-  *
-  * Steps 1. and 2. return early; the remaining steps are applied in sequence.
-  */
- private void standardize(Analyzer analyzer) {
-   FunctionName analyticFnName = getFnCall().getFnName();
-
-   // 1. Set a window from UNBOUNDED PRECEDING to CURRENT_ROW for row_number().
-   if (analyticFnName.getFunction().equals(ROWNUMBER)) {
-     Preconditions.checkState(window_ == null, "Unexpected window set for row_number()");
-     window_ = new AnalyticWindow(AnalyticWindow.Type.ROWS,
-         new Boundary(BoundaryType.UNBOUNDED_PRECEDING, null),
-         new Boundary(BoundaryType.CURRENT_ROW, null));
-     resetWindow_ = true;
-     return;
-   }
-
-   // 2. Explicitly set the default arguments to lead()/lag() for BE simplicity.
-   // Set a window for lead(): UNBOUNDED PRECEDING to OFFSET FOLLOWING,
-   // Set a window for lag(): UNBOUNDED PRECEDING to OFFSET PRECEDING.
-   if (isOffsetFn(getFnCall().getFn())) {
-     Preconditions.checkState(window_ == null);
-
-     // If necessary, create a new fn call with the default args explicitly set.
-     List<Expr> newExprParams = null;
-     if (getFnCall().getChildren().size() == 1) {
-       newExprParams = Lists.newArrayListWithExpectedSize(3);
-       newExprParams.addAll(getFnCall().getChildren());
-       // Default offset is 1.
-       newExprParams.add(new NumericLiteral(BigDecimal.valueOf(1)));
-       // Default default value is NULL.
-       newExprParams.add(new NullLiteral());
-     } else if (getFnCall().getChildren().size() == 2) {
-       newExprParams = Lists.newArrayListWithExpectedSize(3);
-       newExprParams.addAll(getFnCall().getChildren());
-       // Default default value is NULL.
-       newExprParams.add(new NullLiteral());
-     } else {
-       Preconditions.checkState(getFnCall().getChildren().size() == 3);
-     }
-     if (newExprParams != null) {
-       fnCall_ = new FunctionCallExpr(getFnCall().getFnName(),
-           new FunctionParams(newExprParams));
-       fnCall_.setIsAnalyticFnCall(true);
-       fnCall_.analyzeNoThrow(analyzer);
-     }
-
-     // Set the window.
-     BoundaryType rightBoundaryType = BoundaryType.FOLLOWING;
-     if (analyticFnName.getFunction().equals(LAG)) {
-       rightBoundaryType = BoundaryType.PRECEDING;
-     }
-     window_ = new AnalyticWindow(AnalyticWindow.Type.ROWS,
-         new Boundary(BoundaryType.UNBOUNDED_PRECEDING, null),
-         new Boundary(rightBoundaryType, getOffsetExpr(getFnCall())));
-     try {
-       window_.analyze(analyzer);
-     } catch (AnalysisException e) {
-       // The window was constructed here, so a failure is an internal error.
-       throw new IllegalStateException(e);
-     }
-     resetWindow_ = true;
-     return;
-   }
-
-   // 3. Rewrite first_value() whose window does not start with UNBOUNDED PRECEDING
-   // and that does not use IGNORE NULLS (cases a) to c) in the method comment).
-   if (analyticFnName.getFunction().equals(FIRST_VALUE)
-       && window_ != null
-       && window_.getLeftBoundary().getType() != BoundaryType.UNBOUNDED_PRECEDING
-       && !getFnCall().getParams().isIgnoreNulls()) {
-     if (window_.getLeftBoundary().getType() != BoundaryType.PRECEDING) {
-       // Case a): both bounds become the original start bound.
-       window_ = new AnalyticWindow(window_.getType(), window_.getLeftBoundary(),
-           window_.getLeftBoundary());
-       fnCall_ = new FunctionCallExpr(new FunctionName(LAST_VALUE),
-           getFnCall().getParams());
-     } else {
-       List<Expr> paramExprs = Expr.cloneList(getFnCall().getParams().exprs());
-       if (window_.getRightBoundary().getType() == BoundaryType.PRECEDING) {
-         // The number of rows preceding for the end bound determines the number of
-         // rows at the beginning of each partition that should have a NULL value.
-         paramExprs.add(new NumericLiteral(window_.getRightBoundary().getOffsetValue(),
-             Type.BIGINT));
-       } else {
-         // -1 indicates that no NULL values are inserted even though we set the end
-         // bound to the start bound (which is PRECEDING) below; this is different from
-         // the default behavior of windows with an end bound PRECEDING.
-         paramExprs.add(new NumericLiteral(BigInteger.valueOf(-1), Type.BIGINT));
-       }
-
-       window_ = new AnalyticWindow(window_.getType(),
-           new Boundary(BoundaryType.UNBOUNDED_PRECEDING, null),
-           window_.getLeftBoundary());
-       fnCall_ = new FunctionCallExpr(new FunctionName(FIRST_VALUE_REWRITE),
-           new FunctionParams(paramExprs));
-       fnCall_.setIsInternalFnCall(true);
-     }
-     fnCall_.setIsAnalyticFnCall(true);
-     fnCall_.analyzeNoThrow(analyzer);
-     // Use getType() instead if getReturnType() because wildcard decimals
-     // have only been resolved in the former.
-     type_ = fnCall_.getType();
-     analyticFnName = getFnCall().getFnName();
-   }
-
-   // 4. Reverse the ordering and window for windows not starting with UNBOUNDED
-   // PRECEDING and either: ending with UNBOUNDED FOLLOWING or
-   // first_value(... ignore nulls)
-   if (window_ != null
-       && window_.getLeftBoundary().getType() != BoundaryType.UNBOUNDED_PRECEDING
-       && (window_.getRightBoundary().getType() == BoundaryType.UNBOUNDED_FOLLOWING
-           || (analyticFnName.getFunction().equals(FIRST_VALUE)
-               && getFnCall().getParams().isIgnoreNulls()))) {
-     orderByElements_ = OrderByElement.reverse(orderByElements_);
-     window_ = window_.reverse();
-
-     // Also flip first_value()/last_value(). For other analytic functions there is no
-     // need to also change the function.
-     FunctionName reversedFnName = null;
-     if (analyticFnName.getFunction().equals(FIRST_VALUE)) {
-       reversedFnName = new FunctionName(LAST_VALUE);
-     } else if (analyticFnName.getFunction().equals(LAST_VALUE)) {
-       reversedFnName = new FunctionName(FIRST_VALUE);
-     }
-     if (reversedFnName != null) {
-       fnCall_ = new FunctionCallExpr(reversedFnName, getFnCall().getParams());
-       fnCall_.setIsAnalyticFnCall(true);
-       fnCall_.analyzeNoThrow(analyzer);
-     }
-     analyticFnName = getFnCall().getFnName();
-   }
-
-   // 5. Set the end boundary to CURRENT_ROW for first_value() if the start boundary
-   // is UNBOUNDED_PRECEDING, the end boundary is not PRECEDING and IGNORE NULLS is
-   // not set.
-   if (analyticFnName.getFunction().equals(FIRST_VALUE)
-       && window_ != null
-       && window_.getLeftBoundary().getType() == BoundaryType.UNBOUNDED_PRECEDING
-       && window_.getRightBoundary().getType() != BoundaryType.PRECEDING
-       && !getFnCall().getParams().isIgnoreNulls()) {
-     window_.setRightBoundary(new Boundary(BoundaryType.CURRENT_ROW, null));
-   }
-
-   // 6. Set the default window.
-   if (!orderByElements_.isEmpty() && window_ == null) {
-     window_ = AnalyticWindow.DEFAULT_WINDOW;
-     resetWindow_ = true;
-   }
-
-   // 7. Change first_value/last_value RANGE windows to ROWS.
-   if ((analyticFnName.getFunction().equals(FIRST_VALUE)
-           || analyticFnName.getFunction().equals(LAST_VALUE))
-       && window_ != null
-       && window_.getType() == AnalyticWindow.Type.RANGE) {
-     window_ = new AnalyticWindow(AnalyticWindow.Type.ROWS, window_.getLeftBoundary(),
-         window_.getRightBoundary());
-   }
-
-   // 8. Append IGNORE NULLS to fn name if set.
-   if (getFnCall().getParams().isIgnoreNulls()) {
-     if (analyticFnName.getFunction().equals(LAST_VALUE)) {
-       fnCall_ = new FunctionCallExpr(new FunctionName(LAST_VALUE_IGNORE_NULLS),
-           getFnCall().getParams());
-     } else {
-       Preconditions.checkState(analyticFnName.getFunction().equals(FIRST_VALUE));
-       fnCall_ = new FunctionCallExpr(new FunctionName(FIRST_VALUE_IGNORE_NULLS),
-           getFnCall().getParams());
-     }
-
-     fnCall_.setIsAnalyticFnCall(true);
-     fnCall_.setIsInternalFnCall(true);
-     fnCall_.analyzeNoThrow(analyzer);
-     analyticFnName = getFnCall().getFnName();
-     // The rewrite must not change the result type of this expr.
-     Preconditions.checkState(type_.equals(fnCall_.getType()));
-   }
- }
-
- /**
-  * Returns the explicit or implicit offset of an analytic function call: the second
-  * child of 'offsetFnCall' if it has one, otherwise a new literal for the default
-  * offset 1. Must only be called while this expr's function is lead() or lag().
-  */
- private Expr getOffsetExpr(FunctionCallExpr offsetFnCall) {
-   Preconditions.checkState(isOffsetFn(getFnCall().getFn()));
-   // Check the number of children rather than probing child 1 directly, so that a
-   // call without an explicit offset falls through to the default.
-   if (offsetFnCall.getChildren().size() > 1 && offsetFnCall.getChild(1) != null) {
-     return offsetFnCall.getChild(1);
-   }
-   // The default offset is 1.
-   return new NumericLiteral(BigDecimal.valueOf(1));
- }
-
- /**
-  * Copies the children of this expr back into fnCall_, partitionExprs_ and
-  * orderByElements_ to keep them in sync with children_. The children are consumed
-  * in that order: function call params, then partition exprs, then ordering exprs.
-  */
- private void syncWithChildren() {
-   // Index of the next child to hand out.
-   int childIdx = 0;
-   int numArgs = fnCall_.getChildren().size();
-   for (int argIdx = 0; argIdx < numArgs; ++argIdx) {
-     fnCall_.setChild(argIdx, getChild(childIdx++));
-   }
-   int numPartitionExprs = partitionExprs_.size();
-   for (int partIdx = 0; partIdx < numPartitionExprs; ++partIdx) {
-     partitionExprs_.set(partIdx, getChild(childIdx++));
-   }
-   for (int orderIdx = 0; orderIdx < orderByElements_.size(); ++orderIdx) {
-     orderByElements_.get(orderIdx).setExpr(getChild(childIdx++));
-   }
- }
-
- /**
-  * Rebuilds children_ from scratch: the children of fnCall_, then partitionExprs_,
-  * then the exprs of orderByElements_, and finally the non-null boundary exprs of
-  * window_ (left before right), if a window is set.
-  */
- private void setChildren() {
-   getChildren().clear();
-   addChildren(fnCall_.getChildren());
-   addChildren(partitionExprs_);
-   for (OrderByElement elem: orderByElements_) {
-     addChild(elem.getExpr());
-   }
-   if (window_ == null) return;
-
-   Expr leftExpr = window_.getLeftBoundary().getExpr();
-   if (leftExpr != null) addChild(leftExpr);
-
-   // Unlike the left boundary, the right boundary itself may be missing.
-   Boundary rightBoundary = window_.getRightBoundary();
-   if (rightBoundary != null && rightBoundary.getExpr() != null) {
-     addChild(rightBoundary.getExpr());
-   }
- }
-
- /**
-  * Resets the analysis state of this expr and of its function call. A window that
-  * was set by standardization (resetWindow_) is dropped again. Afterwards fnCall_,
-  * partitionExprs_ and orderByElements_ are re-synced with the children.
-  */
- @Override
- protected void resetAnalysisState() {
-   super.resetAnalysisState();
-   fnCall_.resetAnalysisState();
-   if (resetWindow_) {
-     window_ = null;
-     resetWindow_ = false;
-   }
-   // sync with children, now that they've been reset
-   syncWithChildren();
- }
-
- /**
-  * Delegates the substitution to the superclass. If the result is still an
-  * AnalyticExpr, its function call, partition exprs and order-by elements are
-  * re-synced with its (possibly substituted) children before it is returned.
-  */
- @Override
- protected Expr substituteImpl(ExprSubstitutionMap smap, Analyzer analyzer)
-     throws AnalysisException {
-   Expr result = super.substituteImpl(smap, analyzer);
-   if (result instanceof AnalyticExpr) {
-     // Re-sync state after possible child substitution.
-     ((AnalyticExpr) result).syncWithChildren();
-   }
-   return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/analysis/AnalyticInfo.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/AnalyticInfo.java b/fe/src/main/java/com/cloudera/impala/analysis/AnalyticInfo.java
deleted file mode 100644
index d0d1a85..0000000
--- a/fe/src/main/java/com/cloudera/impala/analysis/AnalyticInfo.java
+++ /dev/null
@@ -1,199 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.analysis;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.catalog.Type;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * Encapsulates the analytic functions found in a single select block plus
- * the corresponding analytic result tuple and its substitution map.
- */
-public class AnalyticInfo extends AggregateInfoBase {
- private final static Logger LOG = LoggerFactory.getLogger(AnalyticInfo.class);
-
- // All unique analytic exprs of a select block. Used to populate
- // super.aggregateExprs_ based on AnalyticExpr.getFnCall() for each analytic expr
- // in this list.
- private final ArrayList<Expr> analyticExprs_;
-
- // Intersection of the partition exps of all the analytic functions.
- private final List<Expr> commonPartitionExprs_;
-
- // map from analyticExprs_ to their corresponding analytic tuple slotrefs
- private final ExprSubstitutionMap analyticTupleSmap_;
-
- private AnalyticInfo(ArrayList<Expr> analyticExprs) {
- super(new ArrayList<Expr>(), new ArrayList<FunctionCallExpr>());
- analyticExprs_ = Expr.cloneList(analyticExprs);
- // Extract the analytic function calls for each analytic expr.
- for (Expr analyticExpr: analyticExprs) {
- aggregateExprs_.add(((AnalyticExpr) analyticExpr).getFnCall());
- }
- analyticTupleSmap_ = new ExprSubstitutionMap();
- commonPartitionExprs_ = computeCommonPartitionExprs();
- }
-
- /**
- * C'tor for cloning.
- */
- private AnalyticInfo(AnalyticInfo other) {
- super(other);
- analyticExprs_ =
- (other.analyticExprs_ != null) ? Expr.cloneList(other.analyticExprs_) : null;
- analyticTupleSmap_ = other.analyticTupleSmap_.clone();
- commonPartitionExprs_ = Expr.cloneList(other.commonPartitionExprs_);
- }
-
- public ArrayList<Expr> getAnalyticExprs() { return analyticExprs_; }
- public ExprSubstitutionMap getSmap() { return analyticTupleSmap_; }
- public List<Expr> getCommonPartitionExprs() { return commonPartitionExprs_; }
-
- /**
- * Creates complete AnalyticInfo for analyticExprs, including tuple descriptors and
- * smaps.
- */
- static public AnalyticInfo create(
- ArrayList<Expr> analyticExprs, Analyzer analyzer) {
- Preconditions.checkState(analyticExprs != null && !analyticExprs.isEmpty());
- Expr.removeDuplicates(analyticExprs);
- AnalyticInfo result = new AnalyticInfo(analyticExprs);
- result.createTupleDescs(analyzer);
-
- // The tuple descriptors are logical. Their slots are remapped to physical tuples
- // during plan generation.
- result.outputTupleDesc_.setIsMaterialized(false);
- result.intermediateTupleDesc_.setIsMaterialized(false);
-
- // Populate analyticTupleSmap_
- Preconditions.checkState(analyticExprs.size() ==
- result.outputTupleDesc_.getSlots().size());
- for (int i = 0; i < analyticExprs.size(); ++i) {
- result.analyticTupleSmap_.put(result.analyticExprs_.get(i),
- new SlotRef(result.outputTupleDesc_.getSlots().get(i)));
- result.outputTupleDesc_.getSlots().get(i).setSourceExpr(
- result.analyticExprs_.get(i));
- }
- LOG.trace("analytictuple=" + result.outputTupleDesc_.debugString());
- LOG.trace("analytictuplesmap=" + result.analyticTupleSmap_.debugString());
- LOG.trace("analytic info:\n" + result.debugString());
- return result;
- }
-
- /**
- * Returns the intersection of the partition exprs of all the
- * analytic functions.
- */
- private List<Expr> computeCommonPartitionExprs() {
- List<Expr> result = Lists.newArrayList();
- for (Expr analyticExpr: analyticExprs_) {
- Preconditions.checkState(analyticExpr.isAnalyzed_);
- List<Expr> partitionExprs = ((AnalyticExpr) analyticExpr).getPartitionExprs();
- if (partitionExprs == null) continue;
- if (result.isEmpty()) {
- result.addAll(partitionExprs);
- } else {
- result.retainAll(partitionExprs);
- if (result.isEmpty()) break;
- }
- }
- return result;
- }
-
- /**
- * Append ids of all slots that are being referenced in the process
- * of performing the analytic computation described by this AnalyticInfo.
- */
- public void getRefdSlots(List<SlotId> ids) {
- Preconditions.checkState(intermediateTupleDesc_ != null);
- Expr.getIds(analyticExprs_, null, ids);
- // The backend assumes that the entire intermediateTupleDesc is materialized
- for (SlotDescriptor slotDesc: intermediateTupleDesc_.getSlots()) {
- ids.add(slotDesc.getId());
- }
- }
-
- @Override
- public void materializeRequiredSlots(Analyzer analyzer, ExprSubstitutionMap smap) {
- materializedSlots_.clear();
- List<Expr> exprs = Lists.newArrayList();
- for (int i = 0; i < analyticExprs_.size(); ++i) {
- SlotDescriptor outputSlotDesc = outputTupleDesc_.getSlots().get(i);
- if (!outputSlotDesc.isMaterialized()) continue;
- intermediateTupleDesc_.getSlots().get(i).setIsMaterialized(true);
- exprs.add(analyticExprs_.get(i));
- materializedSlots_.add(i);
- }
- List<Expr> resolvedExprs = Expr.substituteList(exprs, smap, analyzer, false);
- analyzer.materializeSlots(resolvedExprs);
- }
-
- /**
- * Validates internal state: Checks that the number of materialized slots of the
- * analytic tuple corresponds to the number of materialized analytic functions. Also
- * checks that the return types of the analytic exprs correspond to the slots in the
- * analytic tuple.
- */
- public void checkConsistency() {
- ArrayList<SlotDescriptor> slots = intermediateTupleDesc_.getSlots();
-
- // Check materialized slots.
- int numMaterializedSlots = 0;
- for (SlotDescriptor slotDesc: slots) {
- if (slotDesc.isMaterialized()) ++numMaterializedSlots;
- }
- Preconditions.checkState(numMaterializedSlots ==
- materializedSlots_.size());
-
- // Check that analytic expr return types match the slot descriptors.
- int slotIdx = 0;
- for (int i = 0; i < analyticExprs_.size(); ++i) {
- Expr analyticExpr = analyticExprs_.get(i);
- Type slotType = slots.get(slotIdx).getType();
- Preconditions.checkState(analyticExpr.getType().equals(slotType),
- String.format("Analytic expr %s returns type %s but its analytic tuple " +
- "slot has type %s", analyticExpr.toSql(),
- analyticExpr.getType().toString(), slotType.toString()));
- ++slotIdx;
- }
- }
-
- @Override
- public String debugString() {
- StringBuilder out = new StringBuilder(super.debugString());
- out.append(Objects.toStringHelper(this)
- .add("analytic_exprs", Expr.debugString(analyticExprs_))
- .add("smap", analyticTupleSmap_.debugString())
- .toString());
- return out.toString();
- }
-
- @Override
- protected String tupleDebugName() { return "analytic-tuple"; }
-
- @Override
- public AnalyticInfo clone() { return new AnalyticInfo(this); }
-}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/analysis/AnalyticWindow.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/AnalyticWindow.java b/fe/src/main/java/com/cloudera/impala/analysis/AnalyticWindow.java
deleted file mode 100644
index 68558da..0000000
--- a/fe/src/main/java/com/cloudera/impala/analysis/AnalyticWindow.java
+++ /dev/null
@@ -1,417 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.analysis;
-
-import java.math.BigDecimal;
-
-import com.cloudera.impala.common.AnalysisException;
-import com.cloudera.impala.common.InternalException;
-import com.cloudera.impala.service.FeSupport;
-import com.cloudera.impala.thrift.TAnalyticWindow;
-import com.cloudera.impala.thrift.TAnalyticWindowBoundary;
-import com.cloudera.impala.thrift.TAnalyticWindowBoundaryType;
-import com.cloudera.impala.thrift.TAnalyticWindowType;
-import com.cloudera.impala.thrift.TColumnValue;
-import com.cloudera.impala.util.TColumnValueUtil;
-import com.google.common.base.Preconditions;
-
-
-/**
- * Windowing clause of an analytic expr.
- * Both left and right boundaries are always non-null after analyze().
- */
-public class AnalyticWindow {
-  // default window used when an analytic expr was given an order by but no window
-  public static final AnalyticWindow DEFAULT_WINDOW = new AnalyticWindow(Type.RANGE,
-      new Boundary(BoundaryType.UNBOUNDED_PRECEDING, null),
-      new Boundary(BoundaryType.CURRENT_ROW, null));
-
-  /**
-   * Kind of window: ROWS or RANGE. toString() returns the SQL keyword.
-   */
-  enum Type {
-    ROWS("ROWS"),
-    RANGE("RANGE");
-
-    private final String description_;
-
-    private Type(String d) {
-      description_ = d;
-    }
-
-    @Override
-    public String toString() { return description_; }
-    public TAnalyticWindowType toThrift() {
-      return this == ROWS ? TAnalyticWindowType.ROWS : TAnalyticWindowType.RANGE;
-    }
-  }
-
-  /**
-   * Kind of a window boundary. toString() returns the SQL text of the boundary
-   * without any offset expr.
-   */
-  enum BoundaryType {
-    UNBOUNDED_PRECEDING("UNBOUNDED PRECEDING"),
-    UNBOUNDED_FOLLOWING("UNBOUNDED FOLLOWING"),
-    CURRENT_ROW("CURRENT ROW"),
-    PRECEDING("PRECEDING"),
-    FOLLOWING("FOLLOWING");
-
-    private final String description_;
-
-    private BoundaryType(String d) {
-      description_ = d;
-    }
-
-    @Override
-    public String toString() { return description_; }
-
-    /**
-     * Returns the thrift boundary type. Must not be called for the UNBOUNDED types
-     * (enforced by a precondition check); AnalyticWindow.toThrift() leaves those
-     * boundaries unset instead.
-     */
-    public TAnalyticWindowBoundaryType toThrift() {
-      Preconditions.checkState(!isAbsolutePos());
-      if (this == CURRENT_ROW) {
-        return TAnalyticWindowBoundaryType.CURRENT_ROW;
-      } else if (this == PRECEDING) {
-        return TAnalyticWindowBoundaryType.PRECEDING;
-      } else if (this == FOLLOWING) {
-        return TAnalyticWindowBoundaryType.FOLLOWING;
-      }
-      return null;
-    }
-
-    public boolean isAbsolutePos() {
-      return this == UNBOUNDED_PRECEDING || this == UNBOUNDED_FOLLOWING;
-    }
-
-    public boolean isOffset() {
-      return this == PRECEDING || this == FOLLOWING;
-    }
-
-    public boolean isPreceding() {
-      return this == UNBOUNDED_PRECEDING || this == PRECEDING;
-    }
-
-    public boolean isFollowing() {
-      return this == UNBOUNDED_FOLLOWING || this == FOLLOWING;
-    }
-
-    /**
-     * Swaps PRECEDING and FOLLOWING (bounded and unbounded); CURRENT_ROW maps to
-     * itself.
-     */
-    public BoundaryType converse() {
-      switch (this) {
-        case UNBOUNDED_PRECEDING: return UNBOUNDED_FOLLOWING;
-        case UNBOUNDED_FOLLOWING: return UNBOUNDED_PRECEDING;
-        case PRECEDING: return FOLLOWING;
-        case FOLLOWING: return PRECEDING;
-        default: return CURRENT_ROW;
-      }
-    }
-  }
-
-  /**
-   * One end of a window: a boundary type plus, for PRECEDING/FOLLOWING only, the
-   * offset expr and (after analysis) its evaluated value.
-   */
-  public static class Boundary {
-    private final BoundaryType type_;
-
-    // Offset expr. Only set for PRECEDING/FOLLOWING. Needed for toSql().
-    private final Expr expr_;
-
-    // The offset value. Set during analysis after evaluating expr_. Integral valued
-    // for ROWS windows.
-    private BigDecimal offsetValue_;
-
-    public BoundaryType getType() { return type_; }
-    public Expr getExpr() { return expr_; }
-    public BigDecimal getOffsetValue() { return offsetValue_; }
-
-    /**
-     * Creates a boundary without an evaluated offset value. 'e' must be non-null
-     * exactly when 'type' is PRECEDING or FOLLOWING.
-     */
-    public Boundary(BoundaryType type, Expr e) {
-      this(type, e, null);
-    }
-
-    // c'tor used by clone()
-    private Boundary(BoundaryType type, Expr e, BigDecimal offsetValue) {
-      // An offset expr is required for offset types and forbidden for all others.
-      Preconditions.checkState(
-        (type.isOffset() && e != null)
-        || (!type.isOffset() && e == null));
-      type_ = type;
-      expr_ = e;
-      offsetValue_ = offsetValue;
-    }
-
-    /**
-     * Returns the SQL of the offset expr (if any) followed by the boundary type.
-     */
-    public String toSql() {
-      StringBuilder sb = new StringBuilder();
-      if (expr_ != null) sb.append(expr_.toSql()).append(" ");
-      sb.append(type_.toString());
-      return sb.toString();
-    }
-
-    /**
-     * Converts this boundary to thrift. For an offset boundary of a ROWS window the
-     * evaluated offset value is included, so offsetValue_ must already be set.
-     */
-    public TAnalyticWindowBoundary toThrift(Type windowType) {
-      TAnalyticWindowBoundary result = new TAnalyticWindowBoundary(type_.toThrift());
-      if (type_.isOffset() && windowType == Type.ROWS) {
-        result.setRows_offset_value(offsetValue_.longValue());
-      }
-      // TODO: range windows need range_offset_predicate
-      return result;
-    }
-
-    /**
-     * Two boundaries are equal if they have the same type and equal offset exprs
-     * (or both have none). The evaluated offset value is not compared.
-     */
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == null) return false;
-      if (obj.getClass() != this.getClass()) return false;
-      Boundary o = (Boundary)obj;
-      boolean exprEqual = (expr_ == null) == (o.expr_ == null);
-      if (exprEqual && expr_ != null) exprEqual = expr_.equals(o.expr_);
-      return type_ == o.type_ && exprEqual;
-    }
-
-    /**
-     * Hashes on the boundary type only. Equal boundaries always have the same type,
-     * so this is consistent with equals() without relying on the offset expr's own
-     * hashCode().
-     */
-    @Override
-    public int hashCode() { return type_.hashCode(); }
-
-    /**
-     * Returns a new boundary with the converse type, a clone of the offset expr (if
-     * any) and the same offset value.
-     */
-    public Boundary converse() {
-      Boundary result = new Boundary(type_.converse(),
-          (expr_ != null) ? expr_.clone() : null);
-      result.offsetValue_ = offsetValue_;
-      return result;
-    }
-
-    @Override
-    public Boundary clone() {
-      return new Boundary(type_, expr_ != null ? expr_.clone() : null, offsetValue_);
-    }
-
-    /**
-     * Analyzes the offset expr, if there is one.
-     */
-    public void analyze(Analyzer analyzer) throws AnalysisException {
-      if (expr_ != null) expr_.analyze(analyzer);
-    }
-  }
-
-  private final Type type_;
-  private final Boundary leftBoundary_;
-  private Boundary rightBoundary_;  // may be null before analyze()
-  private String toSqlString_;  // cached after analysis
-
-  public Type getType() { return type_; }
-  public Boundary getLeftBoundary() { return leftBoundary_; }
-  public Boundary getRightBoundary() { return rightBoundary_; }
-  public Boundary setRightBoundary(Boundary b) { return rightBoundary_ = b; }
-
-  /**
-   * Creates a window with only a left boundary; the right boundary stays null until
-   * analyze() fills in the implied CURRENT ROW.
-   */
-  public AnalyticWindow(Type type, Boundary b) {
-    type_ = type;
-    Preconditions.checkNotNull(b);
-    leftBoundary_ = b;
-    rightBoundary_ = null;
-  }
-
-  /**
-   * Creates a window with both boundaries given; neither may be null.
-   */
-  public AnalyticWindow(Type type, Boundary l, Boundary r) {
-    type_ = type;
-    Preconditions.checkNotNull(l);
-    leftBoundary_ = l;
-    Preconditions.checkNotNull(r);
-    rightBoundary_ = r;
-  }
-
-  /**
-   * Clone c'tor
-   */
-  private AnalyticWindow(AnalyticWindow other) {
-    type_ = other.type_;
-    Preconditions.checkNotNull(other.leftBoundary_);
-    leftBoundary_ = other.leftBoundary_.clone();
-    if (other.rightBoundary_ != null) {
-      rightBoundary_ = other.rightBoundary_.clone();
-    }
-    toSqlString_ = other.toSqlString_;  // safe to share
-  }
-
-  /**
-   * Returns a new window with the boundaries swapped and each replaced by its
-   * converse: the converse of the left boundary becomes the new right boundary and
-   * vice versa.
-   */
-  public AnalyticWindow reverse() {
-    Boundary newRightBoundary = leftBoundary_.converse();
-    Boundary newLeftBoundary = null;
-    if (rightBoundary_ == null) {
-      // An omitted right boundary implies CURRENT ROW (analyze() sets exactly that),
-      // and the converse of CURRENT ROW is CURRENT ROW. Reusing the left boundary's
-      // type here would violate the Boundary c'tor precondition for offset types and
-      // produce the wrong frame for UNBOUNDED PRECEDING.
-      newLeftBoundary = new Boundary(BoundaryType.CURRENT_ROW, null);
-    } else {
-      newLeftBoundary = rightBoundary_.converse();
-    }
-    return new AnalyticWindow(type_, newLeftBoundary, newRightBoundary);
-  }
-
-  /**
-   * Returns the SQL of this window. If analyze() cached a string (because the right
-   * boundary was implied), that string is returned so the output matches what was
-   * originally specified.
-   */
-  public String toSql() {
-    if (toSqlString_ != null) return toSqlString_;
-    StringBuilder sb = new StringBuilder();
-    sb.append(type_.toString()).append(" ");
-    if (rightBoundary_ == null) {
-      sb.append(leftBoundary_.toSql());
-    } else {
-      sb.append("BETWEEN ").append(leftBoundary_.toSql()).append(" AND ");
-      sb.append(rightBoundary_.toSql());
-    }
-    return sb.toString();
-  }
-
-  /**
-   * Converts this window to thrift. An UNBOUNDED PRECEDING start and an UNBOUNDED
-   * FOLLOWING end are represented by leaving window_start / window_end unset.
-   * Requires a non-null right boundary.
-   */
-  public TAnalyticWindow toThrift() {
-    TAnalyticWindow result = new TAnalyticWindow(type_.toThrift());
-    if (leftBoundary_.getType() != BoundaryType.UNBOUNDED_PRECEDING) {
-      result.setWindow_start(leftBoundary_.toThrift(type_));
-    }
-    Preconditions.checkNotNull(rightBoundary_);
-    if (rightBoundary_.getType() != BoundaryType.UNBOUNDED_FOLLOWING) {
-      result.setWindow_end(rightBoundary_.toThrift(type_));
-    }
-    return result;
-  }
-
-  /**
-   * Two windows are equal if they have the same type, equal left boundaries, and
-   * equal right boundaries (or both have none).
-   */
-  @Override
-  public boolean equals(Object obj) {
-    if (obj == null) return false;
-    if (obj.getClass() != this.getClass()) return false;
-    AnalyticWindow o = (AnalyticWindow)obj;
-    boolean rightBoundaryEqual =
-        (rightBoundary_ == null) == (o.rightBoundary_ == null);
-    if (rightBoundaryEqual && rightBoundary_ != null) {
-      rightBoundaryEqual = rightBoundary_.equals(o.rightBoundary_);
-    }
-    return type_ == o.type_
-        && leftBoundary_.equals(o.leftBoundary_)
-        && rightBoundaryEqual;
-  }
-
-  /**
-   * Hashes on the window type and the left boundary. Both are final, so the hash
-   * stays stable when analyze() fills in the right boundary, and equal windows
-   * always produce the same hash.
-   */
-  @Override
-  public int hashCode() {
-    return 31 * type_.hashCode() + leftBoundary_.hashCode();
-  }
-
-  @Override
-  public AnalyticWindow clone() { return new AnalyticWindow(this); }
-
-  /**
-   * Semantic analysis for expr of a PRECEDING/FOLLOWING clause.
-   * The offset must be a constant whose value is greater than zero: an integer for
-   * ROWS windows, any numeric type for RANGE windows. On success the evaluated value
-   * is stored in the boundary's offsetValue_.
-   */
-  private void checkOffsetExpr(Analyzer analyzer, Boundary boundary)
-      throws AnalysisException {
-    Preconditions.checkState(boundary.getType().isOffset());
-    Expr e = boundary.getExpr();
-    Preconditions.checkNotNull(e);
-    boolean isPos = true;
-    Double val = null;
-    // Only constant numeric exprs are evaluated; everything else is rejected below.
-    if (e.isConstant() && e.getType().isNumericType()) {
-      try {
-        val = TColumnValueUtil.getNumericVal(
-            FeSupport.EvalConstExpr(e, analyzer.getQueryCtx()));
-        if (val <= 0) isPos = false;
-      } catch (InternalException exc) {
-        throw new AnalysisException(
-            "Couldn't evaluate PRECEDING/FOLLOWING expression: " + exc.getMessage());
-      }
-    }
-
-    if (type_ == Type.ROWS) {
-      if (!e.isConstant() || !e.getType().isIntegerType() || !isPos) {
-        throw new AnalysisException(
-            "For ROWS window, the value of a PRECEDING/FOLLOWING offset must be a "
-              + "constant positive integer: " + boundary.toSql());
-      }
-      Preconditions.checkNotNull(val);
-      boundary.offsetValue_ = new BigDecimal(val.longValue());
-    } else {
-      if (!e.isConstant() || !e.getType().isNumericType() || !isPos) {
-        throw new AnalysisException(
-            "For RANGE window, the value of a PRECEDING/FOLLOWING offset must be a "
-              + "constant positive number: " + boundary.toSql());
-      }
-      // val is non-null here: the expr is a numeric constant, so it was evaluated.
-      boundary.offsetValue_ = new BigDecimal(val);
-    }
-  }
-
-  /**
-   * Check that b1 <= b2.
-   * Both boundaries must be offset boundaries with constant numeric exprs. Throws
-   * an AnalysisException if the evaluated offset of b1 is greater than that of b2
-   * or if either expr cannot be evaluated.
-   */
-  private void checkOffsetBoundaries(Analyzer analyzer, Boundary b1, Boundary b2)
-      throws AnalysisException {
-    Preconditions.checkState(b1.getType().isOffset());
-    Preconditions.checkState(b2.getType().isOffset());
-    Expr e1 = b1.getExpr();
-    Preconditions.checkState(
-        e1 != null && e1.isConstant() && e1.getType().isNumericType());
-    Expr e2 = b2.getExpr();
-    Preconditions.checkState(
-        e2 != null && e2.isConstant() && e2.getType().isNumericType());
-
-    try {
-      TColumnValue val1 = FeSupport.EvalConstExpr(e1, analyzer.getQueryCtx());
-      TColumnValue val2 = FeSupport.EvalConstExpr(e2, analyzer.getQueryCtx());
-      double left = TColumnValueUtil.getNumericVal(val1);
-      double right = TColumnValueUtil.getNumericVal(val2);
-      if (left > right) {
-        throw new AnalysisException(
-            "Offset boundaries are in the wrong order: " + toSql());
-      }
-    } catch (InternalException exc) {
-      throw new AnalysisException(
-          "Couldn't evaluate PRECEDING/FOLLOWING expression: " + exc.getMessage());
-    }
-
-  }
-
-  /**
-   * Analyzes both boundaries and validates the window:
-   * - UNBOUNDED FOLLOWING / UNBOUNDED PRECEDING may only appear as the upper / lower
-   *   bound, respectively;
-   * - RANGE windows are restricted to the currently supported combinations;
-   * - offset exprs must be valid and the two boundaries must be ordered correctly.
-   * If no right boundary was given it is set to CURRENT ROW, after caching the SQL
-   * string of the window as originally specified.
-   */
-  public void analyze(Analyzer analyzer) throws AnalysisException {
-    leftBoundary_.analyze(analyzer);
-    if (rightBoundary_ != null) rightBoundary_.analyze(analyzer);
-
-    if (leftBoundary_.getType() == BoundaryType.UNBOUNDED_FOLLOWING) {
-      throw new AnalysisException(
-          leftBoundary_.getType().toString() + " is only allowed for upper bound of "
-            + "BETWEEN");
-    }
-    if (rightBoundary_ != null
-        && rightBoundary_.getType() == BoundaryType.UNBOUNDED_PRECEDING) {
-      throw new AnalysisException(
-          rightBoundary_.getType().toString() + " is only allowed for lower bound of "
-            + "BETWEEN");
-    }
-
-    // TODO: Remove when RANGE windows with offset boundaries are supported.
-    if (type_ == Type.RANGE) {
-      if (leftBoundary_.type_.isOffset()
-          || (rightBoundary_ != null && rightBoundary_.type_.isOffset())
-          || (leftBoundary_.type_ == BoundaryType.CURRENT_ROW
-              && (rightBoundary_ == null
-                  || rightBoundary_.type_ == BoundaryType.CURRENT_ROW))) {
-        throw new AnalysisException(
-            "RANGE is only supported with both the lower and upper bounds UNBOUNDED or"
-            + " one UNBOUNDED and the other CURRENT ROW.");
-      }
-    }
-
-    if (rightBoundary_ == null && leftBoundary_.getType() == BoundaryType.FOLLOWING) {
-      throw new AnalysisException(
-          leftBoundary_.getType().toString() + " requires a BETWEEN clause");
-    }
-
-    if (leftBoundary_.getType().isOffset()) checkOffsetExpr(analyzer, leftBoundary_);
-    if (rightBoundary_ == null) {
-      // set right boundary to implied value, but make sure to cache toSql string
-      // beforehand
-      toSqlString_ = toSql();
-      rightBoundary_ = new Boundary(BoundaryType.CURRENT_ROW, null);
-      return;
-    }
-    if (rightBoundary_.getType().isOffset()) checkOffsetExpr(analyzer, rightBoundary_);
-
-    if (leftBoundary_.getType() == BoundaryType.FOLLOWING) {
-      if (rightBoundary_.getType() != BoundaryType.FOLLOWING
-          && rightBoundary_.getType() != BoundaryType.UNBOUNDED_FOLLOWING) {
-        throw new AnalysisException(
-            "A lower window bound of " + BoundaryType.FOLLOWING.toString()
-              + " requires that the upper bound also be "
-              + BoundaryType.FOLLOWING.toString());
-      }
-      if (rightBoundary_.getType() != BoundaryType.UNBOUNDED_FOLLOWING) {
-        // Both FOLLOWING: the lower offset must not exceed the upper offset.
-        checkOffsetBoundaries(analyzer, leftBoundary_, rightBoundary_);
-      }
-    }
-
-    if (rightBoundary_.getType() == BoundaryType.PRECEDING) {
-      if (leftBoundary_.getType() != BoundaryType.PRECEDING
-          && leftBoundary_.getType() != BoundaryType.UNBOUNDED_PRECEDING) {
-        throw new AnalysisException(
-            "An upper window bound of " + BoundaryType.PRECEDING.toString()
-              + " requires that the lower bound also be "
-              + BoundaryType.PRECEDING.toString());
-      }
-      if (leftBoundary_.getType() != BoundaryType.UNBOUNDED_PRECEDING) {
-        // Both PRECEDING: the upper offset must not exceed the lower offset, hence
-        // the swapped argument order.
-        checkOffsetBoundaries(analyzer, rightBoundary_, leftBoundary_);
-      }
-    }
-  }
-}