You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/10/26 06:46:13 UTC
[14/33] TAJO-1125: Separate logical plan and optimizer into a maven
module.
http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
deleted file mode 100644
index 666c5fc..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
+++ /dev/null
@@ -1,361 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.rewrite;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.*;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.partition.PartitionMethodDesc;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.eval.*;
-import org.apache.tajo.engine.planner.BasicLogicalPlanVisitor;
-import org.apache.tajo.engine.planner.LogicalPlan;
-import org.apache.tajo.engine.planner.PlannerUtil;
-import org.apache.tajo.engine.planner.PlanningException;
-import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.engine.utils.TupleUtil;
-import org.apache.tajo.storage.Tuple;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
-import java.util.Stack;
-
-public class PartitionedTableRewriter implements RewriteRule {
- private static final Log LOG = LogFactory.getLog(PartitionedTableRewriter.class);
-
- private static final String NAME = "Partitioned Table Rewriter";
- private final Rewriter rewriter = new Rewriter();
-
- private final TajoConf systemConf;
-
- public PartitionedTableRewriter(TajoConf conf) {
- systemConf = conf;
- }
-
- @Override
- public String getName() {
- return NAME;
- }
-
- @Override
- public boolean isEligible(LogicalPlan plan) {
- for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
- for (RelationNode relation : block.getRelations()) {
- if (relation.getType() == NodeType.SCAN) {
- TableDesc table = ((ScanNode)relation).getTableDesc();
- if (table.hasPartition()) {
- return true;
- }
- }
- }
- }
- return false;
- }
-
- @Override
- public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
- LogicalPlan.QueryBlock rootBlock = plan.getRootBlock();
- rewriter.visit(rootBlock, plan, rootBlock, rootBlock.getRoot(), new Stack<LogicalNode>());
- return plan;
- }
-
- private static class PartitionPathFilter implements PathFilter {
- private Schema schema;
- private EvalNode partitionFilter;
-
-
- public PartitionPathFilter(Schema schema, EvalNode partitionFilter) {
- this.schema = schema;
- this.partitionFilter = partitionFilter;
- }
-
- @Override
- public boolean accept(Path path) {
- Tuple tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, true);
- if (tuple == null) { // if it is a file or not acceptable file
- return false;
- }
-
- return partitionFilter.eval(schema, tuple).asBool();
- }
-
- @Override
- public String toString() {
- return partitionFilter.toString();
- }
- }
-
- /**
- * It assumes that each conjunctive form corresponds to one column.
- *
- * @param partitionColumns
- * @param conjunctiveForms search condition corresponding to partition columns.
- * If it is NULL, it means that there is no search condition for this table.
- * @param tablePath
- * @return
- * @throws IOException
- */
- private Path [] findFilteredPaths(Schema partitionColumns, EvalNode [] conjunctiveForms, Path tablePath)
- throws IOException {
-
- FileSystem fs = tablePath.getFileSystem(systemConf);
-
- PathFilter [] filters;
- if (conjunctiveForms == null) {
- filters = buildAllAcceptingPathFilters(partitionColumns);
- } else {
- filters = buildPathFiltersForAllLevels(partitionColumns, conjunctiveForms);
- }
-
- // loop from one to the number of partition columns
- Path [] filteredPaths = toPathArray(fs.listStatus(tablePath, filters[0]));
-
- for (int i = 1; i < partitionColumns.size(); i++) {
- // Get all file status matched to a ith level path filter.
- filteredPaths = toPathArray(fs.listStatus(filteredPaths, filters[i]));
- }
-
- LOG.info("Filtered directory or files: " + filteredPaths.length);
- return filteredPaths;
- }
-
- /**
- * Build path filters for all levels with a list of filter conditions.
- *
- * For example, consider you have a partitioned table for three columns (i.e., col1, col2, col3).
- * Then, this methods will create three path filters for (col1), (col1, col2), (col1, col2, col3).
- *
- * Corresponding filter conditions will be placed on each path filter,
- * If there is no corresponding expression for certain column,
- * The condition will be filled with a true value.
- *
- * Assume that an user gives a condition WHERE col1 ='A' and col3 = 'C'.
- * There is no filter condition corresponding to col2.
- * Then, the path filter conditions are corresponding to the followings:
- *
- * The first path filter: col1 = 'A'
- * The second path filter: col1 = 'A' AND col2 IS NOT NULL
- * The third path filter: col1 = 'A' AND col2 IS NOT NULL AND col3 = 'C'
- *
- * 'IS NOT NULL' predicate is always true against the partition path.
- *
- * @param partitionColumns
- * @param conjunctiveForms
- * @return
- */
- private static PathFilter [] buildPathFiltersForAllLevels(Schema partitionColumns,
- EvalNode [] conjunctiveForms) {
- // Building partition path filters for all levels
- Column target;
- PathFilter [] filters = new PathFilter[partitionColumns.size()];
- List<EvalNode> accumulatedFilters = Lists.newArrayList();
- for (int i = 0; i < partitionColumns.size(); i++) { // loop from one to level
- target = partitionColumns.getColumn(i);
-
- for (EvalNode expr : conjunctiveForms) {
- if (EvalTreeUtil.findUniqueColumns(expr).contains(target)) {
- // Accumulate one qual per level
- accumulatedFilters.add(expr);
- }
- }
-
- if (accumulatedFilters.size() < (i + 1)) {
- accumulatedFilters.add(new IsNullEval(true, new FieldEval(target)));
- }
-
- EvalNode filterPerLevel = AlgebraicUtil.createSingletonExprFromCNF(
- accumulatedFilters.toArray(new EvalNode[accumulatedFilters.size()]));
- filters[i] = new PartitionPathFilter(partitionColumns, filterPerLevel);
- }
- return filters;
- }
-
- /**
- * Build an array of path filters for all levels with all accepting filter condition.
- * @param partitionColumns The partition columns schema
- * @return The array of path filter, accpeting all partition paths.
- */
- private static PathFilter [] buildAllAcceptingPathFilters(Schema partitionColumns) {
- Column target;
- PathFilter [] filters = new PathFilter[partitionColumns.size()];
- List<EvalNode> accumulatedFilters = Lists.newArrayList();
- for (int i = 0; i < partitionColumns.size(); i++) { // loop from one to level
- target = partitionColumns.getColumn(i);
- accumulatedFilters.add(new IsNullEval(true, new FieldEval(target)));
-
- EvalNode filterPerLevel = AlgebraicUtil.createSingletonExprFromCNF(
- accumulatedFilters.toArray(new EvalNode[accumulatedFilters.size()]));
- filters[i] = new PartitionPathFilter(partitionColumns, filterPerLevel);
- }
- return filters;
- }
-
- private static Path [] toPathArray(FileStatus[] fileStatuses) {
- Path [] paths = new Path[fileStatuses.length];
- for (int j = 0; j < fileStatuses.length; j++) {
- paths[j] = fileStatuses[j].getPath();
- }
- return paths;
- }
-
- private Path [] findFilteredPartitionPaths(ScanNode scanNode) throws IOException {
- TableDesc table = scanNode.getTableDesc();
- PartitionMethodDesc partitionDesc = scanNode.getTableDesc().getPartitionMethod();
-
- Schema paritionValuesSchema = new Schema();
- for (Column column : partitionDesc.getExpressionSchema().getColumns()) {
- paritionValuesSchema.addColumn(column);
- }
-
- Set<EvalNode> indexablePredicateSet = Sets.newHashSet();
-
- // if a query statement has a search condition, try to find indexable predicates
- if (scanNode.hasQual()) {
- EvalNode [] conjunctiveForms = AlgebraicUtil.toConjunctiveNormalFormArray(scanNode.getQual());
- Set<EvalNode> remainExprs = Sets.newHashSet(conjunctiveForms);
-
- // add qualifier to schema for qual
- paritionValuesSchema.setQualifier(scanNode.getCanonicalName());
- for (Column column : paritionValuesSchema.getColumns()) {
- for (EvalNode simpleExpr : conjunctiveForms) {
- if (checkIfIndexablePredicateOnTargetColumn(simpleExpr, column)) {
- indexablePredicateSet.add(simpleExpr);
- }
- }
- }
-
- // Partitions which are not matched to the partition filter conditions are pruned immediately.
- // So, the partition filter conditions are not necessary later, and they are removed from
- // original search condition for simplicity and efficiency.
- remainExprs.removeAll(indexablePredicateSet);
- if (remainExprs.isEmpty()) {
- scanNode.setQual(null);
- } else {
- scanNode.setQual(
- AlgebraicUtil.createSingletonExprFromCNF(remainExprs.toArray(new EvalNode[remainExprs.size()])));
- }
- }
-
- if (indexablePredicateSet.size() > 0) { // There are at least one indexable predicates
- return findFilteredPaths(paritionValuesSchema,
- indexablePredicateSet.toArray(new EvalNode[indexablePredicateSet.size()]), table.getPath());
- } else { // otherwise, we will get all partition paths.
- return findFilteredPaths(paritionValuesSchema, null, table.getPath());
- }
- }
-
- private boolean checkIfIndexablePredicateOnTargetColumn(EvalNode evalNode, Column targetColumn) {
- if (checkIfIndexablePredicate(evalNode) || checkIfDisjunctiveButOneVariable(evalNode)) {
- Set<Column> variables = EvalTreeUtil.findUniqueColumns(evalNode);
- // if it contains only single variable matched to a target column
- return variables.size() == 1 && variables.contains(targetColumn);
- } else {
- return false;
- }
- }
-
- /**
- * Check if an expression consists of one variable and one constant and
- * the expression is a comparison operator.
- *
- * @param evalNode The expression to be checked
- * @return true if an expression consists of one variable and one constant
- * and the expression is a comparison operator. Other, false.
- */
- private boolean checkIfIndexablePredicate(EvalNode evalNode) {
- // TODO - LIKE with a trailing wild-card character and IN with an array can be indexable
- return AlgebraicUtil.containSingleVar(evalNode) && AlgebraicUtil.isIndexableOperator(evalNode);
- }
-
- /**
- *
- * @param evalNode The expression to be checked
- * @return true if an disjunctive expression, consisting of indexable expressions
- */
- private boolean checkIfDisjunctiveButOneVariable(EvalNode evalNode) {
- if (evalNode.getType() == EvalType.OR) {
- BinaryEval orEval = (BinaryEval) evalNode;
- boolean indexable =
- checkIfIndexablePredicate(orEval.getLeftExpr()) &&
- checkIfIndexablePredicate(orEval.getRightExpr());
-
- boolean sameVariable =
- EvalTreeUtil.findUniqueColumns(orEval.getLeftExpr())
- .equals(EvalTreeUtil.findUniqueColumns(orEval.getRightExpr()));
-
- return indexable && sameVariable;
- } else {
- return false;
- }
- }
-
- private void updateTableStat(PartitionedTableScanNode scanNode) throws PlanningException {
- if (scanNode.getInputPaths().length > 0) {
- try {
- FileSystem fs = scanNode.getInputPaths()[0].getFileSystem(systemConf);
- long totalVolume = 0;
-
- for (Path input : scanNode.getInputPaths()) {
- ContentSummary summary = fs.getContentSummary(input);
- totalVolume += summary.getLength();
- totalVolume += summary.getFileCount();
- }
- scanNode.getTableDesc().getStats().setNumBytes(totalVolume);
- } catch (IOException e) {
- throw new PlanningException(e);
- }
- }
- }
-
- private final class Rewriter extends BasicLogicalPlanVisitor<Object, Object> {
- @Override
- public Object visitScan(Object object, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode scanNode,
- Stack<LogicalNode> stack) throws PlanningException {
-
- TableDesc table = scanNode.getTableDesc();
- if (!table.hasPartition()) {
- return null;
- }
-
- try {
- Path [] filteredPaths = findFilteredPartitionPaths(scanNode);
- plan.addHistory("PartitionTableRewriter chooses " + filteredPaths.length + " of partitions");
- PartitionedTableScanNode rewrittenScanNode = plan.createNode(PartitionedTableScanNode.class);
- rewrittenScanNode.init(scanNode, filteredPaths);
- updateTableStat(rewrittenScanNode);
-
- // if it is topmost node, set it as the rootnode of this block.
- if (stack.empty() || block.getRoot().equals(scanNode)) {
- block.setRoot(rewrittenScanNode);
- } else {
- PlannerUtil.replaceNode(plan, stack.peek(), scanNode, rewrittenScanNode);
- }
- } catch (IOException e) {
- throw new PlanningException("Partitioned Table Rewrite Failed: \n" + e.getMessage());
- }
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
deleted file mode 100644
index ec5df04..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
+++ /dev/null
@@ -1,1145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.rewrite;
-
-import com.google.common.collect.*;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.annotation.Nullable;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.engine.eval.*;
-import org.apache.tajo.engine.planner.*;
-import org.apache.tajo.engine.planner.LogicalPlan.QueryBlock;
-import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.engine.utils.SchemaUtil;
-import org.apache.tajo.util.TUtil;
-
-import java.util.*;
-
-/**
- * ProjectionPushDownRule deploys expressions in a selection list to proper
- * {@link org.apache.tajo.engine.planner.logical.Projectable}
- * nodes. In this process, the expressions are usually pushed down as low as possible.
- * It also enables scanners to read only necessary columns.
- */
-public class ProjectionPushDownRule extends
- BasicLogicalPlanVisitor<ProjectionPushDownRule.Context, LogicalNode> implements RewriteRule {
- /** Class Logger */
- private final Log LOG = LogFactory.getLog(ProjectionPushDownRule.class);
- private static final String name = "ProjectionPushDown";
-
- /** Returns the name of this rewrite rule. */
- @Override
- public String getName() {
- return name;
- }
-
- /**
-  * A plan is eligible unless its root is a DDL plan, provided that
-  * at least one of its query blocks has a table expression.
-  */
- @Override
- public boolean isEligible(LogicalPlan plan) {
-   // DDL plans are never rewritten by this rule.
-   if (PlannerUtil.checkIfDDLPlan(plan.getRootBlock().getRoot())) {
-     return false;
-   }
-
-   for (QueryBlock queryBlock : plan.getQueryBlocks()) {
-     if (!queryBlock.hasTableExpression()) {
-       continue;
-     }
-     return true;
-   }
-
-   return false;
- }
-
- /**
-  * Visits the plan from the root node of the root block with a fresh context and an empty stack.
-  * The plan is modified in place, and the given instance is returned.
-  */
- @Override
- public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
-   final LogicalPlan.QueryBlock startBlock = plan.getRootBlock();
-   final Stack<LogicalNode> visitStack = new Stack<LogicalNode>();
-
-   visit(new Context(plan), plan, startBlock, startBlock.getRoot(), visitStack);
-
-   return plan;
- }
-
- /**
- * <h2>What is TargetListManager?</h2>
- * It manages all expressions used in a query block, and their reference names.
- * TargetListManager provides a way to find an expression by a reference name.
- * It keeps a set of expressions, and one or more reference names can point to
- * the same expression.
- *
- * Also, TargetListManager keeps the evaluation state of each expression.
- * The evaluation state is a boolean state to indicate whether the expression
- * was evaluated in descendant node or not. If an expression is evaluated,
- * the evaluation state is changed to TRUE. It also means that
- * the expression can be referred to by a column reference instead of evaluating the expression.
- *
- * Consider an example query:
- *
- * SELECT sum(l_orderkey + 1) from lineitem where l_partkey > 1;
- *
- * In this case, an expression sum(l_orderkey + 1) is divided into two sub expressions:
- * <ul>
- * <li>$1 <- l_orderkey + 1</li>
- * <li>$2 <- sum($1)</li>
- * </ul>
- *
- * <code>$1</code> is a simple arithmetic operation, and $2 is an aggregation function.
- * <code>$1</code> is evaluated in ScanNode because it's just a simple arithmetic operation.
- * So, the evaluation state of l_orderkey + 1 initially
- * is false, but the state will be true after ScanNode.
- *
- * In contrast, sum($1) is evaluated at GroupbyNode. So, its evaluation state is changed
- * after GroupByNode.
- *
- * <h2>Why is TargetListManager necessary?</h2>
- *
- * Expressions used in a query block can be divided into various categories according to
- * the possible {@link Projectable} nodes. Their references become available depending on
- * the Projectable node at which expressions are evaluated. It manages the expressions and
- * references for optimized places of expressions. It performs duplicated removal and enables
- * common expressions to be shared with two or more Projectable nodes. It also helps Projectable
- * nodes to find correct column references.
- */
- public static class TargetListManager {
-   /** The next identifier to be assigned to a newly added expression */
-   private int seqId = 0;
-
-   /*
-    * Why should we use LinkedHashMap for those maps ?
-    *
-    * These maps are mainly used by the target list of each projectable node
-    * (i.e., ProjectionNode, GroupbyNode, JoinNode, and ScanNode).
-    * The projection node removal occurs only when the projection node's output
-    * schema and its child's output schema are equivalent to each other.
-    *
-    * If we keep the inserted order of all expressions, it would make the possibility
-    * of projection node removal higher.
-    */
-
-   /** A Map: Name -> Id */
-   private final LinkedHashMap<String, Integer> nameToIdBiMap;
-   /** Map: Id <-> EvalNode */
-   private final BiMap<Integer, EvalNode> idToEvalBiMap;
-   /** Map: Id -> Names */
-   private final LinkedHashMap<Integer, List<String>> idToNamesMap;
-   /** Map: Id -> Boolean */
-   private final LinkedHashMap<Integer, Boolean> evaluationStateMap;
-   /** Map: alias name -> Id */
-   private final LinkedHashMap<String, Integer> aliasMap;
-
-   private final LogicalPlan plan;
-
-   /**
-    * @param plan The logical plan used to generate names for expressions added without a name
-    */
-   public TargetListManager(LogicalPlan plan) {
-     this.plan = plan;
-     nameToIdBiMap = Maps.newLinkedHashMap();
-     idToEvalBiMap = HashBiMap.create();
-     idToNamesMap = Maps.newLinkedHashMap();
-     evaluationStateMap = Maps.newLinkedHashMap();
-     aliasMap = Maps.newLinkedHashMap();
-   }
-
-   private int getNextSeqId() {
-     return seqId++;
-   }
-
-   /**
-    * If some expression is duplicated, we call an alias indicating the duplicated expression 'native alias'.
-    * This method checks whether a reference is native alias or not.
-    *
-    * @param name The reference name
-    * @return True if the reference is native alias. Otherwise, it will return False.
-    */
-   public boolean isNativeAlias(String name) {
-     return aliasMap.containsKey(name);
-   }
-
-   /**
-    * This method retrieves the name indicating the actual expression that a given alias indicates.
-    *
-    * @param name an alias name
-    * @return Real reference name
-    * @throws RuntimeException if the name is not a native alias
-    */
-   public String getRealReferenceName(String name) {
-     Integer refId = aliasMap.get(name);
-     // Fail with a descriptive message rather than with a NullPointerException caused by unboxing.
-     if (refId == null) {
-       throw new RuntimeException("No such alias: " + name);
-     }
-     return getPrimaryName(refId);
-   }
-
-   /**
-    * Add an expression with a specified name, which is usually an alias.
-    * Later, you can refer this expression by the specified name.
-    *
-    * @param specifiedName The name to be bound to the expression
-    * @param evalNode The expression to be added
-    * @return The specified name
-    * @throws PlanningException if the name is already bound to a different expression
-    *         and neither of the two expressions is a column reference
-    */
-   private String add(String specifiedName, EvalNode evalNode) throws PlanningException {
-
-     // if a name already exists, it only just keeps an actual
-     // expression instead of a column reference.
-     if (nameToIdBiMap.containsKey(specifiedName)) {
-
-       int refId = nameToIdBiMap.get(specifiedName);
-       EvalNode found = idToEvalBiMap.get(refId);
-       if (found != null) {
-         if (evalNode.equals(found)) { // if input expression already exists
-           return specifiedName;
-         } else {
-           // The case where if existing reference name and a given reference name are the same to each other and
-           // existing EvalNode and a given EvalNode is the different
-           if (found.getType() != EvalType.FIELD && evalNode.getType() != EvalType.FIELD) {
-             throw new PlanningException("Duplicate alias: " + evalNode);
-           }
-
-           if (found.getType() == EvalType.FIELD) {
-             // The given expression may be registered under another id. The lookup must precede
-             // forcePut, which rebinds the expression to the id of the existing name.
-             Integer dangling = idToEvalBiMap.inverse().get(evalNode);
-             idToEvalBiMap.forcePut(refId, evalNode);
-             if (dangling != null) {
-               // Move the primary name of the replaced id to the id of the existing name.
-               String name = getPrimaryName(dangling);
-               idToNamesMap.remove(dangling);
-               nameToIdBiMap.put(name, refId);
-               if (!idToNamesMap.get(refId).contains(name)) {
-                 TUtil.putToNestedList(idToNamesMap, refId, name);
-               }
-             }
-           }
-         }
-       }
-     }
-
-     int refId;
-     if (idToEvalBiMap.inverse().containsKey(evalNode)) {
-       // The expression is already known, so the name is recorded as an alias of it.
-       refId = idToEvalBiMap.inverse().get(evalNode);
-       aliasMap.put(specifiedName, refId);
-
-     } else {
-       refId = getNextSeqId();
-       idToEvalBiMap.put(refId, evalNode);
-       TUtil.putToNestedList(idToNamesMap, refId, specifiedName);
-       // Columns referred to by the new expression are added as well.
-       for (Column column : EvalTreeUtil.findUniqueColumns(evalNode)) {
-         add(new FieldEval(column));
-       }
-       evaluationStateMap.put(refId, false);
-     }
-
-     nameToIdBiMap.put(specifiedName, refId);
-
-     return specifiedName;
-   }
-
-   /**
-    * Adds an expression without any name. It returns an automatically
-    * generated name. It can be also used for referring this expression.
-    * If the expression, or the name of a column reference, is already registered,
-    * the primary name of the registered one is returned instead.
-    *
-    * @param evalNode The expression to be added
-    * @return The name by which the expression can be referred to
-    */
-   public String add(EvalNode evalNode) throws PlanningException {
-     String name;
-
-     if (evalNode.getType() == EvalType.FIELD) {
-       FieldEval fieldEval = (FieldEval) evalNode;
-       if (nameToIdBiMap.containsKey(fieldEval.getName())) {
-         int refId = nameToIdBiMap.get(fieldEval.getName());
-         return getPrimaryName(refId);
-       }
-     }
-
-     if (idToEvalBiMap.inverse().containsKey(evalNode)) {
-       int refId = idToEvalBiMap.inverse().get(evalNode);
-       return getPrimaryName(refId);
-     }
-
-     if (evalNode.getType() == EvalType.FIELD) {
-       FieldEval fieldEval = (FieldEval) evalNode;
-       name = fieldEval.getName();
-     } else {
-       name = plan.generateUniqueColumnName(evalNode);
-     }
-
-     return add(name, evalNode);
-   }
-
-   /**
-    * @return All registered names. It is a view backed by this manager rather than a copy.
-    */
-   public Collection<String> getNames() {
-     return nameToIdBiMap.keySet();
-   }
-
-   /**
-    * Adds the expression of a target with the canonical name of the target.
-    *
-    * @param target The target to be added
-    * @return The canonical name of the target
-    */
-   public String add(Target target) throws PlanningException {
-     return add(target.getCanonicalName(), target.getEvalTree());
-   }
-
-   /**
-    * Each expression can have one or more names.
-    * We call a name added with an expression firstly as the primary name.
-    * It has a special meaning. Since duplicated expression in logical planning are removed,
-    * the primary name is only used for each expression during logical planning.
-    *
-    * @param refId The identifier of an expression
-    * @param name The name to check if it is the primary name.
-    * @return True if this name is the primary added name. Otherwise, False.
-    */
-   private boolean isPrimaryName(int refId, String name) {
-     if (idToNamesMap.get(refId).size() > 0) {
-       return getPrimaryName(refId).equals(name);
-     } else {
-       return false;
-     }
-   }
-
-   private String getPrimaryName(int refId) {
-     return idToNamesMap.get(refId).get(0);
-   }
-
-   /**
-    * Creates a target for a registered name.
-    *
-    * @param name A registered name
-    * @return A target for the name
-    * @throws RuntimeException if the name is not registered
-    */
-   public Target getTarget(String name) {
-     if (!nameToIdBiMap.containsKey(name)) {
-       throw new RuntimeException("No Such target name: " + name);
-     }
-     int id = nameToIdBiMap.get(name);
-     EvalNode evalNode = idToEvalBiMap.get(id);
-
-     // if it is a constant value, just returns a constant because it can be evaluated everywhere.
-     if (evalNode.getType() == EvalType.CONST) {
-       return new Target(evalNode, name);
-     }
-
-     // if a name is not the primary name, it means that its expression may be already evaluated and
-     // can just refer a value. Consider an example as follows:
-     //
-     // select l_orderkey + 1 as total1, l_orderkey + 1 as total2 from lineitem
-     //
-     // In this case, total2 will meet the following condition. Then, total2 can
-     // just refer the result of total1 rather than calculating l_orderkey + 1.
-     if (!isPrimaryName(id, name) && isEvaluated(getPrimaryName(id))) {
-       evalNode = new FieldEval(getPrimaryName(id), evalNode.getValueType());
-     }
-
-     // if it is a column reference itself, just returns a column reference without any alias.
-     if (evalNode.getType() == EvalType.FIELD && evalNode.getName().equals(name)) {
-       return new Target((FieldEval)evalNode);
-     } else { // otherwise, it returns an expression.
-       return new Target(evalNode, name);
-     }
-   }
-
-   /**
-    * @param name A registered name
-    * @return The evaluation state of the expression indicated by the name
-    * @throws RuntimeException if the name is not registered
-    */
-   public boolean isEvaluated(String name) {
-     if (!nameToIdBiMap.containsKey(name)) {
-       throw new RuntimeException("No Such target name: " + name);
-     }
-     int refId = nameToIdBiMap.get(name);
-     return evaluationStateMap.get(refId);
-   }
-
-   /**
-    * Changes the evaluation state of the expression indicated by the canonical name of a target to TRUE.
-    *
-    * @param target The target whose canonical name indicates the expression
-    * @throws RuntimeException if the canonical name is not registered
-    */
-   public void markAsEvaluated(Target target) {
-     String name = target.getCanonicalName();
-     // Report an unknown name in the same way as getTarget() and isEvaluated()
-     // rather than with a NullPointerException caused by unboxing.
-     if (!nameToIdBiMap.containsKey(name)) {
-       throw new RuntimeException("No Such target name: " + name);
-     }
-     int refId = nameToIdBiMap.get(name);
-     if (!idToNamesMap.containsKey(refId)) {
-       throw new RuntimeException("No such eval: " + target.getEvalTree());
-     }
-     evaluationStateMap.put(refId, true);
-   }
-
-   /**
-    * @param required The names of the targets to be retrieved
-    * @return An iterator over the targets of the registered names contained in the given set
-    */
-   public Iterator<Target> getFilteredTargets(Set<String> required) {
-     return new FilteredTargetIterator(required);
-   }
-
-   /**
-    * An iterator over the targets of the registered names contained in a required set.
-    * The targets are collected when the iterator is created, in the order of the registered names.
-    */
-   class FilteredTargetIterator implements Iterator<Target> {
-     List<Target> filtered = TUtil.newList();
-     Iterator<Target> iterator;
-
-     public FilteredTargetIterator(Set<String> required) {
-       for (String name : nameToIdBiMap.keySet()) {
-         if (required.contains(name)) {
-           filtered.add(getTarget(name));
-         }
-       }
-       iterator = filtered.iterator();
-     }
-
-     @Override
-     public boolean hasNext() {
-       return iterator.hasNext();
-     }
-
-     @Override
-     public Target next() {
-       return iterator.next();
-     }
-
-     /** Removal is not supported. This call has no effect. */
-     @Override
-     public void remove() {
-     }
-   }
-
-   /**
-    * @return The number of tracked expressions and the number of evaluated ones among them
-    */
-   @Override
-   public String toString() {
-     int evaluated = 0;
-     for (Boolean flag: evaluationStateMap.values()) {
-       if (flag) {
-         evaluated++;
-       }
-     }
-     return "eval=" + evaluationStateMap.size() + ", evaluated=" + evaluated;
-   }
- }
-
- /**
-  * The state handed down while visiting a plan: the target list manager
-  * and the set of reference names required by the nodes visited so far.
-  */
- static class Context {
-   TargetListManager targetListMgr;
-   Set<String> requiredSet;
-
-   /** Creates a context with a new target list manager and no required name. */
-   public Context(LogicalPlan plan) {
-     this.requiredSet = new LinkedHashSet<String>();
-     this.targetListMgr = new TargetListManager(plan);
-   }
-
-   /** Creates a context with a new target list manager and a copy of the given required names. */
-   public Context(LogicalPlan plan, Collection<String> requiredSet) {
-     this.requiredSet = new LinkedHashSet<String>(requiredSet);
-     this.targetListMgr = new TargetListManager(plan);
-   }
-
-   /**
-    * Creates a context which shares the target list manager of the upper context,
-    * but has its own copy of the required names.
-    */
-   public Context(Context upperContext) {
-     this.requiredSet = new LinkedHashSet<String>(upperContext.requiredSet);
-     this.targetListMgr = upperContext.targetListMgr;
-   }
-
-   /**
-    * Registers a target to the target list manager, and marks the columns which
-    * its expression refers to as required.
-    *
-    * @return The reference name returned by the target list manager
-    */
-   public String addExpr(Target target) throws PlanningException {
-     final String refName = targetListMgr.add(target);
-     addNecessaryReferences(target.getEvalTree());
-     return refName;
-   }
-
-   /**
-    * Registers an expression to the target list manager, and marks the columns which
-    * it refers to as required.
-    *
-    * @return The reference name returned by the target list manager
-    */
-   public String addExpr(EvalNode evalNode) throws PlanningException {
-     final String refName = targetListMgr.add(evalNode);
-     addNecessaryReferences(evalNode);
-     return refName;
-   }
-
-   /** Adds the qualified name of each column found in the expression to the required set. */
-   private void addNecessaryReferences(EvalNode evalNode) {
-     for (Column referred : EvalTreeUtil.findUniqueColumns(evalNode)) {
-       String qualifiedName = referred.getQualifiedName();
-       requiredSet.add(qualifiedName);
-     }
-   }
-
-   @Override
-   public String toString() {
-     StringBuilder description = new StringBuilder("required=");
-     description.append(requiredSet.size());
-     description.append(",");
-     description.append(targetListMgr.toString());
-     return description.toString();
-   }
- }
-
- /**
-  * Visits the subtree below the root node, and then sets both the input and the output schema
-  * of the root node to the output schema of the visited child.
-  */
- @Override
- public LogicalNode visitRoot(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, LogicalRootNode node,
-                              Stack<LogicalNode> stack) throws PlanningException {
-   final LogicalNode visitedChild = super.visitRoot(context, plan, block, node, stack);
-   final Schema childOutSchema = visitedChild.getOutSchema();
-
-   node.setInSchema(childOutSchema);
-   node.setOutSchema(childOutSchema);
-
-   return node;
- }
-
- /**
-  * Registers the targets of a projection node, visits its child, and then rebuilds the target list
-  * of the node. The node is taken out of the plan if it evaluates no expression by itself and
-  * the schema of the rebuilt targets equals the output schema of its child.
-  *
-  * @return The child if the projection node is eliminated. Otherwise, the projection node.
-  * @throws PlanningException if the node is to be eliminated but the type of its parent is unexpected
-  */
- @Override
- public LogicalNode visitProjection(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
- ProjectionNode node, Stack<LogicalNode> stack) throws PlanningException {
- Context newContext = new Context(context);
- Target [] targets = node.getTargets();
- int targetNum = targets.length;
- String [] referenceNames = new String[targetNum];
- for (int i = 0; i < targetNum; i++) {
- referenceNames[i] = newContext.addExpr(targets[i]);
- }
-
- LogicalNode child = super.visitProjection(newContext, plan, block, node, stack);
-
- node.setInSchema(child.getOutSchema());
-
- int evaluationCount = 0;
- List<Target> finalTargets = TUtil.newList();
- for (String referenceName : referenceNames) {
- Target target = context.targetListMgr.getTarget(referenceName);
-
- if (target.getEvalTree().getType() == EvalType.CONST) {
- finalTargets.add(target);
- } else if (context.targetListMgr.isEvaluated(referenceName)) {
- if (context.targetListMgr.isNativeAlias(referenceName)) {
- String realRefName = context.targetListMgr.getRealReferenceName(referenceName);
- finalTargets.add(new Target(new FieldEval(realRefName, target.getDataType()), referenceName));
- } else {
- finalTargets.add(new Target(new FieldEval(target.getNamedColumn())));
- }
- } else if (LogicalPlanner.checkIfBeEvaluatedAtThis(target.getEvalTree(), node)) {
- finalTargets.add(target);
- context.targetListMgr.markAsEvaluated(target);
- evaluationCount++;
- }
- }
-
- node.setTargets(finalTargets.toArray(new Target[finalTargets.size()]));
- LogicalPlanner.verifyProjectedFields(block, node);
-
- // Removing ProjectionNode when it evaluates nothing and its targets equal the child's output schema
- // TODO - Consider INSERT and CTAS statement, and then remove the check of stack.empty.
- if (evaluationCount == 0 && PlannerUtil.targetToSchema(finalTargets).equals(child.getOutSchema())) {
- if (stack.empty()) {
- // if it is topmost, set it as the root of this block.
- block.setRoot(child);
- } else {
- LogicalNode parentNode = stack.peek();
- switch (parentNode.getType()) {
- case ROOT:
- LogicalRootNode rootNode = (LogicalRootNode) parentNode;
- rootNode.setChild(child);
- rootNode.setInSchema(child.getOutSchema());
- rootNode.setOutSchema(child.getOutSchema());
- break;
- case TABLE_SUBQUERY:
- TableSubQueryNode tableSubQueryNode = (TableSubQueryNode) parentNode;
- tableSubQueryNode.setSubQuery(child);
- break;
- case STORE:
- StoreTableNode storeTableNode = (StoreTableNode) parentNode;
- storeTableNode.setChild(child);
- storeTableNode.setInSchema(child.getOutSchema());
- break;
- case INSERT:
- InsertNode insertNode = (InsertNode) parentNode;
- insertNode.setSubQuery(child);
- break;
- case CREATE_TABLE:
- CreateTableNode createTableNode = (CreateTableNode) parentNode;
- createTableNode.setChild(child);
- createTableNode.setInSchema(child.getOutSchema());
- break;
- default:
- throw new PlanningException("Unexpected Parent Node: " + parentNode.getType());
- }
- plan.addHistory("ProjectionNode is eliminated.");
- }
-
- return child;
-
- } else {
- return node;
- }
- }
-
- /**
-  * Visits the child of a limit node through the superclass visitor and copies the child's
-  * output schema to both the input and the output schema of the limit node.
-  *
-  * @return the same {@code node}, with its in/out schemas refreshed from the visited child
-  * @throws PlanningException propagated from the superclass visit of the child
-  */
- @Override
- public LogicalNode visitLimit(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, LimitNode node,
-     Stack<LogicalNode> stack) throws PlanningException {
-   LogicalNode child = super.visitLimit(context, plan, block, node, stack);
-
-   node.setInSchema(child.getOutSchema());
-   node.setOutSchema(child.getOutSchema());
-   return node;
- }
-
- /**
-  * Registers every sort key in a copied context, visits the child, and then rebuilds the sort
-  * specs of {@code node} from the resolved targets. Duplicate sort specs are dropped, and a key
-  * whose target is neither already evaluated nor a plain field reference is left out.
-  *
-  * @return the same {@code node} with rewritten sort specs and in/out schemas taken from the child
-  * @throws PlanningException propagated from the superclass visit of the child
-  */
- @Override
- public LogicalNode visitSort(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
-     SortNode node, Stack<LogicalNode> stack) throws PlanningException {
-   Context newContext = new Context(context);
-
-   // Remember the reference name assigned to each sort key, position by position.
-   final int sortKeyNum = node.getSortKeys().length;
-   String [] keyNames = new String[sortKeyNum];
-   for (int keyIdx = 0; keyIdx < sortKeyNum; keyIdx++) {
-     FieldEval keyField = new FieldEval(node.getSortKeys()[keyIdx].getSortKey());
-     keyNames[keyIdx] = newContext.addExpr(keyField);
-   }
-
-   LogicalNode child = super.visitSort(newContext, plan, block, node, stack);
-
-   // Rewrite the sort keys: pick the right column for each key and skip duplicated sort specs.
-   List<SortSpec> rewrittenSpecs = new ArrayList<SortSpec>();
-   for (int keyIdx = 0; keyIdx < keyNames.length; keyIdx++) {
-     String sortKey = keyNames[keyIdx];
-     Target target = context.targetListMgr.getTarget(sortKey);
-
-     final Column sortColumn;
-     if (context.targetListMgr.isEvaluated(sortKey)) {
-       // Already computed below: refer to it by its named column.
-       sortColumn = target.getNamedColumn();
-     } else if (target.getEvalTree().getType() == EvalType.FIELD) {
-       // Not evaluated yet, but it is a plain field: use the referenced column directly.
-       sortColumn = ((FieldEval) target.getEvalTree()).getColumnRef();
-     } else {
-       continue;
-     }
-
-     SortSpec originalSpec = node.getSortKeys()[keyIdx];
-     SortSpec candidate = new SortSpec(sortColumn, originalSpec.isAscending(), originalSpec.isNullFirst());
-     if (!rewrittenSpecs.contains(candidate)) {
-       rewrittenSpecs.add(candidate);
-     }
-   }
-   node.setSortSpecs(rewrittenSpecs.toArray(new SortSpec[rewrittenSpecs.size()]));
-
-   node.setInSchema(child.getOutSchema());
-   node.setOutSchema(child.getOutSchema());
-   return node;
- }
-
- /**
-  * Registers the HAVING predicate in a copied context, visits the child, and then decides how
-  * the predicate is expressed on this node: as a field reference when it is already marked as
-  * evaluated, otherwise as the original expression tree, which is then marked as evaluated.
-  *
-  * @return the same {@code node} with its predicate and in/out schemas updated
-  * @throws PlanningException propagated from the superclass visit of the child
-  */
- @Override
- public LogicalNode visitHaving(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, HavingNode node,
-     Stack<LogicalNode> stack) throws PlanningException {
-   Context newContext = new Context(context);
-   String qualName = newContext.targetListMgr.add(node.getQual());
-   newContext.addNecessaryReferences(node.getQual());
-
-   LogicalNode child = super.visitHaving(newContext, plan, block, node, stack);
-
-   Schema childSchema = child.getOutSchema();
-   node.setInSchema(childSchema);
-   node.setOutSchema(childSchema);
-
-   Target qualTarget = context.targetListMgr.getTarget(qualName);
-   if (!newContext.targetListMgr.isEvaluated(qualName)) {
-     // Nobody below evaluated the predicate: evaluate it here.
-     node.setQual(qualTarget.getEvalTree());
-     newContext.targetListMgr.markAsEvaluated(qualTarget);
-     return node;
-   }
-
-   // The predicate value is already produced below: just reference its column.
-   node.setQual(new FieldEval(qualTarget.getNamedColumn()));
-   return node;
- }
-
- /**
-  * Pushes projection requirements through a window aggregation node.
-  *
-  * <p>Before visiting the child, a copied context is told about the partition keys, the sort keys
-  * of the node and of every window function, the non-function target columns, and (when the node
-  * reports aggregation functions) the window functions themselves. After the visit, the node's
-  * targets and window functions are rebuilt from the registered references.</p>
-  *
-  * @return the same {@code node} with its in-schema, window functions, and targets rewritten
-  * @throws PlanningException propagated from the superclass visit of the child
-  */
- @Override
- public LogicalNode visitWindowAgg(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, WindowAggNode node,
-     Stack<LogicalNode> stack) throws PlanningException {
-   Context newContext = new Context(context);
-
-   // Partition keys and sort keys must be available from the child.
-   if (node.hasPartitionKeys()) {
-     for (Column c : node.getPartitionKeys()) {
-       newContext.addNecessaryReferences(new FieldEval(c));
-     }
-   }
-
-   if (node.hasSortSpecs()) {
-     for (SortSpec sortSpec : node.getSortSpecs()) {
-       newContext.addNecessaryReferences(new FieldEval(sortSpec.getSortKey()));
-     }
-   }
-
-   for (WindowFunctionEval winFunc : node.getWindowFunctions()) {
-     if (winFunc.hasSortSpecs()) {
-       for (SortSpec sortSpec : winFunc.getSortSpecs()) {
-         newContext.addNecessaryReferences(new FieldEval(sortSpec.getSortKey()));
-       }
-     }
-   }
-
-   // The leading targets are treated as plain columns; the trailing ones pair up with the window functions.
-   int nonFunctionColumnNum = node.getTargets().length - node.getWindowFunctions().length;
-   LinkedHashSet<String> nonFunctionColumns = Sets.newLinkedHashSet();
-   for (int i = 0; i < nonFunctionColumnNum; i++) {
-     FieldEval fieldEval = new FieldEval(node.getTargets()[i].getNamedColumn());
-     nonFunctionColumns.add(newContext.addExpr(fieldEval));
-   }
-
-   // Register each window function under the canonical name of its corresponding target.
-   final String [] aggEvalNames;
-   if (node.hasAggFunctions()) {
-     final int evalNum = node.getWindowFunctions().length;
-     aggEvalNames = new String[evalNum];
-     for (int evalIdx = 0, targetIdx = nonFunctionColumnNum; targetIdx < node.getTargets().length; evalIdx++,
-         targetIdx++) {
-       Target target = node.getTargets()[targetIdx];
-       WindowFunctionEval winFunc = node.getWindowFunctions()[evalIdx];
-       aggEvalNames[evalIdx] = newContext.addExpr(new Target(winFunc, target.getCanonicalName()));
-     }
-   } else {
-     aggEvalNames = null;
-   }
-
-   // visit a child node
-   LogicalNode child = super.visitWindowAgg(newContext, plan, block, node, stack);
-
-   node.setInSchema(child.getOutSchema());
-
-   List<Target> targets = Lists.newArrayList();
-   if (nonFunctionColumnNum > 0) {
-     for (String column : nonFunctionColumns) {
-       Target target = context.targetListMgr.getTarget(column);
-
-       // Rewrite the non-function columns: an evaluated column becomes a field reference to its
-       // named column; an unevaluated one is kept only when it is a plain field.
-       if (context.targetListMgr.isEvaluated(column)) {
-         targets.add(new Target(new FieldEval(target.getNamedColumn())));
-       } else {
-         if (target.getEvalTree().getType() == EvalType.FIELD) {
-           targets.add(target);
-         }
-       }
-     }
-   }
-
-   // Getting projected targets
-   if (node.hasAggFunctions() && aggEvalNames != null) {
-     // NOTE(review): the array is sized for every registered name but filled only for references
-     // that pass the check below, so trailing slots stay null if any reference is rejected.
-     WindowFunctionEval [] aggEvals = new WindowFunctionEval[aggEvalNames.length];
-     int i = 0;
-     for (Iterator<String> it = getFilteredReferences(aggEvalNames, TUtil.newList(aggEvalNames)); it.hasNext();) {
-
-       String referenceName = it.next();
-       Target target = context.targetListMgr.getTarget(referenceName);
-
-       if (LogicalPlanner.checkIfBeEvaluatedAtWindowAgg(target.getEvalTree(), node)) {
-         aggEvals[i++] = target.getEvalTree();
-         context.targetListMgr.markAsEvaluated(target);
-
-         targets.add(new Target(new FieldEval(target.getNamedColumn())));
-       }
-     }
-     if (aggEvals.length > 0) {
-       node.setWindowFunctions(aggEvals);
-     }
-   }
-
-   node.setTargets(targets.toArray(new Target[targets.size()]));
-   return node;
- }
-
- /**
-  * Pushes projection requirements through a group-by node.
-  *
-  * <p>Grouping keys and aggregation functions are registered in a copied context before the
-  * child is visited. Afterwards the grouping columns are restored from the registered references
-  * (duplicates removed), the aggregation functions that can be evaluated at this node are
-  * collected, and the final target list is produced by {@code buildGroupByTarget}.</p>
-  *
-  * @return the same {@code node} with its in-schema, grouping columns, aggregation functions,
-  *         and targets rewritten
-  * @throws PlanningException if a grouping key resolves to an unevaluated expression that is not
-  *         a plain field, or propagated from the superclass visit of the child
-  */
- @Override
- public LogicalNode visitGroupBy(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, GroupbyNode node,
-     Stack<LogicalNode> stack) throws PlanningException {
-   Context newContext = new Context(context);
-
-   // Getting grouping key names
-   final int groupingKeyNum = node.getGroupingColumns().length;
-   LinkedHashSet<String> groupingKeyNames = null;
-   if (groupingKeyNum > 0) {
-     groupingKeyNames = Sets.newLinkedHashSet();
-     for (int i = 0; i < groupingKeyNum; i++) {
-       FieldEval fieldEval = new FieldEval(node.getGroupingColumns()[i]);
-       groupingKeyNames.add(newContext.addExpr(fieldEval));
-     }
-   }
-
-   // Getting eval names: each aggregation function is registered under the canonical name of
-   // the target that follows the grouping keys at the same offset.
-   final String [] aggEvalNames;
-   if (node.hasAggFunctions()) {
-     final int evalNum = node.getAggFunctions().length;
-     aggEvalNames = new String[evalNum];
-     for (int evalIdx = 0, targetIdx = groupingKeyNum; targetIdx < node.getTargets().length; evalIdx++, targetIdx++) {
-       Target target = node.getTargets()[targetIdx];
-       EvalNode evalNode = node.getAggFunctions()[evalIdx];
-       aggEvalNames[evalIdx] = newContext.addExpr(new Target(evalNode, target.getCanonicalName()));
-     }
-   } else {
-     aggEvalNames = null;
-   }
-
-   // visit a child node
-   LogicalNode child = super.visitGroupBy(newContext, plan, block, node, stack);
-
-   node.setInSchema(child.getOutSchema());
-
-   List<Target> targets = Lists.newArrayList();
-   if (groupingKeyNum > 0 && groupingKeyNames != null) {
-     // Restoring grouping key columns
-     final List<Column> groupingColumns = new ArrayList<Column>();
-     for (String groupingKey : groupingKeyNames) {
-       Target target = context.targetListMgr.getTarget(groupingKey);
-
-       // Rewrite the grouping keys: this sets the right column names and eliminates
-       // duplicated grouping keys.
-       if (context.targetListMgr.isEvaluated(groupingKey)) {
-         Column c = target.getNamedColumn();
-         if (!groupingColumns.contains(c)) {
-           groupingColumns.add(c);
-           targets.add(new Target(new FieldEval(target.getNamedColumn())));
-         }
-       } else {
-         if (target.getEvalTree().getType() == EvalType.FIELD) {
-           Column c = ((FieldEval)target.getEvalTree()).getColumnRef();
-           if (!groupingColumns.contains(c)) {
-             groupingColumns.add(c);
-             targets.add(target);
-             context.targetListMgr.markAsEvaluated(target);
-           }
-         } else {
-           throw new PlanningException("Cannot evaluate this expression in grouping keys: " + target.getEvalTree());
-         }
-       }
-     }
-
-     node.setGroupingColumns(groupingColumns.toArray(new Column[groupingColumns.size()]));
-   }
-
-   // Getting projected targets
-   if (node.hasAggFunctions() && aggEvalNames != null) {
-     // NOTE(review): the array is sized for every registered name but filled only for references
-     // that pass the check below, so trailing slots stay null if any reference is rejected.
-     AggregationFunctionCallEval [] aggEvals = new AggregationFunctionCallEval[aggEvalNames.length];
-     int i = 0;
-     for (Iterator<String> it = getFilteredReferences(aggEvalNames, TUtil.newList(aggEvalNames)); it.hasNext();) {
-
-       String referenceName = it.next();
-       Target target = context.targetListMgr.getTarget(referenceName);
-
-       if (LogicalPlanner.checkIfBeEvaluatedAtGroupBy(target.getEvalTree(), node)) {
-         aggEvals[i++] = target.getEvalTree();
-         context.targetListMgr.markAsEvaluated(target);
-       }
-     }
-     if (aggEvals.length > 0) {
-       node.setAggFunctions(aggEvals);
-     }
-   }
-   Target [] finalTargets = buildGroupByTarget(node, targets, aggEvalNames);
-   node.setTargets(finalTargets);
-
-   LogicalPlanner.verifyProjectedFields(block, node);
-
-   return node;
- }
-
- public static Target [] buildGroupByTarget(GroupbyNode groupbyNode, @Nullable List<Target> groupingKeyTargets,
- String [] aggEvalNames) {
- final int groupingKeyNum =
- groupingKeyTargets == null ? groupbyNode.getGroupingColumns().length : groupingKeyTargets.size();
- final int aggrFuncNum = aggEvalNames != null ? aggEvalNames.length : 0;
- EvalNode [] aggEvalNodes = groupbyNode.getAggFunctions();
- Target [] targets = new Target[groupingKeyNum + aggrFuncNum];
-
- if (groupingKeyTargets != null) {
- for (int groupingKeyIdx = 0; groupingKeyIdx < groupingKeyNum; groupingKeyIdx++) {
- targets[groupingKeyIdx] = groupingKeyTargets.get(groupingKeyIdx);
- }
- } else {
- for (int groupingKeyIdx = 0; groupingKeyIdx < groupingKeyNum; groupingKeyIdx++) {
- targets[groupingKeyIdx] = new Target(new FieldEval(groupbyNode.getGroupingColumns()[groupingKeyIdx]));
- }
- }
-
- if (aggEvalNames != null) {
- for (int aggrFuncIdx = 0, targetIdx = groupingKeyNum; aggrFuncIdx < aggrFuncNum; aggrFuncIdx++, targetIdx++) {
- targets[targetIdx] =
- new Target(new FieldEval(aggEvalNames[aggrFuncIdx], aggEvalNodes[aggrFuncIdx].getValueType()));
- }
- }
-
- return targets;
- }
-
- /**
-  * Registers the filter predicate in a copied context, visits the child, and then decides how
-  * the predicate is expressed on this node: as a field reference when it is already marked as
-  * evaluated, otherwise as the original expression tree, which is then marked as evaluated.
-  *
-  * @return the same {@code node} with its predicate and in/out schemas updated
-  * @throws PlanningException propagated from the superclass visit of the child
-  */
- @Override
- public LogicalNode visitFilter(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
-     SelectionNode node, Stack<LogicalNode> stack) throws PlanningException {
-   Context newContext = new Context(context);
-   String referenceName = newContext.targetListMgr.add(node.getQual());
-   newContext.addNecessaryReferences(node.getQual());
-
-   LogicalNode child = super.visitFilter(newContext, plan, block, node, stack);
-
-   node.setInSchema(child.getOutSchema());
-   node.setOutSchema(child.getOutSchema());
-
-   Target target = context.targetListMgr.getTarget(referenceName);
-   if (newContext.targetListMgr.isEvaluated(referenceName)) {
-     // The predicate value is already produced below: just reference its column.
-     node.setQual(new FieldEval(target.getNamedColumn()));
-   } else {
-     // Nobody below evaluated the predicate: evaluate it here.
-     node.setQual(target.getEvalTree());
-     newContext.targetListMgr.markAsEvaluated(target);
-   }
-
-   return node;
- }
-
- private static void pushDownIfComplexTermInJoinCondition(Context ctx, EvalNode cnf, EvalNode term)
- throws PlanningException {
-
- // If one of both terms in a binary operator is a complex expression, the binary operator will require
- // multiple phases. In this case, join cannot evaluate a binary operator.
- // So, we should prevent dividing the binary operator into more subexpressions.
- if (term.getType() != EvalType.FIELD &&
- !(term instanceof BinaryEval) &&
- !(term.getType() == EvalType.ROW_CONSTANT)) {
- String refName = ctx.addExpr(term);
- EvalTreeUtil.replace(cnf, term, new FieldEval(refName, term.getValueType()));
- }
- }
-
- /**
-  * Pushes projection requirements through a join node.
-  *
-  * <p>The join condition (after complex operands of its binary predicates have been handed to
-  * {@code pushDownIfComplexTermInJoinCondition}) and the node's required targets are registered
-  * in a copied context, both children are visited, and the node's in-schema, join condition, and
-  * projected targets are then rebuilt.</p>
-  *
-  * @return the same {@code node} with its in-schema, join condition, and targets rewritten
-  * @throws PlanningException if the join condition is already marked as evaluated after the
-  *         children were visited, or propagated from visiting the children
-  */
- public LogicalNode visitJoin(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, JoinNode node,
-     Stack<LogicalNode> stack) throws PlanningException {
-   Context newContext = new Context(context);
-
-   String joinQualReference = null;
-   if (node.hasJoinQual()) {
-     for (EvalNode eachQual : AlgebraicUtil.toConjunctiveNormalFormArray(node.getJoinQual())) {
-       if (eachQual instanceof BinaryEval) {
-         BinaryEval binaryQual = (BinaryEval) eachQual;
-
-         // Inspect both operands of the binary predicate.
-         for (int i = 0; i < 2; i++) {
-           EvalNode term = binaryQual.getChild(i);
-           pushDownIfComplexTermInJoinCondition(newContext, eachQual, term);
-         }
-       }
-     }
-
-     joinQualReference = newContext.addExpr(node.getJoinQual());
-     newContext.addNecessaryReferences(node.getJoinQual());
-   }
-
-   if (node.hasTargets()) {
-     // Only the registration side effect is needed; the returned reference names are not used.
-     for (Iterator<Target> it = getFilteredTarget(node.getTargets(), context.requiredSet); it.hasNext();) {
-       newContext.addExpr(it.next());
-     }
-   }
-
-   stack.push(node);
-   LogicalNode left = visit(newContext, plan, block, node.getLeftChild(), stack);
-   LogicalNode right = visit(newContext, plan, block, node.getRightChild(), stack);
-   stack.pop();
-
-   Schema merged = SchemaUtil.merge(left.getOutSchema(), right.getOutSchema());
-
-   node.setInSchema(merged);
-
-   if (node.hasJoinQual()) {
-     Target target = context.targetListMgr.getTarget(joinQualReference);
-     if (newContext.targetListMgr.isEvaluated(joinQualReference)) {
-       throw new PlanningException("Join condition must be evaluated in the proper Join Node: " + joinQualReference);
-     } else {
-       node.setJoinQual(target.getEvalTree());
-       newContext.targetListMgr.markAsEvaluated(target);
-     }
-   }
-
-   LinkedHashSet<Target> projectedTargets = Sets.newLinkedHashSet();
-   for (Iterator<String> it = getFilteredReferences(context.targetListMgr.getNames(),
-       context.requiredSet); it.hasNext();) {
-     String referenceName = it.next();
-     Target target = context.targetListMgr.getTarget(referenceName);
-
-     if (context.targetListMgr.isEvaluated(referenceName)) {
-       // Already evaluated: project a field reference if this join may expose it.
-       Target fieldReference = new Target(new FieldEval(target.getNamedColumn()));
-       if (LogicalPlanner.checkIfBeEvaluatedAtJoin(block, fieldReference.getEvalTree(), node,
-           stack.peek().getType() != NodeType.JOIN)) {
-         projectedTargets.add(fieldReference);
-       }
-     } else if (LogicalPlanner.checkIfBeEvaluatedAtJoin(block, target.getEvalTree(), node,
-         stack.peek().getType() != NodeType.JOIN)) {
-       // Not evaluated yet and evaluable here: evaluate it at this join.
-       projectedTargets.add(target);
-       context.targetListMgr.markAsEvaluated(target);
-     }
-   }
-
-   node.setTargets(projectedTargets.toArray(new Target[projectedTargets.size()]));
-   LogicalPlanner.verifyProjectedFields(block, node);
-   return node;
- }
-
- /**
-  * Returns an iterator over the entries of {@code targetNames} that are contained in
-  * {@code required}, in the iteration order of {@code targetNames}.
-  */
- static Iterator<String> getFilteredReferences(Collection<String> targetNames, Set<String> required) {
-   return new FilteredStringsIterator(targetNames, required);
- }
-
- /**
-  * Array variant: returns an iterator over the entries of {@code targetNames} that are
-  * contained in {@code required}, in array order.
-  */
- static Iterator<String> getFilteredReferences(String [] targetNames, Collection<String> required) {
-   return new FilteredStringsIterator(targetNames, required);
- }
-
- /**
-  * Read-only iterator over the names that pass a membership filter. The filtered list is built
-  * eagerly in the constructor, so later changes to the inputs are not reflected.
-  */
- static class FilteredStringsIterator implements Iterator<String> {
-   Iterator<String> iterator;
-
-   FilteredStringsIterator(Collection<String> targetNames, Collection<String> required) {
-     List<String> filtered = TUtil.newList();
-     for (String name : targetNames) {
-       if (required.contains(name)) {
-         filtered.add(name);
-       }
-     }
-
-     iterator = filtered.iterator();
-   }
-
-   FilteredStringsIterator(String [] targetNames, Collection<String> required) {
-     this(TUtil.newList(targetNames), required);
-   }
-
-   @Override
-   public boolean hasNext() {
-     return iterator.hasNext();
-   }
-
-   @Override
-   public String next() {
-     return iterator.next();
-   }
-
-   /**
-    * Removal is not supported. The iterator walks a private snapshot, so removing from it could
-    * never affect the source collection; failing loudly follows the {@link Iterator} contract
-    * instead of silently doing nothing.
-    *
-    * @throws UnsupportedOperationException always
-    */
-   @Override
-   public void remove() {
-     throw new UnsupportedOperationException("remove");
-   }
- }
-
- /**
-  * Returns an iterator over the targets whose canonical names appear in {@code required},
-  * in the iteration order of {@code required}.
-  */
- static Iterator<Target> getFilteredTarget(Target[] targets, Set<String> required) {
-   return new FilteredIterator(targets, required);
- }
-
- /**
-  * Read-only iterator over required targets. Targets are first indexed by canonical name; when
-  * two targets share a name, a non-field target replaces a plain field reference. The result is
-  * built eagerly in the constructor.
-  */
- static class FilteredIterator implements Iterator<Target> {
-   Iterator<Target> iterator;
-
-   FilteredIterator(Target [] targets, Set<String> requiredReferences) {
-     List<Target> filtered = TUtil.newList();
-     Map<String, Target> targetSet = new HashMap<String, Target>();
-     for (Target t : targets) {
-       String canonicalName = t.getCanonicalName();
-       Target targetInSet = targetSet.get(canonicalName);
-
-       // Keep the raw target rather than a field reference to it: only a non-field target may
-       // replace an already indexed plain field reference.
-       if (targetInSet == null) {
-         targetSet.put(canonicalName, t);
-       } else if (targetInSet.getEvalTree().getType() == EvalType.FIELD
-           && t.getEvalTree().getType() != EvalType.FIELD) {
-         targetSet.put(canonicalName, t);
-       }
-     }
-
-     for (String name : requiredReferences) {
-       Target required = targetSet.get(name);
-       if (required != null) {
-         filtered.add(required);
-       }
-     }
-
-     iterator = filtered.iterator();
-   }
-
-   @Override
-   public boolean hasNext() {
-     return iterator.hasNext();
-   }
-
-   @Override
-   public Target next() {
-     return iterator.next();
-   }
-
-   /**
-    * Removal is not supported. The iterator walks a private snapshot, so removing from it could
-    * never affect the source array; failing loudly follows the {@link Iterator} contract instead
-    * of silently doing nothing.
-    *
-    * @throws UnsupportedOperationException always
-    */
-   @Override
-   public void remove() {
-     throw new UnsupportedOperationException("remove");
-   }
- }
-
- /**
-  * Visits both branches of a union. Each branch is visited from the root of its own query
-  * block, with a fresh context whose required names are the current required names qualified
-  * by that block's name, and with a fresh, empty node stack.
-  *
-  * @return the same {@code node}, unchanged by this method itself
-  * @throws PlanningException propagated from visiting either branch
-  */
- @Override
- public LogicalNode visitUnion(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, UnionNode node,
-     Stack<LogicalNode> stack) throws PlanningException {
-
-   // Resolve the query block behind each side of the union.
-   LogicalPlan.QueryBlock leftQueryBlock = plan.getBlock(node.getLeftChild());
-   LogicalPlan.QueryBlock rightQueryBlock = plan.getBlock(node.getRightChild());
-
-   // Both contexts are created before either branch is visited.
-   Context leftBranchContext =
-       new Context(plan, PlannerUtil.toQualifiedFieldNames(context.requiredSet, leftQueryBlock.getName()));
-   Context rightBranchContext =
-       new Context(plan, PlannerUtil.toQualifiedFieldNames(context.requiredSet, rightQueryBlock.getName()));
-
-   stack.push(node);
-   visit(leftBranchContext, plan, leftQueryBlock, leftQueryBlock.getRoot(), new Stack<LogicalNode>());
-   visit(rightBranchContext, plan, rightQueryBlock, rightQueryBlock.getRoot(), new Stack<LogicalNode>());
-   stack.pop();
-
-   return node;
- }
-
- /**
-  * Determines the targets a scan node has to produce. The node's own targets (or, if it has
-  * none, targets derived from its table schema) that are required are registered in a copied
-  * context; every required registered target that can be evaluated at this relation is then
-  * projected and marked as evaluated.
-  *
-  * @return the same {@code node} with its targets replaced by the projected ones
-  * @throws PlanningException declared by the registration and verification calls
-  */
- public LogicalNode visitScan(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode node,
-     Stack<LogicalNode> stack) throws PlanningException {
-
-   Context newContext = new Context(context);
-
-   Target [] candidates =
-       node.hasTargets() ? node.getTargets() : PlannerUtil.schemaToTargets(node.getTableSchema());
-
-   // Register the required candidates.
-   Iterator<Target> requiredCandidates = getFilteredTarget(candidates, newContext.requiredSet);
-   while (requiredCandidates.hasNext()) {
-     newContext.addExpr(requiredCandidates.next());
-   }
-
-   // Project whatever can be evaluated directly at this relation.
-   LinkedHashSet<Target> projectedTargets = Sets.newLinkedHashSet();
-   Iterator<Target> requiredTargets = context.targetListMgr.getFilteredTargets(newContext.requiredSet);
-   while (requiredTargets.hasNext()) {
-     Target candidate = requiredTargets.next();
-     if (!LogicalPlanner.checkIfBeEvaluatedAtRelation(block, candidate.getEvalTree(), node)) {
-       continue;
-     }
-     projectedTargets.add(candidate);
-     newContext.targetListMgr.markAsEvaluated(candidate);
-   }
-
-   node.setTargets(projectedTargets.toArray(new Target[projectedTargets.size()]));
-   LogicalPlanner.verifyProjectedFields(block, node);
-   return node;
- }
-
- /**
-  * Determines the targets a partitioned table scan node has to produce. The node's own targets
-  * (or, if it has none, targets derived from its output schema) that are required are
-  * registered in a copied context; every required registered target that can be evaluated at
-  * this relation is then projected and marked as evaluated.
-  *
-  * @return the same {@code node} with its targets replaced by the projected ones
-  * @throws PlanningException declared by the registration and verification calls
-  */
- @Override
- public LogicalNode visitPartitionedTableScan(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
-     PartitionedTableScanNode node, Stack<LogicalNode> stack)
-     throws PlanningException {
-
-   Context newContext = new Context(context);
-
-   Target [] candidates =
-       node.hasTargets() ? node.getTargets() : PlannerUtil.schemaToTargets(node.getOutSchema());
-
-   // Register the required candidates.
-   Iterator<Target> requiredCandidates = getFilteredTarget(candidates, newContext.requiredSet);
-   while (requiredCandidates.hasNext()) {
-     newContext.addExpr(requiredCandidates.next());
-   }
-
-   // Project whatever can be evaluated directly at this relation.
-   LinkedHashSet<Target> projectedTargets = Sets.newLinkedHashSet();
-   Iterator<Target> requiredTargets = context.targetListMgr.getFilteredTargets(newContext.requiredSet);
-   while (requiredTargets.hasNext()) {
-     Target candidate = requiredTargets.next();
-     if (!LogicalPlanner.checkIfBeEvaluatedAtRelation(block, candidate.getEvalTree(), node)) {
-       continue;
-     }
-     projectedTargets.add(candidate);
-     newContext.targetListMgr.markAsEvaluated(candidate);
-   }
-
-   node.setTargets(projectedTargets.toArray(new Target[projectedTargets.size()]));
-   LogicalPlanner.verifyProjectedFields(block, node);
-   return node;
- }
-
- /**
-  * Visits the subquery of a table-subquery node with a fresh context that carries the outer
-  * required names, re-attaches the visited subquery, and then projects the outer targets that
-  * can be evaluated at this relation.
-  *
-  * @return the same {@code node} with its subquery and targets updated
-  * @throws PlanningException propagated from the superclass visit or the verification call
-  */
- @Override
- public LogicalNode visitTableSubQuery(Context upperContext, LogicalPlan plan, LogicalPlan.QueryBlock block,
-     TableSubQueryNode node, Stack<LogicalNode> stack) throws PlanningException {
-   Context childContext = new Context(plan, upperContext.requiredSet);
-
-   stack.push(node);
-   LogicalNode visitedSubQuery = super.visitTableSubQuery(childContext, plan, block, node, stack);
-   node.setSubQuery(visitedSubQuery);
-   stack.pop();
-
-   Target [] candidates =
-       node.hasTargets() ? node.getTargets() : PlannerUtil.schemaToTargets(node.getOutSchema());
-
-   // Register the required candidates in the outer context.
-   Iterator<Target> requiredCandidates = getFilteredTarget(candidates, upperContext.requiredSet);
-   while (requiredCandidates.hasNext()) {
-     upperContext.addExpr(requiredCandidates.next());
-   }
-
-   // Project whatever can be evaluated directly at this relation.
-   LinkedHashSet<Target> projectedTargets = Sets.newLinkedHashSet();
-   Iterator<Target> requiredTargets = upperContext.targetListMgr.getFilteredTargets(upperContext.requiredSet);
-   while (requiredTargets.hasNext()) {
-     Target candidate = requiredTargets.next();
-     if (!LogicalPlanner.checkIfBeEvaluatedAtRelation(block, candidate.getEvalTree(), node)) {
-       continue;
-     }
-     projectedTargets.add(candidate);
-     upperContext.targetListMgr.markAsEvaluated(candidate);
-   }
-
-   node.setTargets(projectedTargets.toArray(new Target[projectedTargets.size()]));
-   LogicalPlanner.verifyProjectedFields(block, node);
-   return node;
- }
-
- /**
-  * Visits the child of an insert node with the caller's context, keeping the insert node on
-  * the stack while the child is visited.
-  *
-  * @return the same {@code node}; the result of visiting the child is not used here
-  * @throws PlanningException propagated from visiting the child
-  */
- @Override
- public LogicalNode visitInsert(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, InsertNode node,
-     Stack<LogicalNode> stack) throws PlanningException {
-   stack.push(node);
-   LogicalNode insertSource = node.getChild();
-   visit(context, plan, block, insertSource, stack);
-   stack.pop();
-
-   return node;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/QueryRewriteEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/QueryRewriteEngine.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/QueryRewriteEngine.java
deleted file mode 100644
index cb66582..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/QueryRewriteEngine.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.rewrite;
-
-import org.apache.tajo.engine.planner.LogicalPlan;
-import org.apache.tajo.engine.planner.PlanningException;
-
-public interface QueryRewriteEngine {
- /**
- * Rewrites a logical plan with all query rewrite rules added to this engine.
- *
- * @param plan The plan to be rewritten with all query rewrite rules.
- * @return The rewritten plan.
- * @throws PlanningException declared for failures raised while rewriting the plan
- */
- LogicalPlan rewrite(LogicalPlan plan) throws PlanningException;
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/RewriteRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/RewriteRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/RewriteRule.java
deleted file mode 100644
index 89854df..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/RewriteRule.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.rewrite;
-
-import org.apache.tajo.engine.planner.LogicalPlan;
-import org.apache.tajo.engine.planner.PlanningException;
-
-/**
- * An interface for a rewrite rule.
- */
-public interface RewriteRule {
-
- /**
- * Returns the rewrite rule name. It will be used for debugging and
- * building an optimization history.
- *
- * @return The rewrite rule name
- */
- String getName();
-
- /**
- * Checks whether this rewrite rule can be applied to a given query plan.
- * For example, the selection push down cannot be applied to a query plan without any filter.
- * In such a case, it will return false.
- *
- * @param plan The plan to be checked
- * @return True if this rule can be applied to the given plan. Otherwise, false.
- */
- boolean isEligible(LogicalPlan plan);
-
- /**
- * Rewrites a logical plan and returns the updated logical plan produced by this rule.
- * It must be guaranteed that the input logical plan is not modified even after the rewrite.
- * In other words, the rewrite has to modify a plan copied from the input plan.
- *
- * @param plan Input logical plan. It will not be modified.
- * @return The rewritten logical plan.
- * @throws PlanningException declared for failures raised while rewriting the plan
- */
- LogicalPlan rewrite(LogicalPlan plan) throws PlanningException;
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
index f4160e4..86fd355 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
@@ -24,8 +24,8 @@ import org.apache.tajo.OverridableConf;
import org.apache.tajo.SessionVars;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.logical.NodeType;
import org.apache.tajo.master.session.Session;
+import org.apache.tajo.plan.logical.NodeType;
import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetProto;
@@ -132,9 +132,9 @@ public class QueryContext extends OverridableConf {
put(QueryVars.OUTPUT_PARTITIONS, partitionMethodDesc != null ? partitionMethodDesc.toJson() : null);
}
- public PartitionMethodDesc getPartitionMethod() {
- return PartitionMethodDesc.fromJson(get(QueryVars.OUTPUT_PARTITIONS));
- }
+// public PartitionMethodDesc getPartitionMethod() {
+// return PartitionMethodDesc.fromJson(get(QueryVars.OUTPUT_PARTITIONS));
+// }
public void setOutputOverwrite() {
setBool(QueryVars.OUTPUT_OVERWRITE, true);
@@ -160,8 +160,8 @@ public class QueryContext extends OverridableConf {
}
}
- public boolean isCommandType(NodeType commandType) {
- return equalKey(QueryVars.COMMAND_TYPE, commandType.name());
+ public boolean isCommandType(String commandType) {
+ return equalKey(QueryVars.COMMAND_TYPE, commandType);
}
public void setCommandType(NodeType nodeType) {
@@ -178,7 +178,7 @@ public class QueryContext extends OverridableConf {
}
public boolean isCreateTable() {
- return isCommandType(NodeType.CREATE_TABLE);
+ return isCommandType(NodeType.CREATE_TABLE.name());
}
public void setInsert() {
@@ -186,6 +186,6 @@ public class QueryContext extends OverridableConf {
}
public boolean isInsert() {
- return isCommandType(NodeType.INSERT);
+ return isCommandType(NodeType.INSERT.name());
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/utils/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/SchemaUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/SchemaUtil.java
deleted file mode 100644
index 981b572..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/SchemaUtil.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.utils;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-
-public class SchemaUtil {
- // See TAJO-914 bug.
- //
- // Its essential problem is that constant value is evaluated multiple times at each scan.
- // As a result, join nodes can take the child nodes which have the same named fields.
- // Because current schema does not allow the same name and ignore the duplicated schema,
- // it finally causes the in-out schema mismatch between the parent and child nodes.
- //
- // tmpColumnSeq is a hack to avoid the above problem by keeping duplicated constant values as different name fields.
- // The essential solution would be https://issues.apache.org/jira/browse/TAJO-895.
- static int tmpColumnSeq = 0;
- public static Schema merge(Schema left, Schema right) {
- Schema merged = new Schema();
- for(Column col : left.getColumns()) {
- if (!merged.containsByQualifiedName(col.getQualifiedName())) {
- merged.addColumn(col);
- }
- }
- for(Column col : right.getColumns()) {
- if (merged.containsByQualifiedName(col.getQualifiedName())) {
- merged.addColumn("?fake" + (tmpColumnSeq++), col.getDataType());
- } else {
- merged.addColumn(col);
- }
- }
-
- // if overflow
- if (tmpColumnSeq < 0) {
- tmpColumnSeq = 0;
- }
- return merged;
- }
-
- /**
- * Get common columns to be used as join keys of natural joins.
- */
- public static Schema getNaturalJoinColumns(Schema left, Schema right) {
- Schema common = new Schema();
- for (Column outer : left.getColumns()) {
- if (!common.containsByName(outer.getSimpleName()) && right.containsByName(outer.getSimpleName())) {
- common.addColumn(new Column(outer.getSimpleName(), outer.getDataType()));
- }
- }
-
- return common;
- }
-
- public static Schema getQualifiedLogicalSchema(TableDesc tableDesc, String tableName) {
- Schema logicalSchema = new Schema(tableDesc.getLogicalSchema());
- if (tableName != null) {
- logicalSchema.setQualifier(tableName);
- }
- return logicalSchema;
- }
-
- public static <T extends Schema> T clone(Schema schema) {
- try {
- T copy = (T) schema.clone();
- return copy;
- } catch (CloneNotSupportedException e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
index 0752e11..aeb4e05 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
@@ -31,7 +31,7 @@ import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.catalog.statistics.ColumnStats;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
import org.apache.tajo.storage.Tuple;
@@ -215,68 +215,4 @@ public class TupleUtil {
return results;
}
}
-
- /**
- * Take a look at a column partition path. A partition path consists
- * of a table path part and column values part. This method transforms
- * a partition path into a tuple with a given partition column schema.
- *
- * hdfs://192.168.0.1/tajo/warehouse/table1/col1=abc/col2=def/col3=ghi
- * ^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^
- * table path part column values part
- *
- * When a file path is given, it can perform two ways depending on beNullIfFile flag.
- * If it is true, it returns NULL when a given path is a file.
- * Otherwise, it returns a built tuple regardless of file or directory.
- *
- * @param partitionColumnSchema The partition column schema
- * @param partitionPath The partition path
- * @param beNullIfFile If true, this method returns NULL when a given path is a file.
- * @return The tuple transformed from a column values part.
- */
- public static Tuple buildTupleFromPartitionPath(Schema partitionColumnSchema, Path partitionPath,
- boolean beNullIfFile) {
- int startIdx = partitionPath.toString().indexOf(getColumnPartitionPathPrefix(partitionColumnSchema));
-
- if (startIdx == -1) { // if there is no partition column in the patch
- return null;
- }
- String columnValuesPart = partitionPath.toString().substring(startIdx);
-
- String [] columnValues = columnValuesPart.split("/");
-
- // true means this is a file.
- if (beNullIfFile && partitionColumnSchema.size() < columnValues.length) {
- return null;
- }
-
- Tuple tuple = new VTuple(partitionColumnSchema.size());
- int i = 0;
- for (; i < columnValues.length && i < partitionColumnSchema.size(); i++) {
- String [] parts = columnValues[i].split("=");
- if (parts.length != 2) {
- return null;
- }
- int columnId = partitionColumnSchema.getColumnIdByName(parts[0]);
- Column keyColumn = partitionColumnSchema.getColumn(columnId);
- tuple.put(columnId, DatumFactory.createFromString(keyColumn.getDataType(), StringUtils.unescapePathName(parts[1])));
- }
- for (; i < partitionColumnSchema.size(); i++) {
- tuple.put(i, NullDatum.get());
- }
- return tuple;
- }
-
- /**
- * Get a prefix of column partition path. For example, consider a column partition (col1, col2).
- * Then, you will get a string 'col1='.
- *
- * @param partitionColumn the schema of column partition
- * @return The first part string of column partition path.
- */
- private static String getColumnPartitionPathPrefix(Schema partitionColumn) {
- StringBuilder sb = new StringBuilder();
- sb.append(partitionColumn.getColumn(0).getSimpleName()).append("=");
- return sb.toString();
- }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java
index 333df11..9787276 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java
@@ -18,9 +18,9 @@
package org.apache.tajo.engine.utils.test;
-import org.apache.tajo.engine.planner.LogicalPlan;
-import org.apache.tajo.engine.planner.PlanningException;
-import org.apache.tajo.engine.planner.rewrite.RewriteRule;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.rewrite.RewriteRule;
public class ErrorInjectionRewriter implements RewriteRule {
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 382b789..cd3be98 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -28,7 +28,10 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
-import org.apache.tajo.*;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.SessionVars;
+import org.apache.tajo.TajoConstants;
import org.apache.tajo.algebra.AlterTablespaceSetType;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.algebra.JsonHelper;
@@ -37,20 +40,13 @@ import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.exception.*;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.exception.IllegalQueryStatusException;
-import org.apache.tajo.engine.exception.VerifyException;
import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.*;
-import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.planner.physical.EvalExprExec;
-import org.apache.tajo.engine.planner.physical.SeqScanExec;
import org.apache.tajo.engine.planner.physical.StoreTableExec;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.ClientProtos;
@@ -60,6 +56,14 @@ import org.apache.tajo.master.querymaster.QueryInfo;
import org.apache.tajo.master.querymaster.QueryJobManager;
import org.apache.tajo.master.querymaster.QueryMasterTask;
import org.apache.tajo.master.session.Session;
+import org.apache.tajo.plan.*;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.plan.verifier.LogicalPlanVerifier;
+import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier;
+import org.apache.tajo.plan.verifier.VerificationState;
+import org.apache.tajo.plan.verifier.VerifyException;
import org.apache.tajo.storage.*;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.worker.TaskAttemptContext;
@@ -71,8 +75,8 @@ import java.util.List;
import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto;
-import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
import static org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
+import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
public class GlobalEngine extends AbstractService {
/** Class Logger */
@@ -267,7 +271,7 @@ public class GlobalEngine extends AbstractService {
// NonFromQuery indicates a form of 'select a, x+y;'
} else if (PlannerUtil.checkIfNonFromQuery(plan)) {
- Target [] targets = plan.getRootBlock().getRawTargets();
+ Target[] targets = plan.getRootBlock().getRawTargets();
if (targets == null) {
throw new PlanningException("No targets");
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
index c0bd842..768528d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
@@ -26,11 +26,10 @@ import org.apache.tajo.QueryUnitId;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.PlannerUtil;
-import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.planner.physical.PhysicalPlanUtil;
+import org.apache.tajo.plan.logical.ScanNode;
import org.apache.tajo.engine.planner.physical.SeqScanExec;
import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.master.session.Session;
import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
import org.apache.tajo.storage.StorageManager;
@@ -77,7 +76,7 @@ public class NonForwardQueryResultScanner {
}
private void initSeqScanExec() throws IOException {
- FragmentProto[] fragments = PlannerUtil.getNonZeroLengthDataFiles(tajoConf, tableDesc,
+ FragmentProto[] fragments = PhysicalPlanUtil.getNonZeroLengthDataFiles(tajoConf, tableDesc,
currentFileIndex, MAX_FILE_NUM_PER_SCAN);
if (fragments != null && fragments.length > 0) {
this.taskContext = new TaskAttemptContext(
http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
index b8240b8..1f445ec 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -43,9 +43,9 @@ import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.planner.logical.CreateTableNode;
-import org.apache.tajo.engine.planner.logical.InsertNode;
-import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.plan.logical.CreateTableNode;
+import org.apache.tajo.plan.logical.InsertNode;
+import org.apache.tajo.plan.logical.NodeType;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.master.event.*;
import org.apache.tajo.storage.StorageManager;
http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index 877a20a..536778a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.logical.LogicalRootNode;
+import org.apache.tajo.plan.logical.LogicalRootNode;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.QueryMasterProtocol;
import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index e4f47cd..bcca039 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.TajoProtos;
-import org.apache.tajo.engine.planner.logical.LogicalRootNode;
+import org.apache.tajo.plan.logical.LogicalRootNode;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.TajoMaster;
http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index cb06df9..8ba9600 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -36,14 +36,14 @@ import org.apache.tajo.algebra.JsonHelper;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.LogicalOptimizer;
-import org.apache.tajo.engine.planner.LogicalPlan;
-import org.apache.tajo.engine.planner.LogicalPlanner;
-import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.plan.LogicalOptimizer;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.planner.logical.LogicalNode;
-import org.apache.tajo.engine.planner.logical.NodeType;
-import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.plan.logical.ScanNode;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.exception.UnimplementedException;
import org.apache.tajo.ipc.TajoMasterProtocol;
http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index 8a3ef74..fe2752f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -33,13 +33,13 @@ import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.QueryUnitId;
import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.ipc.TajoWorkerProtocol.FailureIntermediateProto;
import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto;
import org.apache.tajo.master.FragmentPair;
import org.apache.tajo.master.TaskState;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
+import org.apache.tajo.plan.logical.*;
import org.apache.tajo.storage.DataLocation;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.Pair;