You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/10/26 06:46:19 UTC
[20/33] TAJO-1125: Separate logical plan and optimizer into a maven
module.
http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
deleted file mode 100644
index 89382f4..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ /dev/null
@@ -1,2002 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.algebra.*;
-import org.apache.tajo.algebra.WindowSpec;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.partition.PartitionMethodDesc;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.engine.eval.*;
-import org.apache.tajo.engine.exception.VerifyException;
-import org.apache.tajo.engine.optimizer.eval.EvalTreeOptimizer;
-import org.apache.tajo.engine.planner.LogicalPlan.QueryBlock;
-import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.engine.planner.nameresolver.NameResolvingMode;
-import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.engine.utils.SchemaUtil;
-import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.util.Pair;
-import org.apache.tajo.util.TUtil;
-
-import java.util.*;
-
-import static org.apache.tajo.algebra.CreateTable.PartitionType;
-import static org.apache.tajo.engine.planner.ExprNormalizer.ExprNormalizedResult;
-import static org.apache.tajo.engine.planner.LogicalPlan.BlockType;
-
-/**
- * This class creates a logical plan from a nested tajo algebra expression ({@link org.apache.tajo.algebra})
- */
-public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContext, LogicalNode> {
- private static Log LOG = LogFactory.getLog(LogicalPlanner.class);
- private final CatalogService catalog;
- private final LogicalPlanPreprocessor preprocessor;
- private final EvalTreeOptimizer evalOptimizer;
- private final ExprAnnotator exprAnnotator;
- private final ExprNormalizer normalizer;
-
- public LogicalPlanner(CatalogService catalog) {
- this.catalog = catalog;
- this.exprAnnotator = new ExprAnnotator(catalog);
- this.preprocessor = new LogicalPlanPreprocessor(catalog, exprAnnotator);
- this.normalizer = new ExprNormalizer();
- this.evalOptimizer = new EvalTreeOptimizer();
- }
-
- public static class PlanContext {
- QueryContext queryContext;
- LogicalPlan plan;
- QueryBlock queryBlock;
- EvalTreeOptimizer evalOptimizer;
- boolean debugOrUnitTests;
-
- public PlanContext(QueryContext context, LogicalPlan plan, QueryBlock block, EvalTreeOptimizer evalOptimizer,
- boolean debugOrUnitTests) {
- this.queryContext = context;
- this.plan = plan;
- this.queryBlock = block;
- this.evalOptimizer = evalOptimizer;
- this.debugOrUnitTests = debugOrUnitTests;
- }
-
- public PlanContext(PlanContext context, QueryBlock block) {
- this.queryContext = context.queryContext;
- this.plan = context.plan;
- this.queryBlock = block;
- this.evalOptimizer = context.evalOptimizer;
- this.debugOrUnitTests = context.debugOrUnitTests;
- }
-
- public QueryBlock getQueryBlock() {
- return queryBlock;
- }
-
- public String toString() {
- return "block=" + queryBlock.getName() + ", relNum=" + queryBlock.getRelations().size() + ", "+
- queryBlock.namedExprsMgr.toString();
- }
- }
-
- /**
- * This generates a logical plan.
- *
- * @param expr A relational algebraic expression for a query.
- * @return A logical plan
- */
- public LogicalPlan createPlan(QueryContext context, Expr expr) throws PlanningException {
- return createPlan(context, expr, false);
- }
-
- @VisibleForTesting
- public LogicalPlan createPlan(QueryContext queryContext, Expr expr, boolean debug) throws PlanningException {
-
- LogicalPlan plan = new LogicalPlan(this);
-
- QueryBlock rootBlock = plan.newAndGetBlock(LogicalPlan.ROOT_BLOCK);
- PlanContext context = new PlanContext(queryContext, plan, rootBlock, evalOptimizer, debug);
- preprocessor.visit(context, new Stack<Expr>(), expr);
- plan.resetGeneratedId();
- LogicalNode topMostNode = this.visit(context, new Stack<Expr>(), expr);
-
- // Add Root Node
- LogicalRootNode root = plan.createNode(LogicalRootNode.class);
- root.setInSchema(topMostNode.getOutSchema());
- root.setChild(topMostNode);
- root.setOutSchema(topMostNode.getOutSchema());
- plan.getRootBlock().setRoot(root);
-
- return plan;
- }
-
- public ExprAnnotator getExprAnnotator() {
- return this.exprAnnotator;
- }
-
- public void preHook(PlanContext context, Stack<Expr> stack, Expr expr) throws PlanningException {
- context.queryBlock.updateCurrentNode(expr);
- }
-
- public LogicalNode postHook(PlanContext context, Stack<Expr> stack, Expr expr, LogicalNode current)
- throws PlanningException {
-
-
- // Some generated logical nodes (e.g., implicit aggregation) without exprs will pass NULL as a expr parameter.
- // We should skip them.
- if (expr != null) {
- // A relation list including a single ScanNode will return a ScanNode instance that already passed postHook.
- // So, it skips the already-visited ScanNode instance.
- if (expr.getType() == OpType.RelationList && current.getType() == NodeType.SCAN) {
- return current;
- }
- }
-
- QueryBlock queryBlock = context.queryBlock;
- queryBlock.updateLatestNode(current);
-
- // if this node is the topmost
- if (stack.size() == 0) {
- queryBlock.setRoot(current);
- }
-
- if (!stack.empty()) {
- queryBlock.updateCurrentNode(stack.peek());
- }
- return current;
- }
-
- public LogicalNode visitExplain(PlanContext ctx, Stack<Expr> stack, Explain expr) throws PlanningException {
- ctx.plan.setExplain();
- return visit(ctx, stack, expr.getChild());
- }
-
- /*===============================================================================================
- Data Manupulation Language (DML) SECTION
- ===============================================================================================*/
-
-
- /*===============================================================================================
- PROJECTION SECTION
- ===============================================================================================*/
- @Override
- public LogicalNode visitProjection(PlanContext context, Stack<Expr> stack, Projection projection)
- throws PlanningException {
-
-
- LogicalPlan plan = context.plan;
- QueryBlock block = context.queryBlock;
-
- // If a non-from statement is given
- if (!projection.hasChild()) {
- return buildPlanForNoneFromStatement(context, stack, projection);
- }
-
- String [] referenceNames;
- // in prephase, insert all target list into NamedExprManagers.
- // Then it gets reference names, each of which points an expression in target list.
- Pair<String [], ExprNormalizer.WindowSpecReferences []> referencesPair = doProjectionPrephase(context, projection);
- referenceNames = referencesPair.getFirst();
-
- ////////////////////////////////////////////////////////
- // Visit and Build Child Plan
- ////////////////////////////////////////////////////////
- stack.push(projection);
- LogicalNode child = visit(context, stack, projection.getChild());
-
- // check if it is implicit aggregation. If so, it inserts group-by node to its child.
- if (block.isAggregationRequired()) {
- child = insertGroupbyNode(context, child, stack);
- }
-
- if (block.hasWindowSpecs()) {
- LogicalNode windowAggNode =
- insertWindowAggNode(context, child, stack, referenceNames, referencesPair.getSecond());
- if (windowAggNode != null) {
- child = windowAggNode;
- }
- }
- stack.pop();
- ////////////////////////////////////////////////////////
-
- ProjectionNode projectionNode;
- Target [] targets;
- targets = buildTargets(context, referenceNames);
-
- // Set ProjectionNode
- projectionNode = context.queryBlock.getNodeFromExpr(projection);
- projectionNode.setInSchema(child.getOutSchema());
- projectionNode.setTargets(targets);
- projectionNode.setChild(child);
-
- if (projection.isDistinct() && block.hasNode(NodeType.GROUP_BY)) {
- throw new VerifyException("Cannot support grouping and distinct at the same time yet");
- } else {
- if (projection.isDistinct()) {
- insertDistinctOperator(context, projectionNode, child, stack);
- }
- }
-
- // It's for debugging and unit tests purpose.
- // It sets raw targets, all of them are raw expressions instead of references.
- if (context.debugOrUnitTests) {
- setRawTargets(context, targets, referenceNames, projection);
- }
-
- verifyProjectedFields(block, projectionNode);
- return projectionNode;
- }
-
- private void setRawTargets(PlanContext context, Target[] targets, String[] referenceNames,
- Projection projection) throws PlanningException {
- LogicalPlan plan = context.plan;
- QueryBlock block = context.queryBlock;
-
- // It's for debugging or unit tests.
- Target [] rawTargets = new Target[projection.getNamedExprs().length];
- for (int i = 0; i < projection.getNamedExprs().length; i++) {
- NamedExpr namedExpr = projection.getNamedExprs()[i];
- EvalNode evalNode = exprAnnotator.createEvalNode(context, namedExpr.getExpr(),
- NameResolvingMode.RELS_AND_SUBEXPRS);
- rawTargets[i] = new Target(evalNode, referenceNames[i]);
- }
- // it's for debugging or unit testing
- block.setRawTargets(rawTargets);
- }
-
- private void insertDistinctOperator(PlanContext context, ProjectionNode projectionNode, LogicalNode child,
- Stack<Expr> stack) throws PlanningException {
- LogicalPlan plan = context.plan;
- QueryBlock block = context.queryBlock;
-
- Schema outSchema = projectionNode.getOutSchema();
- GroupbyNode dupRemoval = context.plan.createNode(GroupbyNode.class);
- dupRemoval.setChild(child);
- dupRemoval.setInSchema(projectionNode.getInSchema());
- dupRemoval.setTargets(PlannerUtil.schemaToTargets(outSchema));
- dupRemoval.setGroupingColumns(outSchema.toArray());
-
- block.registerNode(dupRemoval);
- postHook(context, stack, null, dupRemoval);
-
- projectionNode.setChild(dupRemoval);
- projectionNode.setInSchema(dupRemoval.getOutSchema());
- }
-
- private Pair<String [], ExprNormalizer.WindowSpecReferences []> doProjectionPrephase(PlanContext context,
- Projection projection)
- throws PlanningException {
-
- QueryBlock block = context.queryBlock;
-
- int finalTargetNum = projection.size();
- String [] referenceNames = new String[finalTargetNum];
- ExprNormalizedResult [] normalizedExprList = new ExprNormalizedResult[finalTargetNum];
-
- List<ExprNormalizer.WindowSpecReferences> windowSpecReferencesList = TUtil.newList();
-
- List<Integer> targetsIds = normalize(context, projection, normalizedExprList, new Matcher() {
- @Override
- public boolean isMatch(Expr expr) {
- return ExprFinder.finds(expr, OpType.WindowFunction).size() == 0;
- }
- });
-
- // Note: Why separate normalization and add(Named)Expr?
- //
- // ExprNormalizer internally makes use of the named exprs in NamedExprsManager.
- // If we don't separate normalization work and addExprWithName, addExprWithName will find named exprs evaluated
- // the same logical node. It will cause impossible evaluation in physical executors.
- addNamedExprs(block, referenceNames, normalizedExprList, windowSpecReferencesList, projection, targetsIds);
-
- targetsIds = normalize(context, projection, normalizedExprList, new Matcher() {
- @Override
- public boolean isMatch(Expr expr) {
- return ExprFinder.finds(expr, OpType.WindowFunction).size() > 0;
- }
- });
- addNamedExprs(block, referenceNames, normalizedExprList, windowSpecReferencesList, projection, targetsIds);
-
- return new Pair<String[], ExprNormalizer.WindowSpecReferences []>(referenceNames,
- windowSpecReferencesList.toArray(new ExprNormalizer.WindowSpecReferences[windowSpecReferencesList.size()]));
- }
-
- private interface Matcher {
- public boolean isMatch(Expr expr);
- }
-
- public List<Integer> normalize(PlanContext context, Projection projection, ExprNormalizedResult [] normalizedExprList,
- Matcher matcher) throws PlanningException {
- List<Integer> targetIds = new ArrayList<Integer>();
- for (int i = 0; i < projection.size(); i++) {
- NamedExpr namedExpr = projection.getNamedExprs()[i];
-
- if (PlannerUtil.existsAggregationFunction(namedExpr)) {
- context.queryBlock.setAggregationRequire();
- }
-
- if (matcher.isMatch(namedExpr.getExpr())) {
- // If a value is constant value, it adds the constant value with a proper name to the constant map
- // of the current block
- if (!namedExpr.hasAlias() && OpType.isLiteralType(namedExpr.getExpr().getType())) {
- String generatedName = context.plan.generateUniqueColumnName(namedExpr.getExpr());
- ConstEval constEval = (ConstEval) exprAnnotator.createEvalNode(context, namedExpr.getExpr(),
- NameResolvingMode.RELS_ONLY);
- context.getQueryBlock().addConstReference(generatedName, namedExpr.getExpr(), constEval);
- normalizedExprList[i] = new ExprNormalizedResult(context, false);
- normalizedExprList[i].baseExpr = new ColumnReferenceExpr(generatedName);
-
- } else {
- // dissect an expression into multiple parts (at most dissected into three parts)
- normalizedExprList[i] = normalizer.normalize(context, namedExpr.getExpr());
- }
- targetIds.add(i);
- }
- }
-
- return targetIds;
- }
-
- private void addNamedExprs(QueryBlock block, String [] referenceNames, ExprNormalizedResult [] normalizedExprList,
- List<ExprNormalizer.WindowSpecReferences> windowSpecReferencesList, Projection projection,
- List<Integer> targetIds) throws PlanningException {
- for (int i : targetIds) {
- NamedExpr namedExpr = projection.getNamedExprs()[i];
- // Get all projecting references
- if (namedExpr.hasAlias()) {
- NamedExpr aliasedExpr = new NamedExpr(normalizedExprList[i].baseExpr, namedExpr.getAlias());
- referenceNames[i] = block.namedExprsMgr.addNamedExpr(aliasedExpr);
- } else {
- referenceNames[i] = block.namedExprsMgr.addExpr(normalizedExprList[i].baseExpr);
- }
-
- // Add sub-expressions (i.e., aggregation part and scalar part) from dissected parts.
- block.namedExprsMgr.addNamedExprArray(normalizedExprList[i].aggExprs);
- block.namedExprsMgr.addNamedExprArray(normalizedExprList[i].scalarExprs);
- block.namedExprsMgr.addNamedExprArray(normalizedExprList[i].windowAggExprs);
-
- windowSpecReferencesList.addAll(normalizedExprList[i].windowSpecs);
- }
- }
-
- /**
- * It builds non-from statement (only expressions) like '<code>SELECT 1+3 as plus</code>'.
- */
- private EvalExprNode buildPlanForNoneFromStatement(PlanContext context, Stack<Expr> stack, Projection projection)
- throws PlanningException {
- LogicalPlan plan = context.plan;
- QueryBlock block = context.queryBlock;
-
- int finalTargetNum = projection.getNamedExprs().length;
- Target [] targets = new Target[finalTargetNum];
-
- for (int i = 0; i < targets.length; i++) {
- NamedExpr namedExpr = projection.getNamedExprs()[i];
- EvalNode evalNode = exprAnnotator.createEvalNode(context, namedExpr.getExpr(), NameResolvingMode.RELS_ONLY);
- if (namedExpr.hasAlias()) {
- targets[i] = new Target(evalNode, namedExpr.getAlias());
- } else {
- targets[i] = new Target(evalNode, context.plan.generateUniqueColumnName(namedExpr.getExpr()));
- }
- }
- EvalExprNode evalExprNode = context.queryBlock.getNodeFromExpr(projection);
- evalExprNode.setTargets(targets);
- evalExprNode.setOutSchema(PlannerUtil.targetToSchema(targets));
- // it's for debugging or unit testing
- block.setRawTargets(targets);
- return evalExprNode;
- }
-
- private Target [] buildTargets(PlanContext context, String[] referenceNames)
- throws PlanningException {
- QueryBlock block = context.queryBlock;
-
- Target [] targets = new Target[referenceNames.length];
-
- for (int i = 0; i < referenceNames.length; i++) {
- String refName = referenceNames[i];
- if (block.isConstReference(refName)) {
- targets[i] = new Target(block.getConstByReference(refName), refName);
- } else if (block.namedExprsMgr.isEvaluated(refName)) {
- targets[i] = block.namedExprsMgr.getTarget(refName);
- } else {
- NamedExpr namedExpr = block.namedExprsMgr.getNamedExpr(refName);
- EvalNode evalNode = exprAnnotator.createEvalNode(context, namedExpr.getExpr(),
- NameResolvingMode.RELS_AND_SUBEXPRS);
- block.namedExprsMgr.markAsEvaluated(refName, evalNode);
- targets[i] = new Target(evalNode, refName);
- }
- }
- return targets;
- }
-
- /**
- * It checks if all targets of Projectable plan node can be evaluated from the child node.
- * It can avoid potential errors which possibly occur in physical operators.
- *
- * @param block QueryBlock which includes the Projectable node
- * @param projectable Projectable node to be valid
- * @throws PlanningException
- */
- public static void verifyProjectedFields(QueryBlock block, Projectable projectable) throws PlanningException {
- if (projectable instanceof GroupbyNode) {
- GroupbyNode groupbyNode = (GroupbyNode) projectable;
-
- if (!groupbyNode.isEmptyGrouping()) { // it should be targets instead of
- int groupingKeyNum = groupbyNode.getGroupingColumns().length;
-
- for (int i = 0; i < groupingKeyNum; i++) {
- Target target = groupbyNode.getTargets()[i];
- if (groupbyNode.getTargets()[i].getEvalTree().getType() == EvalType.FIELD) {
- FieldEval grpKeyEvalNode = target.getEvalTree();
- if (!groupbyNode.getInSchema().contains(grpKeyEvalNode.getColumnRef())) {
- throwCannotEvaluateException(projectable, grpKeyEvalNode.getName());
- }
- }
- }
- }
-
- if (groupbyNode.hasAggFunctions()) {
- verifyIfEvalNodesCanBeEvaluated(projectable, groupbyNode.getAggFunctions());
- }
-
- } else if (projectable instanceof WindowAggNode) {
- WindowAggNode windowAggNode = (WindowAggNode) projectable;
-
- if (windowAggNode.hasPartitionKeys()) {
- verifyIfColumnCanBeEvaluated(projectable.getInSchema(), projectable, windowAggNode.getPartitionKeys());
- }
-
- if (windowAggNode.hasAggFunctions()) {
- verifyIfEvalNodesCanBeEvaluated(projectable, windowAggNode.getWindowFunctions());
- }
-
- if (windowAggNode.hasSortSpecs()) {
- Column [] sortKeys = PlannerUtil.sortSpecsToSchema(windowAggNode.getSortSpecs()).toArray();
- verifyIfColumnCanBeEvaluated(projectable.getInSchema(), projectable, sortKeys);
- }
-
- // verify targets except for function slots
- for (int i = 0; i < windowAggNode.getTargets().length - windowAggNode.getWindowFunctions().length; i++) {
- Target target = windowAggNode.getTargets()[i];
- Set<Column> columns = EvalTreeUtil.findUniqueColumns(target.getEvalTree());
- for (Column c : columns) {
- if (!windowAggNode.getInSchema().contains(c)) {
- throwCannotEvaluateException(projectable, c.getQualifiedName());
- }
- }
- }
-
- } else if (projectable instanceof RelationNode) {
- RelationNode relationNode = (RelationNode) projectable;
- verifyIfTargetsCanBeEvaluated(relationNode.getTableSchema(), (Projectable) relationNode);
-
- } else {
- verifyIfTargetsCanBeEvaluated(projectable.getInSchema(), projectable);
- }
- }
-
- public static void verifyIfEvalNodesCanBeEvaluated(Projectable projectable, EvalNode[] evalNodes)
- throws PlanningException {
- for (EvalNode e : evalNodes) {
- Set<Column> columns = EvalTreeUtil.findUniqueColumns(e);
- for (Column c : columns) {
- if (!projectable.getInSchema().contains(c)) {
- throwCannotEvaluateException(projectable, c.getQualifiedName());
- }
- }
- }
- }
-
- public static void verifyIfTargetsCanBeEvaluated(Schema baseSchema, Projectable projectable)
- throws PlanningException {
- for (Target target : projectable.getTargets()) {
- Set<Column> columns = EvalTreeUtil.findUniqueColumns(target.getEvalTree());
- for (Column c : columns) {
- if (!baseSchema.contains(c)) {
- throwCannotEvaluateException(projectable, c.getQualifiedName());
- }
- }
- }
- }
-
- public static void verifyIfColumnCanBeEvaluated(Schema baseSchema, Projectable projectable, Column [] columns)
- throws PlanningException {
- for (Column c : columns) {
- if (!baseSchema.contains(c)) {
- throwCannotEvaluateException(projectable, c.getQualifiedName());
- }
- }
- }
-
- public static void throwCannotEvaluateException(Projectable projectable, String columnName) throws PlanningException {
- if (projectable instanceof UnaryNode && ((UnaryNode) projectable).getChild().getType() == NodeType.GROUP_BY) {
- throw new PlanningException(columnName
- + " must appear in the GROUP BY clause or be used in an aggregate function at node ("
- + projectable.getPID() + ")");
- } else {
- throw new PlanningException(String.format("Cannot evaluate the field \"%s\" at node (%d)",
- columnName, projectable.getPID()));
- }
- }
-
- private LogicalNode insertWindowAggNode(PlanContext context, LogicalNode child, Stack<Expr> stack,
- String [] referenceNames,
- ExprNormalizer.WindowSpecReferences [] windowSpecReferenceses)
- throws PlanningException {
- LogicalPlan plan = context.plan;
- QueryBlock block = context.queryBlock;
- WindowAggNode windowAggNode = context.plan.createNode(WindowAggNode.class);
- if (child.getType() == NodeType.LIMIT) {
- LimitNode limitNode = (LimitNode) child;
- windowAggNode.setChild(limitNode.getChild());
- windowAggNode.setInSchema(limitNode.getChild().getOutSchema());
- limitNode.setChild(windowAggNode);
- } else if (child.getType() == NodeType.SORT) {
- SortNode sortNode = (SortNode) child;
- windowAggNode.setChild(sortNode.getChild());
- windowAggNode.setInSchema(sortNode.getChild().getOutSchema());
- sortNode.setChild(windowAggNode);
- } else {
- windowAggNode.setChild(child);
- windowAggNode.setInSchema(child.getOutSchema());
- }
-
- List<String> winFuncRefs = new ArrayList<String>();
- List<WindowFunctionEval> winFuncs = new ArrayList<WindowFunctionEval>();
- List<WindowSpec> rawWindowSpecs = Lists.newArrayList();
- for (Iterator<NamedExpr> it = block.namedExprsMgr.getIteratorForUnevaluatedExprs(); it.hasNext();) {
- NamedExpr rawTarget = it.next();
- try {
- EvalNode evalNode = exprAnnotator.createEvalNode(context, rawTarget.getExpr(),
- NameResolvingMode.SUBEXPRS_AND_RELS);
- if (evalNode.getType() == EvalType.WINDOW_FUNCTION) {
- winFuncRefs.add(rawTarget.getAlias());
- winFuncs.add((WindowFunctionEval) evalNode);
- block.namedExprsMgr.markAsEvaluated(rawTarget.getAlias(), evalNode);
-
- // TODO - Later, we also consider the possibility that a window function contains only a window name.
- rawWindowSpecs.add(((WindowFunctionExpr) (rawTarget.getExpr())).getWindowSpec());
- }
- } catch (VerifyException ve) {
- }
- }
-
- // we only consider one window definition.
- if (windowSpecReferenceses[0].hasPartitionKeys()) {
- Column [] partitionKeyColumns = new Column[windowSpecReferenceses[0].getPartitionKeys().length];
- int i = 0;
- for (String partitionKey : windowSpecReferenceses[0].getPartitionKeys()) {
- if (block.namedExprsMgr.isEvaluated(partitionKey)) {
- partitionKeyColumns[i++] = block.namedExprsMgr.getTarget(partitionKey).getNamedColumn();
- } else {
- throw new PlanningException("Each grouping column expression must be a scalar expression.");
- }
- }
- windowAggNode.setPartitionKeys(partitionKeyColumns);
- }
-
- SortSpec [][] sortGroups = new SortSpec[rawWindowSpecs.size()][];
-
- for (int winSpecIdx = 0; winSpecIdx < rawWindowSpecs.size(); winSpecIdx++) {
- WindowSpec spec = rawWindowSpecs.get(winSpecIdx);
- if (spec.hasOrderBy()) {
- Sort.SortSpec [] sortSpecs = spec.getSortSpecs();
- int sortNum = sortSpecs.length;
- String [] sortKeyRefNames = windowSpecReferenceses[winSpecIdx].getOrderKeys();
- SortSpec [] annotatedSortSpecs = new SortSpec[sortNum];
-
- Column column;
- for (int i = 0; i < sortNum; i++) {
- if (block.namedExprsMgr.isEvaluated(sortKeyRefNames[i])) {
- column = block.namedExprsMgr.getTarget(sortKeyRefNames[i]).getNamedColumn();
- } else {
- throw new IllegalStateException("Unexpected State: " + TUtil.arrayToString(sortSpecs));
- }
- annotatedSortSpecs[i] = new SortSpec(column, sortSpecs[i].isAscending(), sortSpecs[i].isNullFirst());
- }
-
- sortGroups[winSpecIdx] = annotatedSortSpecs;
- } else {
- sortGroups[winSpecIdx] = null;
- }
- }
-
- for (int i = 0; i < winFuncRefs.size(); i++) {
- WindowFunctionEval winFunc = winFuncs.get(i);
- if (sortGroups[i] != null) {
- winFunc.setSortSpecs(sortGroups[i]);
- }
- }
-
- Target [] targets = new Target[referenceNames.length];
- List<Integer> windowFuncIndices = Lists.newArrayList();
- Projection projection = (Projection) stack.peek();
- int windowFuncIdx = 0;
- for (NamedExpr expr : projection.getNamedExprs()) {
- if (expr.getExpr().getType() == OpType.WindowFunction) {
- windowFuncIndices.add(windowFuncIdx);
- }
- windowFuncIdx++;
- }
- windowAggNode.setWindowFunctions(winFuncs.toArray(new WindowFunctionEval[winFuncs.size()]));
-
- int targetIdx = 0;
- for (int i = 0; i < referenceNames.length ; i++) {
- if (!windowFuncIndices.contains(i)) {
- if (block.isConstReference(referenceNames[i])) {
- targets[targetIdx++] = new Target(block.getConstByReference(referenceNames[i]), referenceNames[i]);
- } else {
- targets[targetIdx++] = block.namedExprsMgr.getTarget(referenceNames[i]);
- }
- }
- }
- for (int i = 0; i < winFuncRefs.size(); i++) {
- targets[targetIdx++] = block.namedExprsMgr.getTarget(winFuncRefs.get(i));
- }
- windowAggNode.setTargets(targets);
- verifyProjectedFields(block, windowAggNode);
-
- block.registerNode(windowAggNode);
- postHook(context, stack, null, windowAggNode);
-
- if (child.getType() == NodeType.LIMIT) {
- LimitNode limitNode = (LimitNode) child;
- limitNode.setInSchema(windowAggNode.getOutSchema());
- limitNode.setOutSchema(windowAggNode.getOutSchema());
- return null;
- } else if (child.getType() == NodeType.SORT) {
- SortNode sortNode = (SortNode) child;
- sortNode.setInSchema(windowAggNode.getOutSchema());
- sortNode.setOutSchema(windowAggNode.getOutSchema());
- return null;
- } else {
- return windowAggNode;
- }
- }
-
- /**
- * Insert a group-by operator before a sort or a projection operator.
- * It is used only when a group-by clause is not given.
- */
- private LogicalNode insertGroupbyNode(PlanContext context, LogicalNode child, Stack<Expr> stack)
- throws PlanningException {
-
- LogicalPlan plan = context.plan;
- QueryBlock block = context.queryBlock;
- GroupbyNode groupbyNode = context.plan.createNode(GroupbyNode.class);
- groupbyNode.setChild(child);
- groupbyNode.setInSchema(child.getOutSchema());
-
- groupbyNode.setGroupingColumns(new Column[] {});
-
- Set<String> aggEvalNames = new LinkedHashSet<String>();
- Set<AggregationFunctionCallEval> aggEvals = new LinkedHashSet<AggregationFunctionCallEval>();
- boolean includeDistinctFunction = false;
- for (Iterator<NamedExpr> it = block.namedExprsMgr.getIteratorForUnevaluatedExprs(); it.hasNext();) {
- NamedExpr rawTarget = it.next();
- try {
- // check if at least distinct aggregation function
- includeDistinctFunction |= PlannerUtil.existsDistinctAggregationFunction(rawTarget.getExpr());
- EvalNode evalNode = exprAnnotator.createEvalNode(context, rawTarget.getExpr(),
- NameResolvingMode.SUBEXPRS_AND_RELS);
- if (evalNode.getType() == EvalType.AGG_FUNCTION) {
- aggEvalNames.add(rawTarget.getAlias());
- aggEvals.add((AggregationFunctionCallEval) evalNode);
- block.namedExprsMgr.markAsEvaluated(rawTarget.getAlias(), evalNode);
- }
- } catch (VerifyException ve) {
- }
- }
-
- groupbyNode.setDistinct(includeDistinctFunction);
- groupbyNode.setAggFunctions(aggEvals.toArray(new AggregationFunctionCallEval[aggEvals.size()]));
- Target [] targets = ProjectionPushDownRule.buildGroupByTarget(groupbyNode, null,
- aggEvalNames.toArray(new String[aggEvalNames.size()]));
- groupbyNode.setTargets(targets);
-
- // this inserted group-by node doesn't pass through preprocessor. So manually added.
- block.registerNode(groupbyNode);
- postHook(context, stack, null, groupbyNode);
-
- verifyProjectedFields(block, groupbyNode);
- return groupbyNode;
- }
-
- /*===============================================================================================
- SORT SECTION
- ===============================================================================================*/
- @Override
- public LimitNode visitLimit(PlanContext context, Stack<Expr> stack, Limit limit) throws PlanningException {
- QueryBlock block = context.queryBlock;
-
- EvalNode firstFetNum;
- LogicalNode child;
- if (limit.getFetchFirstNum().getType() == OpType.Literal) {
- firstFetNum = exprAnnotator.createEvalNode(context, limit.getFetchFirstNum(),
- NameResolvingMode.RELS_ONLY);
-
- ////////////////////////////////////////////////////////
- // Visit and Build Child Plan
- ////////////////////////////////////////////////////////
- stack.push(limit);
- child = visit(context, stack, limit.getChild());
- stack.pop();
- ////////////////////////////////////////////////////////
- } else {
- ExprNormalizedResult normalizedResult = normalizer.normalize(context, limit.getFetchFirstNum());
- String referName = block.namedExprsMgr.addExpr(normalizedResult.baseExpr);
- block.namedExprsMgr.addNamedExprArray(normalizedResult.aggExprs);
- block.namedExprsMgr.addNamedExprArray(normalizedResult.scalarExprs);
-
- ////////////////////////////////////////////////////////
- // Visit and Build Child Plan
- ////////////////////////////////////////////////////////
- stack.push(limit);
- child = visit(context, stack, limit.getChild());
- stack.pop();
- ////////////////////////////////////////////////////////
-
- if (block.namedExprsMgr.isEvaluated(referName)) {
- firstFetNum = block.namedExprsMgr.getTarget(referName).getEvalTree();
- } else {
- NamedExpr namedExpr = block.namedExprsMgr.getNamedExpr(referName);
- firstFetNum = exprAnnotator.createEvalNode(context, namedExpr.getExpr(), NameResolvingMode.SUBEXPRS_AND_RELS);
- block.namedExprsMgr.markAsEvaluated(referName, firstFetNum);
- }
- }
- LimitNode limitNode = block.getNodeFromExpr(limit);
- limitNode.setChild(child);
- limitNode.setInSchema(child.getOutSchema());
- limitNode.setOutSchema(child.getOutSchema());
-
- limitNode.setFetchFirst(firstFetNum.eval(null, null).asInt8());
-
- return limitNode;
- }
-
- @Override
- public LogicalNode visitSort(PlanContext context, Stack<Expr> stack, Sort sort) throws PlanningException {
- QueryBlock block = context.queryBlock;
-
- int sortKeyNum = sort.getSortSpecs().length;
- Sort.SortSpec[] sortSpecs = sort.getSortSpecs();
- String [] referNames = new String[sortKeyNum];
-
- ExprNormalizedResult [] normalizedExprList = new ExprNormalizedResult[sortKeyNum];
- for (int i = 0; i < sortKeyNum; i++) {
- normalizedExprList[i] = normalizer.normalize(context, sortSpecs[i].getKey());
- }
- for (int i = 0; i < sortKeyNum; i++) {
- referNames[i] = block.namedExprsMgr.addExpr(normalizedExprList[i].baseExpr);
- block.namedExprsMgr.addNamedExprArray(normalizedExprList[i].aggExprs);
- block.namedExprsMgr.addNamedExprArray(normalizedExprList[i].scalarExprs);
- }
-
- ////////////////////////////////////////////////////////
- // Visit and Build Child Plan
- ////////////////////////////////////////////////////////
- stack.push(sort);
- LogicalNode child = visit(context, stack, sort.getChild());
- if (block.isAggregationRequired()) {
- child = insertGroupbyNode(context, child, stack);
- }
- stack.pop();
- ////////////////////////////////////////////////////////
-
- SortNode sortNode = block.getNodeFromExpr(sort);
- sortNode.setChild(child);
- sortNode.setInSchema(child.getOutSchema());
- sortNode.setOutSchema(child.getOutSchema());
-
-
- // Building sort keys
- Column column;
- List<SortSpec> annotatedSortSpecs = Lists.newArrayList();
- for (int i = 0; i < sortKeyNum; i++) {
- String refName = referNames[i];
- if (block.isConstReference(refName)) {
- continue;
- } else if (block.namedExprsMgr.isEvaluated(refName)) {
- column = block.namedExprsMgr.getTarget(refName).getNamedColumn();
- } else {
- throw new IllegalStateException("Unexpected State: " + TUtil.arrayToString(sortSpecs));
- }
- annotatedSortSpecs.add(new SortSpec(column, sortSpecs[i].isAscending(), sortSpecs[i].isNullFirst()));
- }
-
- if (annotatedSortSpecs.size() == 0) {
- return child;
- } else {
- sortNode.setSortSpecs(annotatedSortSpecs.toArray(new SortSpec[annotatedSortSpecs.size()]));
- return sortNode;
- }
- }
-
- /*===============================================================================================
- GROUP BY SECTION
- ===============================================================================================*/
-
- @Override
- public LogicalNode visitHaving(PlanContext context, Stack<Expr> stack, Having expr) throws PlanningException {
- QueryBlock block = context.queryBlock;
-
- ExprNormalizedResult normalizedResult = normalizer.normalize(context, expr.getQual());
- String referName = block.namedExprsMgr.addExpr(normalizedResult.baseExpr);
- block.namedExprsMgr.addNamedExprArray(normalizedResult.aggExprs);
- block.namedExprsMgr.addNamedExprArray(normalizedResult.scalarExprs);
-
- ////////////////////////////////////////////////////////
- // Visit and Build Child Plan
- ////////////////////////////////////////////////////////
- stack.push(expr);
- LogicalNode child = visit(context, stack, expr.getChild());
- stack.pop();
- ////////////////////////////////////////////////////////
-
- HavingNode having = context.queryBlock.getNodeFromExpr(expr);
- having.setChild(child);
- having.setInSchema(child.getOutSchema());
- having.setOutSchema(child.getOutSchema());
-
- EvalNode havingCondition;
- if (block.namedExprsMgr.isEvaluated(referName)) {
- havingCondition = block.namedExprsMgr.getTarget(referName).getEvalTree();
- } else {
- NamedExpr namedExpr = block.namedExprsMgr.getNamedExpr(referName);
- havingCondition = exprAnnotator.createEvalNode(context, namedExpr.getExpr(),
- NameResolvingMode.SUBEXPRS_AND_RELS);
- block.namedExprsMgr.markAsEvaluated(referName, havingCondition);
- }
-
- // set having condition
- having.setQual(havingCondition);
-
- return having;
- }
-
- @Override
- public LogicalNode visitGroupBy(PlanContext context, Stack<Expr> stack, Aggregation aggregation)
- throws PlanningException {
-
- // Initialization Phase:
- LogicalPlan plan = context.plan;
- QueryBlock block = context.queryBlock;
-
- // Normalize grouping keys and add normalized grouping keys to NamedExprManager
- int groupingKeyNum = aggregation.getGroupSet()[0].getGroupingSets().length;
- ExprNormalizedResult [] normalizedResults = new ExprNormalizedResult[groupingKeyNum];
- for (int i = 0; i < groupingKeyNum; i++) {
- Expr groupingKey = aggregation.getGroupSet()[0].getGroupingSets()[i];
- normalizedResults[i] = normalizer.normalize(context, groupingKey);
- }
-
- String [] groupingKeyRefNames = new String[groupingKeyNum];
- for (int i = 0; i < groupingKeyNum; i++) {
- groupingKeyRefNames[i] = block.namedExprsMgr.addExpr(normalizedResults[i].baseExpr);
- block.namedExprsMgr.addNamedExprArray(normalizedResults[i].aggExprs);
- block.namedExprsMgr.addNamedExprArray(normalizedResults[i].scalarExprs);
- }
-
- ////////////////////////////////////////////////////////
- // Visit and Build Child Plan
- ////////////////////////////////////////////////////////
- stack.push(aggregation);
- LogicalNode child = visit(context, stack, aggregation.getChild());
- stack.pop();
- ////////////////////////////////////////////////////////
- GroupbyNode groupingNode = context.queryBlock.getNodeFromExpr(aggregation);
- groupingNode.setChild(child);
- groupingNode.setInSchema(child.getOutSchema());
-
- // Set grouping sets
- List<Column> groupingColumns = Lists.newArrayList();
- for (int i = 0; i < groupingKeyRefNames.length; i++) {
- String refName = groupingKeyRefNames[i];
- if (context.getQueryBlock().isConstReference(refName)) {
- continue;
- } else if (block.namedExprsMgr.isEvaluated(groupingKeyRefNames[i])) {
- groupingColumns.add(block.namedExprsMgr.getTarget(groupingKeyRefNames[i]).getNamedColumn());
- } else {
- throw new PlanningException("Each grouping column expression must be a scalar expression.");
- }
- }
-
- int effectiveGroupingKeyNum = groupingColumns.size();
- groupingNode.setGroupingColumns(groupingColumns.toArray(new Column[effectiveGroupingKeyNum]));
-
- ////////////////////////////////////////////////////////
- // Visit and Build Child Plan
- ////////////////////////////////////////////////////////
-
- // create EvalNodes and check if each EvalNode can be evaluated here.
- List<String> aggEvalNames = TUtil.newList();
- List<AggregationFunctionCallEval> aggEvalNodes = TUtil.newList();
- boolean includeDistinctFunction = false;
- for (Iterator<NamedExpr> iterator = block.namedExprsMgr.getIteratorForUnevaluatedExprs(); iterator.hasNext();) {
- NamedExpr namedExpr = iterator.next();
- try {
- includeDistinctFunction |= PlannerUtil.existsDistinctAggregationFunction(namedExpr.getExpr());
- EvalNode evalNode = exprAnnotator.createEvalNode(context, namedExpr.getExpr(),
- NameResolvingMode.SUBEXPRS_AND_RELS);
- if (evalNode.getType() == EvalType.AGG_FUNCTION) {
- block.namedExprsMgr.markAsEvaluated(namedExpr.getAlias(), evalNode);
- aggEvalNames.add(namedExpr.getAlias());
- aggEvalNodes.add((AggregationFunctionCallEval) evalNode);
- }
- } catch (VerifyException ve) {
- }
- }
- // if there is at least one distinct aggregation function
- groupingNode.setDistinct(includeDistinctFunction);
- groupingNode.setAggFunctions(aggEvalNodes.toArray(new AggregationFunctionCallEval[aggEvalNodes.size()]));
-
- Target [] targets = new Target[effectiveGroupingKeyNum + aggEvalNames.size()];
-
- // In target, grouping columns will be followed by aggregation evals.
- //
- // col1, col2, col3, sum(..), agv(..)
- // ^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^
- // grouping keys aggregation evals
-
- // Build grouping keys
- for (int i = 0; i < effectiveGroupingKeyNum; i++) {
- Target target = block.namedExprsMgr.getTarget(groupingNode.getGroupingColumns()[i].getQualifiedName());
- targets[i] = target;
- }
-
- for (int i = 0, targetIdx = effectiveGroupingKeyNum; i < aggEvalNodes.size(); i++, targetIdx++) {
- targets[targetIdx] = block.namedExprsMgr.getTarget(aggEvalNames.get(i));
- }
-
- groupingNode.setTargets(targets);
- block.unsetAggregationRequire();
-
- verifyProjectedFields(block, groupingNode);
- return groupingNode;
- }
-
- private static final Column[] ALL= Lists.newArrayList().toArray(new Column[0]);
-
- public static List<Column[]> generateCuboids(Column[] columns) {
- int numCuboids = (int) Math.pow(2, columns.length);
- int maxBits = columns.length;
-
- List<Column[]> cube = Lists.newArrayList();
- List<Column> cuboidCols;
-
- cube.add(ALL);
- for (int cuboidId = 1; cuboidId < numCuboids; cuboidId++) {
- cuboidCols = Lists.newArrayList();
- for (int j = 0; j < maxBits; j++) {
- int bit = 1 << j;
- if ((cuboidId & bit) == bit) {
- cuboidCols.add(columns[j]);
- }
- }
- cube.add(cuboidCols.toArray(new Column[cuboidCols.size()]));
- }
- return cube;
- }
-
- @Override
- public SelectionNode visitFilter(PlanContext context, Stack<Expr> stack, Selection selection)
- throws PlanningException {
- QueryBlock block = context.queryBlock;
-
- ExprNormalizedResult normalizedResult = normalizer.normalize(context, selection.getQual());
- block.namedExprsMgr.addExpr(normalizedResult.baseExpr);
- if (normalizedResult.aggExprs.size() > 0 || normalizedResult.scalarExprs.size() > 0) {
- throw new VerifyException("Filter condition cannot include aggregation function");
- }
-
- ////////////////////////////////////////////////////////
- // Visit and Build Child Plan
- ////////////////////////////////////////////////////////
- stack.push(selection);
- LogicalNode child = visit(context, stack, selection.getChild());
- stack.pop();
- ////////////////////////////////////////////////////////
-
- SelectionNode selectionNode = context.queryBlock.getNodeFromExpr(selection);
- selectionNode.setChild(child);
- selectionNode.setInSchema(child.getOutSchema());
- selectionNode.setOutSchema(child.getOutSchema());
-
- // Create EvalNode for a search condition.
- EvalNode searchCondition = exprAnnotator.createEvalNode(context, selection.getQual(),
- NameResolvingMode.RELS_AND_SUBEXPRS);
- EvalNode simplified = context.evalOptimizer.optimize(context, searchCondition);
- // set selection condition
- selectionNode.setQual(simplified);
-
- return selectionNode;
- }
-
- /*===============================================================================================
- JOIN SECTION
- ===============================================================================================*/
-
- @Override
- public LogicalNode visitJoin(PlanContext context, Stack<Expr> stack, Join join)
- throws PlanningException {
- // Phase 1: Init
- LogicalPlan plan = context.plan;
- QueryBlock block = context.queryBlock;
-
- if (join.hasQual()) {
- ExprNormalizedResult normalizedResult = normalizer.normalize(context, join.getQual(), true);
- block.namedExprsMgr.addExpr(normalizedResult.baseExpr);
- if (normalizedResult.aggExprs.size() > 0 || normalizedResult.scalarExprs.size() > 0) {
- throw new VerifyException("Filter condition cannot include aggregation function");
- }
- }
-
- ////////////////////////////////////////////////////////
- // Visit and Build Child Plan
- ////////////////////////////////////////////////////////
- stack.push(join);
- LogicalNode left = visit(context, stack, join.getLeft());
- LogicalNode right = visit(context, stack, join.getRight());
- stack.pop();
- ////////////////////////////////////////////////////////
-
- JoinNode joinNode = context.queryBlock.getNodeFromExpr(join);
- joinNode.setJoinType(join.getJoinType());
- joinNode.setLeftChild(left);
- joinNode.setRightChild(right);
-
- // Set A merged input schema
- Schema merged;
- if (join.isNatural()) {
- merged = getNaturalJoinSchema(left, right);
- } else {
- merged = SchemaUtil.merge(left.getOutSchema(), right.getOutSchema());
- }
- joinNode.setInSchema(merged);
-
- // Create EvalNode for a search condition.
- EvalNode joinCondition = null;
- if (join.hasQual()) {
- EvalNode evalNode = exprAnnotator.createEvalNode(context, join.getQual(), NameResolvingMode.LEGACY);
- joinCondition = context.evalOptimizer.optimize(context, evalNode);
- }
-
- List<String> newlyEvaluatedExprs = getNewlyEvaluatedExprsForJoin(context, joinNode, stack);
- List<Target> targets = TUtil.newList(PlannerUtil.schemaToTargets(merged));
-
- for (String newAddedExpr : newlyEvaluatedExprs) {
- targets.add(block.namedExprsMgr.getTarget(newAddedExpr, true));
- }
- joinNode.setTargets(targets.toArray(new Target[targets.size()]));
-
- // Determine join conditions
- if (join.isNatural()) { // if natural join, it should have the equi-join conditions by common column names
- EvalNode njCond = getNaturalJoinCondition(joinNode);
- joinNode.setJoinQual(njCond);
- } else if (join.hasQual()) { // otherwise, the given join conditions are set
- joinNode.setJoinQual(joinCondition);
- }
-
- return joinNode;
- }
-
- private List<String> getNewlyEvaluatedExprsForJoin(PlanContext context, JoinNode joinNode, Stack<Expr> stack) {
- QueryBlock block = context.queryBlock;
-
- EvalNode evalNode;
- List<String> newlyEvaluatedExprs = TUtil.newList();
- for (Iterator<NamedExpr> it = block.namedExprsMgr.getIteratorForUnevaluatedExprs(); it.hasNext();) {
- NamedExpr namedExpr = it.next();
- try {
- evalNode = exprAnnotator.createEvalNode(context, namedExpr.getExpr(), NameResolvingMode.LEGACY);
- if (LogicalPlanner.checkIfBeEvaluatedAtJoin(block, evalNode, joinNode, stack.peek().getType() != OpType.Join)) {
- block.namedExprsMgr.markAsEvaluated(namedExpr.getAlias(), evalNode);
- newlyEvaluatedExprs.add(namedExpr.getAlias());
- }
- } catch (VerifyException ve) {
- } catch (PlanningException e) {
- }
- }
- return newlyEvaluatedExprs;
- }
-
- private static Schema getNaturalJoinSchema(LogicalNode left, LogicalNode right) {
- Schema joinSchema = new Schema();
- Schema commons = SchemaUtil.getNaturalJoinColumns(left.getOutSchema(), right.getOutSchema());
- joinSchema.addColumns(commons);
- for (Column c : left.getOutSchema().getColumns()) {
- if (!joinSchema.contains(c.getQualifiedName())) {
- joinSchema.addColumn(c);
- }
- }
-
- for (Column c : right.getOutSchema().getColumns()) {
- if (!joinSchema.contains(c.getQualifiedName())) {
- joinSchema.addColumn(c);
- }
- }
- return joinSchema;
- }
-
- private static EvalNode getNaturalJoinCondition(JoinNode joinNode) {
- Schema leftSchema = joinNode.getLeftChild().getInSchema();
- Schema rightSchema = joinNode.getRightChild().getInSchema();
- Schema commons = SchemaUtil.getNaturalJoinColumns(leftSchema, rightSchema);
-
- EvalNode njQual = null;
- EvalNode equiQual;
- Column leftJoinKey;
- Column rightJoinKey;
-
- for (Column common : commons.getColumns()) {
- leftJoinKey = leftSchema.getColumn(common.getQualifiedName());
- rightJoinKey = rightSchema.getColumn(common.getQualifiedName());
- equiQual = new BinaryEval(EvalType.EQUAL,
- new FieldEval(leftJoinKey), new FieldEval(rightJoinKey));
- if (njQual == null) {
- njQual = equiQual;
- } else {
- njQual = new BinaryEval(EvalType.AND, njQual, equiQual);
- }
- }
-
- return njQual;
- }
-
- private LogicalNode createCartesianProduct(PlanContext context, LogicalNode left, LogicalNode right)
- throws PlanningException {
- LogicalPlan plan = context.plan;
- QueryBlock block = context.queryBlock;
-
- Schema merged = SchemaUtil.merge(left.getOutSchema(), right.getOutSchema());
- JoinNode join = plan.createNode(JoinNode.class);
- join.init(JoinType.CROSS, left, right);
- join.setInSchema(merged);
-
- EvalNode evalNode;
- List<String> newlyEvaluatedExprs = TUtil.newList();
- for (Iterator<NamedExpr> it = block.namedExprsMgr.getIteratorForUnevaluatedExprs(); it.hasNext();) {
- NamedExpr namedExpr = it.next();
- try {
- evalNode = exprAnnotator.createEvalNode(context, namedExpr.getExpr(), NameResolvingMode.LEGACY);
- if (EvalTreeUtil.findDistinctAggFunction(evalNode).size() == 0) {
- block.namedExprsMgr.markAsEvaluated(namedExpr.getAlias(), evalNode);
- newlyEvaluatedExprs.add(namedExpr.getAlias());
- }
- } catch (VerifyException ve) {}
- }
-
- List<Target> targets = TUtil.newList(PlannerUtil.schemaToTargets(merged));
- for (String newAddedExpr : newlyEvaluatedExprs) {
- targets.add(block.namedExprsMgr.getTarget(newAddedExpr, true));
- }
- join.setTargets(targets.toArray(new Target[targets.size()]));
- return join;
- }
-
- @Override
- public LogicalNode visitRelationList(PlanContext context, Stack<Expr> stack, RelationList relations)
- throws PlanningException {
-
- LogicalNode current = visit(context, stack, relations.getRelations()[0]);
-
- LogicalNode left;
- LogicalNode right;
- if (relations.size() > 1) {
-
- for (int i = 1; i < relations.size(); i++) {
- left = current;
- right = visit(context, stack, relations.getRelations()[i]);
- current = createCartesianProduct(context, left, right);
- }
- }
- context.queryBlock.registerNode(current);
-
- return current;
- }
-
- @Override
- public ScanNode visitRelation(PlanContext context, Stack<Expr> stack, Relation expr)
- throws PlanningException {
- QueryBlock block = context.queryBlock;
-
- ScanNode scanNode = block.getNodeFromExpr(expr);
- updatePhysicalInfo(scanNode.getTableDesc());
-
- // Find expression which can be evaluated at this relation node.
- // Except for column references, additional expressions used in select list, where clause, order-by clauses
- // can be evaluated here. Their reference names are kept in newlyEvaluatedExprsRef.
- Set<String> newlyEvaluatedExprsReferences = new LinkedHashSet<String>();
- for (Iterator<NamedExpr> iterator = block.namedExprsMgr.getIteratorForUnevaluatedExprs(); iterator.hasNext();) {
- NamedExpr rawTarget = iterator.next();
- try {
- EvalNode evalNode = exprAnnotator.createEvalNode(context, rawTarget.getExpr(),
- NameResolvingMode.RELS_ONLY);
- if (checkIfBeEvaluatedAtRelation(block, evalNode, scanNode)) {
- block.namedExprsMgr.markAsEvaluated(rawTarget.getAlias(), evalNode);
- newlyEvaluatedExprsReferences.add(rawTarget.getAlias()); // newly added exr
- }
- } catch (VerifyException ve) {
- }
- }
-
- // Assume that each unique expr is evaluated once.
- LinkedHashSet<Target> targets = createFieldTargetsFromRelation(block, scanNode, newlyEvaluatedExprsReferences);
-
- // The fact the some expr is included in newlyEvaluatedExprsReferences means that it is already evaluated.
- // So, we get a raw expression and then creates a target.
- for (String reference : newlyEvaluatedExprsReferences) {
- NamedExpr refrer = block.namedExprsMgr.getNamedExpr(reference);
- EvalNode evalNode = exprAnnotator.createEvalNode(context, refrer.getExpr(), NameResolvingMode.RELS_ONLY);
- targets.add(new Target(evalNode, reference));
- }
-
- scanNode.setTargets(targets.toArray(new Target[targets.size()]));
-
- verifyProjectedFields(block, scanNode);
- return scanNode;
- }
-
- private static LinkedHashSet<Target> createFieldTargetsFromRelation(QueryBlock block, RelationNode relationNode,
- Set<String> newlyEvaluatedRefNames) {
- LinkedHashSet<Target> targets = Sets.newLinkedHashSet();
- for (Column column : relationNode.getTableSchema().getColumns()) {
- String aliasName = block.namedExprsMgr.checkAndGetIfAliasedColumn(column.getQualifiedName());
- if (aliasName != null) {
- targets.add(new Target(new FieldEval(column), aliasName));
- newlyEvaluatedRefNames.remove(aliasName);
- } else {
- targets.add(new Target(new FieldEval(column)));
- }
- }
- return targets;
- }
-
- private void updatePhysicalInfo(TableDesc desc) {
- if (desc.getPath() != null) {
- try {
- FileSystem fs = desc.getPath().getFileSystem(new Configuration());
- FileStatus status = fs.getFileStatus(desc.getPath());
- if (desc.getStats() != null && (status.isDirectory() || status.isFile())) {
- ContentSummary summary = fs.getContentSummary(desc.getPath());
- if (summary != null) {
- long volume = summary.getLength();
- desc.getStats().setNumBytes(volume);
- }
- }
- } catch (Throwable t) {
- LOG.warn(t);
- }
- }
- }
-
- public TableSubQueryNode visitTableSubQuery(PlanContext context, Stack<Expr> stack, TablePrimarySubQuery expr)
- throws PlanningException {
- QueryBlock block = context.queryBlock;
-
- QueryBlock childBlock = context.plan.getBlock(context.plan.getBlockNameByExpr(expr.getSubQuery()));
- PlanContext newContext = new PlanContext(context, childBlock);
- LogicalNode child = visit(newContext, new Stack<Expr>(), expr.getSubQuery());
- TableSubQueryNode subQueryNode = context.queryBlock.getNodeFromExpr(expr);
- context.plan.connectBlocks(childBlock, context.queryBlock, BlockType.TableSubQuery);
- subQueryNode.setSubQuery(child);
-
- // Add additional expressions required in upper nodes.
- Set<String> newlyEvaluatedExprs = TUtil.newHashSet();
- for (NamedExpr rawTarget : block.namedExprsMgr.getAllNamedExprs()) {
- try {
- EvalNode evalNode = exprAnnotator.createEvalNode(context, rawTarget.getExpr(),
- NameResolvingMode.RELS_ONLY);
- if (checkIfBeEvaluatedAtRelation(block, evalNode, subQueryNode)) {
- block.namedExprsMgr.markAsEvaluated(rawTarget.getAlias(), evalNode);
- newlyEvaluatedExprs.add(rawTarget.getAlias()); // newly added exr
- }
- } catch (VerifyException ve) {
- }
- }
-
- // Assume that each unique expr is evaluated once.
- LinkedHashSet<Target> targets = createFieldTargetsFromRelation(block, subQueryNode, newlyEvaluatedExprs);
-
- for (String newAddedExpr : newlyEvaluatedExprs) {
- targets.add(block.namedExprsMgr.getTarget(newAddedExpr, true));
- }
-
- subQueryNode.setTargets(targets.toArray(new Target[targets.size()]));
-
- return subQueryNode;
- }
-
- /*===============================================================================================
- SET OPERATION SECTION
- ===============================================================================================*/
-
- @Override
- public LogicalNode visitUnion(PlanContext context, Stack<Expr> stack, SetOperation setOperation)
- throws PlanningException {
- return buildSetPlan(context, stack, setOperation);
- }
-
- @Override
- public LogicalNode visitExcept(PlanContext context, Stack<Expr> stack, SetOperation setOperation)
- throws PlanningException {
- return buildSetPlan(context, stack, setOperation);
- }
-
- @Override
- public LogicalNode visitIntersect(PlanContext context, Stack<Expr> stack, SetOperation setOperation)
- throws PlanningException {
- return buildSetPlan(context, stack, setOperation);
- }
-
- private LogicalNode buildSetPlan(PlanContext context, Stack<Expr> stack, SetOperation setOperation)
- throws PlanningException {
-
- // 1. Init Phase
- LogicalPlan plan = context.plan;
- QueryBlock block = context.queryBlock;
-
- ////////////////////////////////////////////////////////
- // Visit and Build Left Child Plan
- ////////////////////////////////////////////////////////
- QueryBlock leftBlock = context.plan.getBlockByExpr(setOperation.getLeft());
- PlanContext leftContext = new PlanContext(context, leftBlock);
- stack.push(setOperation);
- LogicalNode leftChild = visit(leftContext, new Stack<Expr>(), setOperation.getLeft());
- stack.pop();
- // Connect left child and current blocks
- context.plan.connectBlocks(leftContext.queryBlock, context.queryBlock, BlockType.TableSubQuery);
-
- ////////////////////////////////////////////////////////
- // Visit and Build Right Child Plan
- ////////////////////////////////////////////////////////
- QueryBlock rightBlock = context.plan.getBlockByExpr(setOperation.getRight());
- PlanContext rightContext = new PlanContext(context, rightBlock);
- stack.push(setOperation);
- LogicalNode rightChild = visit(rightContext, new Stack<Expr>(), setOperation.getRight());
- stack.pop();
- // Connect right child and current blocks
- context.plan.connectBlocks(rightContext.queryBlock, context.queryBlock, BlockType.TableSubQuery);
-
- BinaryNode setOp;
- if (setOperation.getType() == OpType.Union) {
- setOp = block.getNodeFromExpr(setOperation);
- } else if (setOperation.getType() == OpType.Except) {
- setOp = block.getNodeFromExpr(setOperation);
- } else if (setOperation.getType() == OpType.Intersect) {
- setOp = block.getNodeFromExpr(setOperation);
- } else {
- throw new VerifyException("Invalid Type: " + setOperation.getType());
- }
- setOp.setLeftChild(leftChild);
- setOp.setRightChild(rightChild);
-
- // An union statement can be derived from two query blocks.
- // For one union statement between both relations, we can ensure that each corresponding data domain of both
- // relations are the same. However, if necessary, the schema of left query block will be used as a base schema.
- Target [] leftStrippedTargets = PlannerUtil.stripTarget(
- PlannerUtil.schemaToTargets(leftBlock.getRoot().getOutSchema()));
-
- setOp.setInSchema(leftChild.getOutSchema());
- Schema outSchema = PlannerUtil.targetToSchema(leftStrippedTargets);
- setOp.setOutSchema(outSchema);
-
- return setOp;
- }
-
- /*===============================================================================================
- INSERT SECTION
- ===============================================================================================*/
-
- public LogicalNode visitInsert(PlanContext context, Stack<Expr> stack, Insert expr) throws PlanningException {
- stack.push(expr);
- LogicalNode subQuery = super.visitInsert(context, stack, expr);
- stack.pop();
-
- InsertNode insertNode = context.queryBlock.getNodeFromExpr(expr);
- insertNode.setOverwrite(expr.isOverwrite());
- insertNode.setSubQuery(subQuery);
-
- if (expr.hasTableName()) { // INSERT (OVERWRITE) INTO TABLE ...
- return buildInsertIntoTablePlan(context, insertNode, expr);
- } else if (expr.hasLocation()) { // INSERT (OVERWRITE) INTO LOCATION ...
- return buildInsertIntoLocationPlan(context, insertNode, expr);
- } else {
- throw new IllegalStateException("Invalid Query");
- }
- }
-
- /**
- * Builds a InsertNode with a target table.
- *
- * ex) INSERT OVERWRITE INTO TABLE ...
- * <br />
- *
- * We use the following terms, such target table, target column
- * <pre>
- * INSERT INTO [DATABASE_NAME.]TB_NAME (col1, col2) SELECT c1, c2 FROM ...
- * ^^^^^^^^^^^^^^ ^^^^^^^ ^^^^^^^^^^^^ ^^^^^^^^^^^^
- * target database target table target columns (or schema) projected columns (or schema)
- * </pre>
- */
- private InsertNode buildInsertIntoTablePlan(PlanContext context, InsertNode insertNode, Insert expr)
- throws PlanningException {
- // Get and set a target table
- String databaseName;
- String tableName;
- if (CatalogUtil.isFQTableName(expr.getTableName())) {
- databaseName = CatalogUtil.extractQualifier(expr.getTableName());
- tableName = CatalogUtil.extractSimpleName(expr.getTableName());
- } else {
- databaseName = context.queryContext.getCurrentDatabase();
- tableName = expr.getTableName();
- }
- TableDesc desc = catalog.getTableDesc(databaseName, tableName);
- insertNode.setTargetTable(desc);
-
- //
- // When we use 'INSERT (OVERWIRTE) INTO TABLE statements, there are two cases.
- //
- // First, when a user specified target columns
- // INSERT (OVERWRITE)? INTO table_name (col1 type, col2 type) SELECT ...
- //
- // Second, when a user do not specified target columns
- // INSERT (OVERWRITE)? INTO table_name SELECT ...
- //
- // In the former case is, target columns' schema and corresponding projected columns' schema
- // must be equivalent or be available to cast implicitly.
- //
- // In the later case, the target table's schema and projected column's
- // schema of select clause can be different to each other. In this case,
- // we use only a sequence of preceding columns of target table's schema
- // as target columns.
- //
- // For example, consider a target table and an 'insert into' query are give as follows:
- //
- // CREATE TABLE TB1 (col1 int, col2 int, col3 long);
- // || ||
- // INSERT OVERWRITE INTO TB1 SELECT order_key, part_num FROM ...
- //
- // In this example, only col1 and col2 are used as target columns.
-
- if (expr.hasTargetColumns()) { // when a user specified target columns
-
- if (expr.getTargetColumns().length > insertNode.getChild().getOutSchema().size()) {
- throw new PlanningException("Target columns and projected columns are mismatched to each other");
- }
-
- // See PreLogicalPlanVerifier.visitInsert.
- // It guarantees that the equivalence between the numbers of target and projected columns.
- String [] targets = expr.getTargetColumns();
- Schema targetColumns = new Schema();
- for (int i = 0; i < targets.length; i++) {
- Column targetColumn = desc.getLogicalSchema().getColumn(targets[i]);
- targetColumns.addColumn(targetColumn);
- }
- insertNode.setTargetSchema(targetColumns);
- insertNode.setOutSchema(targetColumns);
- buildProjectedInsert(context, insertNode);
-
- } else { // when a user do not specified target columns
-
- // The output schema of select clause determines the target columns.
- Schema tableSchema = desc.getLogicalSchema();
- Schema projectedSchema = insertNode.getChild().getOutSchema();
-
- Schema targetColumns = new Schema();
- for (int i = 0; i < projectedSchema.size(); i++) {
- targetColumns.addColumn(tableSchema.getColumn(i));
- }
- insertNode.setTargetSchema(targetColumns);
- buildProjectedInsert(context, insertNode);
- }
-
- if (desc.hasPartition()) {
- insertNode.setPartitionMethod(desc.getPartitionMethod());
- }
- return insertNode;
- }
-
- private void buildProjectedInsert(PlanContext context, InsertNode insertNode) {
- Schema tableSchema = insertNode.getTableSchema();
- Schema targetColumns = insertNode.getTargetSchema();
-
- LogicalNode child = insertNode.getChild();
-
- if (child.getType() == NodeType.UNION) {
- child = makeProjectionForInsertUnion(context, insertNode);
- }
-
- if (child instanceof Projectable) {
- Projectable projectionNode = (Projectable) insertNode.getChild();
-
- // Modifying projected columns by adding NULL constants
- // It is because that table appender does not support target columns to be written.
- List<Target> targets = TUtil.newList();
- for (int i = 0, j = 0; i < tableSchema.size(); i++) {
- Column column = tableSchema.getColumn(i);
-
- if(targetColumns.contains(column) && j < projectionNode.getTargets().length) {
- targets.add(projectionNode.getTargets()[j++]);
- } else {
- targets.add(new Target(new ConstEval(NullDatum.get()), column.getSimpleName()));
- }
- }
- projectionNode.setTargets(targets.toArray(new Target[targets.size()]));
-
- insertNode.setInSchema(projectionNode.getOutSchema());
- insertNode.setOutSchema(projectionNode.getOutSchema());
- insertNode.setProjectedSchema(PlannerUtil.targetToSchema(targets));
- } else {
- throw new RuntimeException("Wrong child node type: " + child.getType() + " for insert");
- }
- }
-
- private ProjectionNode makeProjectionForInsertUnion(PlanContext context, InsertNode insertNode) {
- LogicalNode child = insertNode.getChild();
- // add (projection - subquery) to RootBlock and create new QueryBlock for UnionNode
- TableSubQueryNode subQueryNode = context.plan.createNode(TableSubQueryNode.class);
- subQueryNode.init(context.queryBlock.getName(), child);
- subQueryNode.setTargets(PlannerUtil.schemaToTargets(subQueryNode.getOutSchema()));
-
- ProjectionNode projectionNode = context.plan.createNode(ProjectionNode.class);
- projectionNode.setChild(subQueryNode);
- projectionNode.setInSchema(subQueryNode.getInSchema());
- projectionNode.setTargets(subQueryNode.getTargets());
-
- context.queryBlock.registerNode(projectionNode);
- context.queryBlock.registerNode(subQueryNode);
-
- // add child QueryBlock to the UnionNode's QueryBlock
- UnionNode unionNode = (UnionNode)child;
- context.queryBlock.unregisterNode(unionNode);
-
- QueryBlock unionBlock = context.plan.newQueryBlock();
- unionBlock.registerNode(unionNode);
- unionBlock.setRoot(unionNode);
-
- QueryBlock leftBlock = context.plan.getBlock(unionNode.getLeftChild());
- QueryBlock rightBlock = context.plan.getBlock(unionNode.getRightChild());
-
- context.plan.disconnectBlocks(leftBlock, context.queryBlock);
- context.plan.disconnectBlocks(rightBlock, context.queryBlock);
-
- context.plan.connectBlocks(unionBlock, context.queryBlock, BlockType.TableSubQuery);
- context.plan.connectBlocks(leftBlock, unionBlock, BlockType.TableSubQuery);
- context.plan.connectBlocks(rightBlock, unionBlock, BlockType.TableSubQuery);
-
- // set InsertNode's child with ProjectionNode which is created.
- insertNode.setChild(projectionNode);
-
- return projectionNode;
- }
-
- /**
- * Build a InsertNode with a location.
- *
- * ex) INSERT OVERWRITE INTO LOCATION 'hdfs://....' ..
- */
- private InsertNode buildInsertIntoLocationPlan(PlanContext context, InsertNode insertNode, Insert expr) {
- // INSERT (OVERWRITE)? INTO LOCATION path (USING file_type (param_clause)?)? query_expression
-
- LogicalNode child = insertNode.getChild();
-
- if (child.getType() == NodeType.UNION) {
- child = makeProjectionForInsertUnion(context, insertNode);
- }
-
- Schema childSchema = child.getOutSchema();
- insertNode.setInSchema(childSchema);
- insertNode.setOutSchema(childSchema);
- insertNode.setTableSchema(childSchema);
- insertNode.setTargetLocation(new Path(expr.getLocation()));
-
- if (expr.hasStorageType()) {
- insertNode.setStorageType(CatalogUtil.getStoreType(expr.getStorageType()));
- }
- if (expr.hasParams()) {
- KeyValueSet options = new KeyValueSet();
- options.putAll(expr.getParams());
- insertNode.setOptions(options);
- }
- return insertNode;
- }
-
- /*===============================================================================================
- Data Definition Language (DDL) SECTION
- ===============================================================================================*/
-
- @Override
- public LogicalNode visitCreateDatabase(PlanContext context, Stack<Expr> stack, CreateDatabase expr)
- throws PlanningException {
- CreateDatabaseNode createDatabaseNode = context.queryBlock.getNodeFromExpr(expr);
- createDatabaseNode.init(expr.getDatabaseName(), expr.isIfNotExists());
- return createDatabaseNode;
- }
-
- @Override
- public LogicalNode visitDropDatabase(PlanContext context, Stack<Expr> stack, DropDatabase expr)
- throws PlanningException {
- DropDatabaseNode dropDatabaseNode = context.queryBlock.getNodeFromExpr(expr);
- dropDatabaseNode.init(expr.getDatabaseName(), expr.isIfExists());
- return dropDatabaseNode;
- }
-
- public LogicalNode handleCreateTableLike(PlanContext context, CreateTable expr, CreateTableNode createTableNode)
- throws PlanningException {
- String parentTableName = expr.getLikeParentTableName();
-
- if (CatalogUtil.isFQTableName(parentTableName) == false) {
- parentTableName =
- CatalogUtil.buildFQName(context.queryContext.getCurrentDatabase(),
- parentTableName);
- }
- TableDesc parentTableDesc = catalog.getTableDesc(parentTableName);
- if(parentTableDesc == null)
- throw new PlanningException("Table '"+parentTableName+"' does not exist");
- PartitionMethodDesc partitionDesc = parentTableDesc.getPartitionMethod();
- createTableNode.setTableSchema(parentTableDesc.getSchema());
- createTableNode.setPartitionMethod(partitionDesc);
-
- createTableNode.setStorageType(parentTableDesc.getMeta().getStoreType());
- createTableNode.setOptions(parentTableDesc.getMeta().getOptions());
-
- createTableNode.setExternal(parentTableDesc.isExternal());
- if(parentTableDesc.isExternal()) {
- createTableNode.setPath(parentTableDesc.getPath());
- }
- return createTableNode;
- }
-
-
- @Override
- public LogicalNode visitCreateTable(PlanContext context, Stack<Expr> stack, CreateTable expr)
- throws PlanningException {
-
- CreateTableNode createTableNode = context.queryBlock.getNodeFromExpr(expr);
- createTableNode.setIfNotExists(expr.isIfNotExists());
-
- // Set a table name to be created.
- if (CatalogUtil.isFQTableName(expr.getTableName())) {
- createTableNode.setTableName(expr.getTableName());
- } else {
- createTableNode.setTableName(
- CatalogUtil.buildFQName(context.queryContext.getCurrentDatabase(), expr.getTableName()));
- }
- // This is CREATE TABLE <tablename> LIKE <parentTable>
- if(expr.getLikeParentTableName() != null)
- return handleCreateTableLike(context, expr, createTableNode);
-
- if (expr.hasStorageType()) { // If storage type (using clause) is specified
- createTableNode.setStorageType(CatalogUtil.getStoreType(expr.getStorageType()));
- } else { // otherwise, default type
- createTableNode.setStorageType(CatalogProtos.StoreType.CSV);
- }
-
- // Set default storage properties to be created.
- KeyValueSet keyValueSet = StorageUtil.newPhysicalProperties(createTableNode.getStorageType());
- if (expr.hasParams()) {
- keyValueSet.putAll(expr.getParams());
- }
-
- createTableNode.setOptions(keyValueSet);
-
- if (expr.hasPartition()) {
- if (expr.getPartitionMethod().getPartitionType().equals(PartitionType.COLUMN)) {
- createTableNode.setPartitionMethod(getPartitionMethod(context, expr.getTableName(), expr.getPartitionMethod()));
- } else {
- throw new PlanningException(String.format("Not supported PartitonType: %s",
- expr.getPartitionMethod().getPartitionType()));
- }
- }
-
- if (expr.hasSubQuery()) { // CREATE TABLE .. AS SELECT
- stack.add(expr);
- LogicalNode subQuery = visit(context, stack, expr.getSubQuery());
- stack.pop();
- createTableNode.setChild(subQuery);
- createTableNode.setInSchema(subQuery.getOutSchema());
-
- // If the table schema is defined
- // ex) CREATE TABLE tbl(col1 type, col2 type) AS SELECT ...
- if (expr.hasTableElements()) {
- createTableNode.setOutSchema(convertTableElementsSchema(expr.getTableElements()));
- createTableNode.setTableSchema(convertTableElementsSchema(expr.getTableElements()));
- } else {
- // if no table definition, the select clause's output schema will be used.
- // ex) CREATE TABLE tbl AS SELECT ...
-
- if (expr.hasPartition()) {
- PartitionMethodDesc partitionMethod = createTableNode.getPartitionMethod();
-
- Schema queryOutputSchema = subQuery.getOutSchema();
- Schema partitionExpressionSchema = partitionMethod.getExpressionSchema();
- if (partitionMethod.getPartitionType() == CatalogProtos.PartitionType.COLUMN &&
- queryOutputSchema.size() < partitionExpressionSchema.size()) {
- throw new VerifyException("Partition columns cannot be more than table columns.");
- }
- Schema tableSchema = new Schema();
- for (int i = 0; i < queryOutputSchema.size() - partitionExpressionSchema.size(); i++) {
- tableSchema.addColumn(queryOutputSchema.getColumn(i));
- }
- createTableNode.setOutSchema(tableSchema);
- createTableNode.setTableSchema(tableSchema);
- } else {
- // Convert the schema of subquery into the target table's one.
- Schema schema = new Schema(subQuery.getOutSchema());
- schema.setQualifier(createTableNode.getTableName());
- createTableNode.setOutSchema(schema);
- createTableNode.setTableSchema(schema);
- }
- }
-
- return createTableNode;
-
- } else { // if CREATE AN EMPTY TABLE
- Schema tableSchema = convertColumnsToSchema(expr.getTableElements());
- createTableNode.setTableSchema(tableSchema);
-
- if (expr.isExternal()) {
- createTableNode.setExternal(true);
- }
-
- if (expr.hasLocation()) {
- createTableNode.setPath(new Path(expr.getLocation()));
- }
-
- return createTableNode;
- }
- }
-
- private PartitionMethodDesc getPartitionMethod(PlanContext context,
- String tableName,
- CreateTable.PartitionMethodDescExpr expr) throws PlanningException {
- PartitionMethodDesc partitionMethodDesc;
-
- if(expr.getPartitionType() == PartitionType.COLUMN) {
- CreateTable.ColumnPartition partition = (CreateTable.ColumnPartition) expr;
- String partitionExpression = Joiner.on(',').join(partition.getColumns());
-
- partitionMethodDesc = new PartitionMethodDesc(context.queryContext.getCurrentDatabase(), tableName,
- CatalogProtos.PartitionType.COLUMN, partitionExpression, convertColumnsToSchema(partition.getColumns()));
- } else {
- throw new PlanningException(String.format("Not supported PartitonType: %s", expr.getPartitionType()));
- }
- return partitionMethodDesc;
- }
-
- /**
- * It transforms table definition elements to schema.
- *
- * @param elements to be transformed
- * @return schema transformed from table definition elements
- */
- private Schema convertColumnsToSchema(ColumnDefinition[] elements) {
- Schema schema = new Schema();
-
- for (ColumnDefinition columnDefinition: elements) {
- schema.addColumn(convertColumn(columnDefinition));
- }
-
- return schema;
- }
-
- /**
- * It transforms table definition elements to schema.
- *
- * @param elements to be transformed
- * @return schema transformed from table definition elements
- */
- private Schema convertTableElementsSchema(ColumnDefinition[] elements) {
- Schema schema = new Schema();
-
- for (ColumnDefinition columnDefinition: elements) {
- schema.addColumn(convertColumn(columnDefinition));
- }
-
- return schema;
- }
-
- private Column convertColumn(ColumnDefinition columnDefinition) {
- return new Column(columnDefinition.getColumnName(), convertDataType(columnDefinition));
- }
-
- public static TajoDataTypes.DataType convertDataType(DataTypeExpr dataType) {
- TajoDataTypes.Type type = TajoDataTypes.Type.valueOf(dataType.getTypeName());
-
- TajoDataTypes.DataType.Builder builder = TajoDataTypes.DataType.newBuilder();
- builder.setType(type);
- if (dataType.hasLengthOrPrecision()) {
- builder.setLength(dataType.getLengthOrPrecision());
- }
- return builder.build();
- }
-
-
- @Override
- public LogicalNode visitDropTable(PlanContext context, Stack<Expr> stack, DropTable dropTable) {
- DropTableNode dropTableNode = context.queryBlock.getNodeFromExpr(dropTable);
- String qualified;
- if (CatalogUtil.isFQTableName(dropTable.getTableName())) {
- qualified = dropTable.getTableName();
- } else {
- qualified = CatalogUtil.buildFQName(context.queryContext.getCurrentDatabase(), dropTable.getTableName());
- }
- dropTableNode.init(qualified, dropTable.isIfExists(), dropTable.isPurge());
- return dropTableNode;
- }
-
- public LogicalNode visitAlterTablespace(PlanContext context, Stack<Expr> stack, AlterTablespace alterTablespace) {
- AlterTablespaceNode alter = context.queryBlock.getNodeFromExpr(alterTablespace);
- alter.setTablespaceName(alterTablespace.getTablespaceName());
- alter.setLocation(alterTablespace.getLocation());
- return alter;
- }
-
- @Override
- public LogicalNode visitAlterTable(PlanContext context, Stack<Expr> stack, AlterTable alterTable) {
- AlterTableNode alterTableNode = context.queryBlock.getNodeFromExpr(alterTable);
- alterTableNode.setTableName(alterTable.getTableName());
- alterTableNode.setNewTableName(alterTable.getNewTableName());
- alterTableNode.setColumnName(alterTable.getColumnName());
- alterTableNode.setNewColumnName(alterTable.getNewColumnName());
-
- if (null != alterTable.getAddNewColumn()) {
- alterTableNode.setAddNewColumn(convertColumn(alterTable.getAddNewColumn()));
- }
- alterTableNode.setAlterTableOpType(alterTable.getAlterTableOpType());
- return alterTableNode;
- }
-
- @Override
- public LogicalNode visitTruncateTable(PlanContext context, Stack<Expr> stack, TruncateTable truncateTable)
- throws PlanningException {
- TruncateTableNode truncateTableNode = context.queryBlock.getNodeFromExpr(truncateTable);
- truncateTableNode.setTableNames(truncateTable.getTableNames());
- return truncateTableNode;
- }
-
- /*===============================================================================================
- Util SECTION
- ===============================================================================================*/
-
- public static boolean checkIfBeEvaluatedAtWindowAgg(EvalNode evalNode, WindowAggNode node) {
- Set<Column> columnRefs = EvalTreeUtil.findUniqueColumns(evalNode);
-
- if (columnRefs.size() > 0 && !node.getInSchema().containsAll(columnRefs)) {
- return false;
- }
-
- if (EvalTreeUtil.findDistinctAggFunction(evalNode).size() > 0) {
- return false;
- }
-
- return true;
- }
-
- public static boolean checkIfBeEvaluatedAtGroupBy(EvalNode evalNode, GroupbyNode node) {
- Set<Column> columnRefs = EvalTreeUtil.findUniqueColumns(evalNode);
-
- if (columnRefs.size() > 0 && !node.getInSchema().containsAll(columnRefs)) {
- return false;
- }
-
- if (EvalTreeUtil.findEvalsByType(evalNode, EvalType.WINDOW_FUNCTION).size() > 0) {
- return false;
- }
-
- return true;
- }
-
- public static boolean checkIfBeEvaluatedAtJoin(QueryBlock block, EvalNode evalNode, JoinNode node,
- boolean isTopMostJoin) {
- Set<Column> columnRefs = EvalTreeUtil.findUniqueColumns(evalNode);
-
- if (EvalTreeUtil.findDistinctAggFunction(evalNode).size() > 0) {
- return false;
- }
-
- if (EvalTreeUtil.findEvalsByType(evalNode, EvalType.WINDOW_FUNCTION).size() > 0) {
- return false;
- }
-
- if (columnRefs.size() > 0 && !node.getInSchema().containsAll(columnRefs)) {
- return false;
- }
-
- // When a 'case-when' is used with outer join, the case-when expression must be evaluated
- // at the topmost join operator.
- // TODO - It's also valid that case-when is evalauted at the topmost outer operator.
- // But, how can we know there is no further outer join operator after this node?
- if (containsOuterJoin(block)) {
- if (!isTopMostJoin) {
- Collection<EvalNode> found = EvalTreeUtil.findOuterJoinSensitiveEvals(evalNode);
- if (found.size() > 0) {
- return false;
- }
- }
- }
-
- return true;
- }
-
- public static boolean isOuterJoin(JoinType joinType) {
- return joinType == JoinType.LEFT_OUTER || joinType == JoinType.RIGHT_OUTER || joinType==JoinType.FULL_OUTER;
- }
-
- public static boolean containsOuterJoin(QueryBlock block) {
- return block.containsJoinType(JoinType.LEFT_OUTER) || block.containsJoinType(JoinType.RIGHT_OUTER) ||
- block.containsJoinType(JoinType.FULL_OUTER);
- }
-
- /**
- * It checks if evalNode can be evaluated at this @{link RelationNode}.
- */
- public static boolean checkIfBeEvaluatedAtRelation(QueryBlock block, EvalNode evalNode, RelationNode node) {
- Set<Column> columnRefs = EvalTreeUtil.findUniqueColumns(evalNode);
-
- // aggregation functions cannot be evaluated in scan node
- if (EvalTreeUtil.findDistinctAggFunction(evalNode).size() > 0) {
- return false;
- }
-
- // aggregation functions cannot be evaluated in scan node
- if (EvalTreeUtil.findEvalsByType(evalNode, EvalType.WINDOW_FUNCTION).size() > 0) {
- return false;
- }
-
- if (columnRefs.size() > 0 && !node.getTableSchema().containsAll(columnRefs)) {
- return false;
- }
-
- // Why? - When a {case when} is used with outer join, case when must be evaluated at topmost outer join.
- if (containsOuterJoin(block)) {
- Collection<EvalNode> found = EvalTreeUtil.findOuterJoinSensitiveEvals(evalNode);
- if (found.size() > 0) {
- return false;
- }
- }
-
- return true;
- }
-
- public static boolean checkIfBeEvaluatedAtThis(EvalNode evalNode, LogicalNode node) {
- Set<Column> columnRefs = EvalTreeUtil.findUniqueColumns(evalNode);
- if (columnRefs.size() > 0 && !node.getInSchema().containsAll(columnRefs)) {
- return false;
- }
-
- return true;
- }
-}