You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/10/26 06:46:21 UTC

[22/33] TAJO-1125: Separate logical plan and optimizer into a maven module.

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java
deleted file mode 100644
index 3115104..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java
+++ /dev/null
@@ -1,935 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner;
-
-import com.google.common.collect.Sets;
-import org.apache.tajo.algebra.*;
-import org.apache.tajo.catalog.CatalogService;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.FunctionDesc;
-import org.apache.tajo.catalog.exception.NoSuchFunctionException;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.*;
-import org.apache.tajo.engine.eval.*;
-import org.apache.tajo.engine.function.AggFunction;
-import org.apache.tajo.engine.function.GeneralFunction;
-import org.apache.tajo.engine.planner.logical.NodeType;
-import org.apache.tajo.engine.planner.nameresolver.NameResolvingMode;
-import org.apache.tajo.engine.planner.nameresolver.NameResolver;
-import org.apache.tajo.exception.InternalException;
-import org.apache.tajo.exception.InvalidOperationException;
-import org.apache.tajo.util.Pair;
-import org.apache.tajo.util.TUtil;
-import org.apache.tajo.util.datetime.DateTimeUtil;
-import org.apache.tajo.util.datetime.TimeMeta;
-
-import java.util.Set;
-import java.util.Stack;
-
-import static org.apache.tajo.algebra.WindowSpec.WindowFrameEndBoundType;
-import static org.apache.tajo.algebra.WindowSpec.WindowFrameStartBoundType;
-import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-import static org.apache.tajo.common.TajoDataTypes.Type;
-import static org.apache.tajo.engine.planner.logical.WindowSpec.WindowEndBound;
-import static org.apache.tajo.engine.planner.logical.WindowSpec.WindowFrame;
-import static org.apache.tajo.engine.planner.logical.WindowSpec.WindowStartBound;
-
-/**
- * <code>ExprAnnotator</code> makes an annotated expression called <code>EvalNode</code> from an
- * {@link org.apache.tajo.algebra.Expr}. It visits descendants recursively from a given expression, and finally
- * it returns an EvalNode.
- */
-public class ExprAnnotator extends BaseAlgebraVisitor<ExprAnnotator.Context, EvalNode> {
-  private CatalogService catalog;
-
-  /**
-   * Creates an annotator bound to a catalog.
-   *
-   * @param catalog catalog service used by the function visitors to look up function descriptors
-   */
-  public ExprAnnotator(CatalogService catalog) {
-    this.catalog = catalog;
-  }
-
-  /**
-   * State handed to every visit method: the logical plan, the query block being annotated,
-   * and the name resolving mode applied to column references.
-   */
-  static class Context {
-    LogicalPlan plan;
-    LogicalPlan.QueryBlock currentBlock;
-    NameResolvingMode columnRsvLevel;
-
-    /**
-     * @param planContext planning context supplying the plan and the current query block
-     * @param colRsvLevel name resolving mode to use for column references
-     */
-    public Context(LogicalPlanner.PlanContext planContext, NameResolvingMode colRsvLevel) {
-      columnRsvLevel = colRsvLevel;
-      currentBlock = planContext.queryBlock;
-      plan = planContext.plan;
-    }
-  }
-
-  /**
-   * Annotates an algebra expression and hands the resulting tree to the eval optimizer of the plan context.
-   *
-   * @param planContext planning context supplying the plan, the query block and the eval optimizer
-   * @param expr algebra expression to annotate
-   * @param colRsvLevel name resolving mode applied to column references
-   * @return the annotated expression as returned by the eval optimizer
-   * @throws PlanningException if a visit method rejects the expression
-   */
-  public EvalNode createEvalNode(LogicalPlanner.PlanContext planContext, Expr expr,
-                                 NameResolvingMode colRsvLevel)
-      throws PlanningException {
-    Stack<Expr> visited = new Stack<Expr>();
-    EvalNode annotated = visit(new Context(planContext, colRsvLevel), visited, expr);
-    return planContext.evalOptimizer.optimize(planContext, annotated);
-  }
-
-  /**
-   * Fails planning with the given message unless the condition holds.
-   *
-   * @param condition condition that must be true
-   * @param message message of the exception raised when the condition is false
-   * @throws PlanningException if <code>condition</code> is false
-   */
-  public static void assertEval(boolean condition, String message) throws PlanningException {
-    if (condition) {
-      return;
-    }
-    throw new PlanningException(message);
-  }
-
-  /**
-   * Aligns the operand types of a binary expression. The target type is looked up in
-   * {@link CatalogUtil#OPERATION_CASTING_MAP}; every operand whose type differs from it is converted.
-   * When either operand is of NULL type, or the map has no entry for the type pair, both operands are
-   * returned unchanged.
-   *
-   * @param lhs left hand side term
-   * @param rhs right hand side term
-   * @return a pair holding the (possibly converted) left and right hand side terms
-   */
-  public static Pair<EvalNode, EvalNode> convertTypesIfNecessary(EvalNode lhs, EvalNode rhs) {
-    Type leftType = lhs.getValueType().getType();
-    Type rightType = rhs.getValueType().getType();
-
-    // NULL operands are never cast, so the lookup is skipped entirely for them.
-    if (leftType != Type.NULL_TYPE && rightType != Type.NULL_TYPE) {
-      Type commonType = TUtil.getFromNestedMap(CatalogUtil.OPERATION_CASTING_MAP, leftType, rightType);
-
-      // A non-null entry means at least one side has to be converted to the common type.
-      if (commonType != null) {
-        lhs = (leftType == commonType) ? lhs : convertType(lhs, CatalogUtil.newSimpleDataType(commonType));
-        rhs = (rightType == commonType) ? rhs : convertType(rhs, CatalogUtil.newSimpleDataType(commonType));
-      }
-    }
-
-    return new Pair<EvalNode, EvalNode>(lhs, rhs);
-  }
-
-  /**
-   * Converts an expression to the given type.
-   * The expression is returned as is when it already has <code>toType</code>, or when either the
-   * expression type or <code>toType</code> is the NULL type.
-   * <ul>
-   *   <li>BETWEEN: predicand, begin and end are converted in place and the same node is returned.</li>
-   *   <li>CASE: every THEN result and the ELSE result (if any) are converted in place and the same node
-   *       is returned.</li>
-   *   <li>Row constant / constant: the values are cast immediately and a new constant node is returned.</li>
-   *   <li>Anything else is wrapped in a {@link CastEval}.</li>
-   * </ul>
-   *
-   * @param evalNode an expression
-   * @param toType target type
-   * @return type converted expression.
-   */
-  private static EvalNode convertType(EvalNode evalNode, DataType toType) {
-
-    // No conversion when the types already match; conversion from or to NULL is not allowed.
-    if (evalNode.getValueType().equals(toType)
-        || evalNode.getValueType().getType() == Type.NULL_TYPE
-        || toType.getType() == Type.NULL_TYPE) {
-      return evalNode;
-    }
-
-    EvalType kind = evalNode.getType();
-
-    if (kind == EvalType.BETWEEN) {
-      BetweenPredicateEval betweenEval = (BetweenPredicateEval) evalNode;
-      betweenEval.setPredicand(convertType(betweenEval.getPredicand(), toType));
-      betweenEval.setBegin(convertType(betweenEval.getBegin(), toType));
-      betweenEval.setEnd(convertType(betweenEval.getEnd(), toType));
-      return betweenEval;
-    }
-
-    if (kind == EvalType.CASE) {
-      CaseWhenEval caseEval = (CaseWhenEval) evalNode;
-      for (CaseWhenEval.IfThenEval branch : caseEval.getIfThenEvals()) {
-        branch.setResult(convertType(branch.getResult(), toType));
-      }
-      if (caseEval.hasElse()) {
-        caseEval.setElseResult(convertType(caseEval.getElse(), toType));
-      }
-      return caseEval;
-    }
-
-    if (kind == EvalType.ROW_CONSTANT) {
-      Datum[] sourceValues = ((RowConstantEval) evalNode).getValues();
-      Datum[] castValues = new Datum[sourceValues.length];
-      for (int i = 0; i < sourceValues.length; i++) {
-        castValues[i] = DatumFactory.cast(sourceValues[i], toType);
-      }
-      return new RowConstantEval(castValues);
-    }
-
-    if (kind == EvalType.CONST) {
-      return new ConstEval(DatumFactory.cast(((ConstEval) evalNode).getValue(), toType));
-    }
-
-    return new CastEval(evalNode, toType);
-  }
-
-  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
-  // Logical Operator Section
-  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Annotates both operands of a logical AND and combines them into a {@link BinaryEval}.
-   * No implicit type conversion is applied to the operands.
-   */
-  @Override
-  public EvalNode visitAnd(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
-    stack.push(expr);
-    EvalNode lhs = visit(ctx, stack, expr.getLeft());
-    EvalNode rhs = visit(ctx, stack, expr.getRight());
-    stack.pop();
-    return new BinaryEval(EvalType.AND, lhs, rhs);
-  }
-
-  /**
-   * Annotates both operands of a logical OR and combines them into a {@link BinaryEval}.
-   * No implicit type conversion is applied to the operands.
-   */
-  @Override
-  public EvalNode visitOr(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
-    stack.push(expr);
-    EvalNode lhs = visit(ctx, stack, expr.getLeft());
-    EvalNode rhs = visit(ctx, stack, expr.getRight());
-    stack.pop();
-    return new BinaryEval(EvalType.OR, lhs, rhs);
-  }
-
-  /**
-   * Annotates the operand of a logical NOT and wraps it in a {@link NotEval}.
-   */
-  @Override
-  public EvalNode visitNot(Context ctx, Stack<Expr> stack, NotExpr expr) throws PlanningException {
-    stack.push(expr);
-    EvalNode operand = visit(ctx, stack, expr.getChild());
-    stack.pop();
-
-    return new NotEval(operand);
-  }
-
-  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
-  // Comparison Predicates Section
-  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
-  @Override
-  public EvalNode visitEquals(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
-    return visitCommonComparison(ctx, stack, expr);
-  }
-
-  @Override
-  public EvalNode visitNotEquals(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
-    return visitCommonComparison(ctx, stack, expr);
-  }
-
-  @Override
-  public EvalNode visitLessThan(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
-    return visitCommonComparison(ctx, stack, expr);
-  }
-
-  @Override
-  public EvalNode visitLessThanOrEquals(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
-    return visitCommonComparison(ctx, stack, expr);
-  }
-
-  @Override
-  public EvalNode visitGreaterThan(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
-    return visitCommonComparison(ctx, stack, expr);
-  }
-
-  /**
-   * Annotates a greater-than-or-equal comparison; the work is done by
-   * {@link #visitCommonComparison(Context, Stack, BinaryOperator)}.
-   */
-  @Override
-  public EvalNode visitGreaterThanOrEquals(Context ctx, Stack<Expr> stack, BinaryOperator expr)
-      throws PlanningException {
-    return visitCommonComparison(ctx, stack, expr);
-  }
-
-  /**
-   * Shared implementation of the six comparison visitors. Both operands are annotated, the operator type
-   * of <code>expr</code> is mapped to the matching {@link EvalType}, and the binary node is built with
-   * implicit type conversion of the operands.
-   *
-   * @throws IllegalStateException if <code>expr</code> is not one of the six comparison operators
-   */
-  public EvalNode visitCommonComparison(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
-    stack.push(expr);
-    EvalNode lhs = visit(ctx, stack, expr.getLeft());
-    EvalNode rhs = visit(ctx, stack, expr.getRight());
-    stack.pop();
-
-    switch (expr.getType()) {
-      case Equals:
-        return createBinaryNode(EvalType.EQUAL, lhs, rhs);
-      case NotEquals:
-        return createBinaryNode(EvalType.NOT_EQUAL, lhs, rhs);
-      case LessThan:
-        return createBinaryNode(EvalType.LTH, lhs, rhs);
-      case LessThanOrEquals:
-        return createBinaryNode(EvalType.LEQ, lhs, rhs);
-      case GreaterThan:
-        return createBinaryNode(EvalType.GTH, lhs, rhs);
-      case GreaterThanOrEquals:
-        return createBinaryNode(EvalType.GEQ, lhs, rhs);
-      default:
-        throw new IllegalStateException("Wrong Expr Type: " + expr.getType());
-    }
-  }
-
-  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
-  // Other Predicates Section
-  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Annotates a BETWEEN predicate. The predicand and both bounds are annotated, the widest of their three
-   * types is determined, and the resulting predicate is converted to that type.
-   *
-   * @throws PlanningException if the widest type cannot be determined
-   */
-  @Override
-  public EvalNode visitBetween(Context ctx, Stack<Expr> stack, BetweenPredicate between) throws PlanningException {
-    stack.push(between);
-    EvalNode value = visit(ctx, stack, between.predicand());
-    EvalNode lower = visit(ctx, stack, between.begin());
-    EvalNode upper = visit(ctx, stack, between.end());
-    stack.pop();
-
-    // implicit type conversion: all three terms are brought to the widest type
-    DataType commonType;
-    try {
-      commonType = CatalogUtil.getWidestType(value.getValueType(), lower.getValueType(), upper.getValueType());
-    } catch (InvalidOperationException ioe) {
-      throw new PlanningException(ioe);
-    }
-
-    EvalNode unconverted = new BetweenPredicateEval(between.isNot(), between.isSymmetric(), value, lower, upper);
-    return (BetweenPredicateEval) convertType(unconverted, commonType);
-  }
-
-  /**
-   * Annotates a CASE WHEN expression. Every WHEN condition, THEN result and the optional ELSE result are
-   * annotated; afterwards all results are converted to the widest type found among them.
-   *
-   * @throws PlanningException if no widest type could be determined
-   */
-  @Override
-  public EvalNode visitCaseWhen(Context ctx, Stack<Expr> stack, CaseWhenPredicate caseWhen) throws PlanningException {
-    CaseWhenEval annotated = new CaseWhenEval();
-
-    for (CaseWhenPredicate.WhenExpr when : caseWhen.getWhens()) {
-      EvalNode condition = visit(ctx, stack, when.getCondition());
-      EvalNode result = visit(ctx, stack, when.getResult());
-      annotated.addIfCond(condition, result);
-    }
-
-    if (caseWhen.hasElseResult()) {
-      annotated.setElseResult(visit(ctx, stack, caseWhen.getElseResult()));
-    }
-
-    // Fold the result types of all THEN branches, then the ELSE branch, into the widest type.
-    DataType resultType = annotated.getIfThenEvals().get(0).getResult().getValueType();
-    for (int branch = 1; branch < annotated.getIfThenEvals().size(); branch++) {
-      DataType branchType = annotated.getIfThenEvals().get(branch).getResult().getValueType();
-      resultType = CatalogUtil.getWidestType(branchType, resultType);
-    }
-    if (caseWhen.hasElseResult()) {
-      resultType = CatalogUtil.getWidestType(resultType, annotated.getElse().getValueType());
-    }
-
-    assertEval(resultType != null, "Invalid Type Conversion for CaseWhen");
-
-    // implicit type conversion
-    return (CaseWhenEval) convertType(annotated, resultType);
-  }
-
-  /**
-   * Annotates an IS [NOT] NULL predicate by annotating its predicand and wrapping it in an
-   * {@link IsNullEval}.
-   */
-  @Override
-  public EvalNode visitIsNullPredicate(Context ctx, Stack<Expr> stack, IsNullPredicate expr) throws PlanningException {
-    stack.push(expr);
-    EvalNode predicand = visit(ctx, stack, expr.getPredicand());
-    stack.pop();
-
-    return new IsNullEval(expr.isNot(), predicand);
-  }
-
-  /**
-   * Annotates an [NOT] IN predicate. The left term and the IN value are annotated, implicit type
-   * conversion is applied to both, and the result is wrapped in an {@link InEval}.
-   *
-   * @throws PlanningException if the IN value is not annotated into a {@link RowConstantEval}
-   */
-  @Override
-  public EvalNode visitInPredicate(Context ctx, Stack<Expr> stack, InPredicate expr) throws PlanningException {
-    stack.push(expr);
-    EvalNode lhs = visit(ctx, stack, expr.getLeft());
-    EvalNode inValue = visit(ctx, stack, expr.getInValue());
-    stack.pop();
-
-    // Report an unsupported IN value as a planning error instead of failing on a blind cast.
-    assertEval(inValue instanceof RowConstantEval, "The right-hand side of IN PREDICATE must be a list of values.");
-
-    Pair<EvalNode, EvalNode> pair = convertTypesIfNecessary(lhs, inValue);
-
-    // convertType() maps a row constant to a row constant, so this cast cannot fail.
-    return new InEval(pair.getFirst(), (RowConstantEval) pair.getSecond(), expr.isNot());
-  }
-
-  @Override
-  public EvalNode visitValueListExpr(Context ctx, Stack<Expr> stack, ValueListExpr expr) throws PlanningException {
-    Datum[] values = new Datum[expr.getValues().length];
-    EvalNode [] evalNodes = new EvalNode[expr.getValues().length];
-    for (int i = 0; i < expr.getValues().length; i++) {
-      evalNodes[i] = visit(ctx, stack, expr.getValues()[i]);
-      if (!EvalTreeUtil.checkIfCanBeConstant(evalNodes[i])) {
-        throw new PlanningException("Non constant values cannot be included in IN PREDICATE.");
-      }
-      values[i] = EvalTreeUtil.evaluateImmediately(evalNodes[i]);
-    }
-    return new RowConstantEval(values);
-  }
-
-  @Override
-  public EvalNode visitExistsPredicate(Context ctx, Stack<Expr> stack, ExistsPredicate expr) throws PlanningException {
-    throw new PlanningException("Cannot support EXISTS clause yet");
-  }
-
-  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
-  // String Operator or Pattern Matching Predicates Section
-  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
-  /**
-   * Annotates a LIKE predicate through the shared pattern matching implementation.
-   */
-  @Override
-  public EvalNode visitLikePredicate(Context ctx, Stack<Expr> stack, PatternMatchPredicate expr)
-      throws PlanningException {
-    return visitPatternMatchPredicate(ctx, stack, expr);
-  }
-
-  /**
-   * Annotates a SIMILAR TO predicate through the shared pattern matching implementation.
-   */
-  @Override
-  public EvalNode visitSimilarToPredicate(Context ctx, Stack<Expr> stack, PatternMatchPredicate expr)
-      throws PlanningException {
-    return visitPatternMatchPredicate(ctx, stack, expr);
-  }
-
-  /**
-   * Annotates a regular expression predicate through the shared pattern matching implementation.
-   */
-  @Override
-  public EvalNode visitRegexpPredicate(Context ctx, Stack<Expr> stack, PatternMatchPredicate expr)
-      throws PlanningException {
-    return visitPatternMatchPredicate(ctx, stack, expr);
-  }
-
-  @Override
-  public EvalNode visitConcatenate(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
-    stack.push(expr);
-    EvalNode lhs = visit(ctx, stack, expr.getLeft());
-    EvalNode rhs = visit(ctx, stack, expr.getRight());
-    stack.pop();
-
-    if (lhs.getValueType().getType() != Type.TEXT) {
-      lhs = convertType(lhs, CatalogUtil.newSimpleDataType(Type.TEXT));
-    }
-    if (rhs.getValueType().getType() != Type.TEXT) {
-      rhs = convertType(rhs, CatalogUtil.newSimpleDataType(Type.TEXT));
-    }
-
-    return new BinaryEval(EvalType.CONCATENATE, lhs, rhs);
-  }
-
-  /**
-   * Shared implementation of the LIKE, SIMILAR TO and regular expression visitors.
-   * The predicand and the pattern are annotated. A NULL pattern yields a NULL constant; otherwise the
-   * predicate node matching the operator type of <code>expr</code> is created.
-   *
-   * @param ctx annotation context
-   * @param stack stack of enclosing expressions
-   * @param expr pattern matching predicate to annotate
-   * @return a NULL constant or the annotated pattern matching predicate
-   * @throws PlanningException if the pattern is not annotated into a constant
-   */
-  private EvalNode visitPatternMatchPredicate(Context ctx, Stack<Expr> stack, PatternMatchPredicate expr)
-      throws PlanningException {
-    EvalNode field = visit(ctx, stack, expr.getPredicand());
-    EvalNode patternNode = visit(ctx, stack, expr.getPattern());
-
-    // A pattern is a const value in pattern matching predicates. Report anything else as a planning
-    // error instead of failing on a blind cast.
-    assertEval(patternNode instanceof ConstEval,
-        "The pattern of a pattern matching predicate must be a constant value.");
-    ConstEval pattern = (ConstEval) patternNode;
-
-    // In a binary expression, the result is always null if a const value in left or right side is null.
-    if (pattern.getValue() instanceof NullDatum) {
-      return new ConstEval(NullDatum.get());
-    }
-
-    if (expr.getType() == OpType.LikePredicate) {
-      return new LikePredicateEval(expr.isNot(), field, pattern, expr.isCaseInsensitive());
-    } else if (expr.getType() == OpType.SimilarToPredicate) {
-      return new SimilarToPredicateEval(expr.isNot(), field, pattern);
-    } else {
-      return new RegexPredicateEval(expr.isNot(), field, pattern, expr.isCaseInsensitive());
-    }
-  }
-
-  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
-  // Arithmetic Operators
-  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Builds a binary node of the given type after applying implicit type conversion to both operands.
-   *
-   * @param type type of the binary node
-   * @param lhs left hand side term
-   * @param rhs right hand side term
-   * @return the binary node over the (possibly converted) operands
-   */
-  private static BinaryEval createBinaryNode(EvalType type, EvalNode lhs, EvalNode rhs) {
-    Pair<EvalNode, EvalNode> operands = convertTypesIfNecessary(lhs, rhs);
-    EvalNode first = operands.getFirst();
-    EvalNode second = operands.getSecond();
-    return new BinaryEval(type, first, second);
-  }
-
-  /**
-   * Annotates an addition; the operands are implicitly converted to a common type if necessary.
-   */
-  @Override
-  public EvalNode visitPlus(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
-    stack.push(expr);
-    EvalNode lhs = visit(ctx, stack, expr.getLeft());
-    EvalNode rhs = visit(ctx, stack, expr.getRight());
-    stack.pop();
-    return createBinaryNode(EvalType.PLUS, lhs, rhs);
-  }
-
-  /**
-   * Annotates a subtraction; the operands are implicitly converted to a common type if necessary.
-   */
-  @Override
-  public EvalNode visitMinus(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
-    stack.push(expr);
-    EvalNode lhs = visit(ctx, stack, expr.getLeft());
-    EvalNode rhs = visit(ctx, stack, expr.getRight());
-    stack.pop();
-    return createBinaryNode(EvalType.MINUS, lhs, rhs);
-  }
-
-  /**
-   * Annotates a multiplication; the operands are implicitly converted to a common type if necessary.
-   */
-  @Override
-  public EvalNode visitMultiply(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
-    stack.push(expr);
-    EvalNode lhs = visit(ctx, stack, expr.getLeft());
-    EvalNode rhs = visit(ctx, stack, expr.getRight());
-    stack.pop();
-    return createBinaryNode(EvalType.MULTIPLY, lhs, rhs);
-  }
-
-  /**
-   * Annotates a division; the operands are implicitly converted to a common type if necessary.
-   */
-  @Override
-  public EvalNode visitDivide(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
-    stack.push(expr);
-    EvalNode lhs = visit(ctx, stack, expr.getLeft());
-    EvalNode rhs = visit(ctx, stack, expr.getRight());
-    stack.pop();
-    return createBinaryNode(EvalType.DIVIDE, lhs, rhs);
-  }
-
-  /**
-   * Annotates a modulo operation; the operands are implicitly converted to a common type if necessary.
-   */
-  @Override
-  public EvalNode visitModular(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
-    stack.push(expr);
-    EvalNode lhs = visit(ctx, stack, expr.getLeft());
-    EvalNode rhs = visit(ctx, stack, expr.getRight());
-    stack.pop();
-    return createBinaryNode(EvalType.MODULAR, lhs, rhs);
-  }
-
-  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
-  // Other Expressions
-  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Annotates a signed expression. A negative sign wraps the annotated operand in a {@link SignedEval};
-   * otherwise the annotated operand itself is returned.
-   */
-  @Override
-  public EvalNode visitSign(Context ctx, Stack<Expr> stack, SignedExpr expr) throws PlanningException {
-    stack.push(expr);
-    EvalNode operand = visit(ctx, stack, expr.getChild());
-    stack.pop();
-
-    // Only a negative sign needs a node of its own.
-    if (!expr.isNegative()) {
-      return operand;
-    }
-    return new SignedEval(expr.isNegative(), operand);
-  }
-
-  /**
-   * Resolves a column reference according to the name resolving mode of the context and wraps the
-   * resolved column in a {@link FieldEval}. LEGACY mode resolves through the logical plan; the other
-   * supported modes resolve through {@link NameResolver}.
-   *
-   * @throws PlanningException if the name resolving mode is not supported
-   */
-  @Override
-  public EvalNode visitColumnReference(Context ctx, Stack<Expr> stack, ColumnReferenceExpr expr)
-      throws PlanningException {
-    switch (ctx.columnRsvLevel) {
-    case LEGACY:
-      return new FieldEval(ctx.plan.resolveColumn(ctx.currentBlock, expr));
-    case RELS_ONLY:
-    case RELS_AND_SUBEXPRS:
-    case SUBEXPRS_AND_RELS:
-      return new FieldEval(NameResolver.resolve(ctx.plan, ctx.currentBlock, expr, ctx.columnRsvLevel));
-    default:
-      throw new PlanningException("Unsupported column resolving level: " + ctx.columnRsvLevel.name());
-    }
-  }
-
-  @Override
-  public EvalNode visitTargetExpr(Context ctx, Stack<Expr> stack, NamedExpr expr) throws PlanningException {
-    throw new PlanningException("ExprAnnotator cannot take NamedExpr");
-  }
-
-  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
-  // Functions and General Set Functions Section
-  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-  @Override
-  public EvalNode visitFunction(Context ctx, Stack<Expr> stack, FunctionExpr expr) throws PlanningException {
-    stack.push(expr); // <--- Push
-
-    // Given parameters
-    Expr[] params = expr.getParams();
-    if (params == null) {
-      params = new Expr[0];
-    }
-
-    EvalNode[] givenArgs = new EvalNode[params.length];
-    DataType[] paramTypes = new DataType[params.length];
-
-    for (int i = 0; i < params.length; i++) {
-      givenArgs[i] = visit(ctx, stack, params[i]);
-      paramTypes[i] = givenArgs[i].getValueType();
-    }
-
-    stack.pop(); // <--- Pop
-
-    if (!catalog.containFunction(expr.getSignature(), paramTypes)) {
-      throw new NoSuchFunctionException(expr.getSignature(), paramTypes);
-    }
-
-    FunctionDesc funcDesc = catalog.getFunction(expr.getSignature(), paramTypes);
-
-    // trying the implicit type conversion between actual parameter types and the definition types.
-    if (CatalogUtil.checkIfVariableLengthParamDefinition(TUtil.newList(funcDesc.getParamTypes()))) {
-      DataType lastDataType = funcDesc.getParamTypes()[0];
-      for (int i = 0; i < givenArgs.length; i++) {
-        if (i < (funcDesc.getParamTypes().length - 1)) { // variable length
-          lastDataType = funcDesc.getParamTypes()[i];
-        } else {
-          lastDataType = CatalogUtil.newSimpleDataType(CatalogUtil.getPrimitiveTypeOf(lastDataType.getType()));
-        }
-        givenArgs[i] = convertType(givenArgs[i], lastDataType);
-      }
-    } else {
-      assertEval(funcDesc.getParamTypes().length == givenArgs.length,
-          "The number of parameters is mismatched to the function definition: " + funcDesc.toString());
-      // According to our function matching method, each given argument can be casted to the definition parameter.
-      for (int i = 0; i < givenArgs.length; i++) {
-        givenArgs[i] = convertType(givenArgs[i], funcDesc.getParamTypes()[i]);
-      }
-    }
-
-
-    try {
-      FunctionType functionType = funcDesc.getFuncType();
-      if (functionType == FunctionType.GENERAL
-          || functionType == FunctionType.UDF) {
-        return new GeneralFunctionEval(funcDesc, (GeneralFunction) funcDesc.newInstance(), givenArgs);
-      } else if (functionType == FunctionType.AGGREGATION
-          || functionType == FunctionType.UDA) {
-        if (!ctx.currentBlock.hasNode(NodeType.GROUP_BY)) {
-          ctx.currentBlock.setAggregationRequire();
-        }
-        return new AggregationFunctionCallEval(funcDesc, (AggFunction) funcDesc.newInstance(), givenArgs);
-      } else if (functionType == FunctionType.DISTINCT_AGGREGATION
-          || functionType == FunctionType.DISTINCT_UDA) {
-        throw new PlanningException("Unsupported function: " + funcDesc.toString());
-      } else {
-        throw new PlanningException("Unsupported Function Type: " + functionType.name());
-      }
-    } catch (InternalException e) {
-      throw new PlanningException(e);
-    }
-  }
-
-  /**
-   * Annotates a row-counting call by looking up the zero-argument aggregation function "count" in the
-   * catalog. The current block is marked as requiring aggregation.
-   *
-   * @throws NoSuchFunctionException if the catalog has no such function
-   * @throws PlanningException if the function cannot be instantiated
-   */
-  @Override
-  public EvalNode visitCountRowsFunction(Context ctx, Stack<Expr> stack, CountRowsFunctionExpr expr)
-      throws PlanningException {
-    FunctionDesc countRows = catalog.getFunction("count", FunctionType.AGGREGATION,
-        new DataType[] {});
-    if (countRows == null) {
-      throw new NoSuchFunctionException(expr.getSignature(), new DataType[]{});
-    }
-
-    try {
-      ctx.currentBlock.setAggregationRequire();
-
-      return new AggregationFunctionCallEval(countRows, (AggFunction) countRows.newInstance(),
-          new EvalNode[] {});
-    } catch (InternalException e) {
-      // The function exists but could not be instantiated: keep the cause and report it the same way
-      // the other function visitors do, rather than claiming that the function is missing.
-      throw new PlanningException(e);
-    }
-  }
-
-  /**
-   * Annotates a general set function call. Only the first parameter is annotated. The function is looked
-   * up as DISTINCT_AGGREGATION when the call is distinct and as AGGREGATION otherwise; for "count" the
-   * lookup uses the ANY type instead of the actual argument type. The current block is marked as
-   * requiring aggregation when it has no GROUP BY node.
-   *
-   * @throws NoSuchFunctionException if the catalog has no matching function
-   * @throws PlanningException if the function cannot be instantiated
-   */
-  @Override
-  public EvalNode visitGeneralSetFunction(Context ctx, Stack<Expr> stack, GeneralSetFunctionExpr setFunction)
-      throws PlanningException {
-
-    Expr[] params = setFunction.getParams();
-    EvalNode[] givenArgs = new EvalNode[params.length];
-    DataType[] paramTypes = new DataType[params.length];
-
-    FunctionType functionType;
-    if (setFunction.isDistinct()) {
-      functionType = FunctionType.DISTINCT_AGGREGATION;
-    } else {
-      functionType = FunctionType.AGGREGATION;
-    }
-
-    givenArgs[0] = visit(ctx, stack, params[0]);
-    paramTypes[0] = setFunction.getSignature().equalsIgnoreCase("count")
-        ? CatalogUtil.newSimpleDataType(Type.ANY)
-        : givenArgs[0].getValueType();
-
-    if (!catalog.containFunction(setFunction.getSignature(), functionType, paramTypes)) {
-      throw new NoSuchFunctionException(setFunction.getSignature(), paramTypes);
-    }
-    FunctionDesc funcDesc = catalog.getFunction(setFunction.getSignature(), functionType, paramTypes);
-
-    boolean hasGroupBy = ctx.currentBlock.hasNode(NodeType.GROUP_BY);
-    if (!hasGroupBy) {
-      ctx.currentBlock.setAggregationRequire();
-    }
-
-    try {
-      return new AggregationFunctionCallEval(funcDesc, (AggFunction) funcDesc.newInstance(), givenArgs);
-    } catch (InternalException e) {
-      throw new PlanningException(e);
-    }
-  }
-
-  /** Lower-case names of the functions that are looked up in the catalog with the WINDOW function type. */
-  public static final Set<String> WINDOW_FUNCTIONS = Sets.newHashSet(
-      "row_number",
-      "rank",
-      "dense_rank",
-      "percent_rank",
-      "cume_dist");
-
-  /**
-   * Annotates a window function call.
-   * <ul>
-   *   <li>Partition keys and sort keys of the window specification are annotated.</li>
-   *   <li>Only the first parameter is annotated. For "count" the catalog lookup uses the ANY type and for
-   *       "row_number" INT8; a parameterless "rank" takes the sort keys as its arguments.</li>
-   *   <li>The frame is UNBOUNDED PRECEDING .. CURRENT ROW when an ORDER BY is present,
-   *       UNBOUNDED PRECEDING .. UNBOUNDED FOLLOWING for "row_number" without ORDER BY, and the default
-   *       {@link WindowFrame} otherwise.</li>
-   *   <li>Functions listed in {@link #WINDOW_FUNCTIONS} are looked up with the WINDOW function type;
-   *       all others as (DISTINCT_)AGGREGATION.</li>
-   * </ul>
-   *
-   * @param ctx annotation context
-   * @param stack stack of enclosing expressions
-   * @param windowFunc window function call to annotate
-   * @return the annotated window function
-   * @throws NoSuchFunctionException if DISTINCT is used with a function listed in
-   *         {@link #WINDOW_FUNCTIONS}, or the catalog has no matching function
-   * @throws PlanningException if the function cannot be instantiated
-   */
-  public EvalNode visitWindowFunction(Context ctx, Stack<Expr> stack, WindowFunctionExpr windowFunc)
-      throws PlanningException {
-
-    WindowSpec windowSpec = windowFunc.getWindowSpec();
-
-    // Partition keys are visited so that an invalid key fails here; the resulting nodes are not used.
-    if (windowSpec.hasPartitionBy()) {
-      for (int i = 0; i < windowSpec.getPartitionKeys().length; i++) {
-        visit(ctx, stack, windowSpec.getPartitionKeys()[i]);
-      }
-    }
-
-    EvalNode [] sortKeys = null;
-    if (windowSpec.hasOrderBy()) {
-      sortKeys = new EvalNode[windowSpec.getSortSpecs().length];
-      for (int i = 0; i < windowSpec.getSortSpecs().length; i++) {
-        sortKeys[i] = visit(ctx, stack, windowSpec.getSortSpecs()[i].getKey());
-      }
-    }
-
-    String funcName = windowFunc.getSignature();
-    boolean distinct = windowFunc.isDistinct();
-    Expr[] params = windowFunc.getParams();
-    EvalNode[] givenArgs = new EvalNode[params.length];
-    TajoDataTypes.DataType[] paramTypes = new TajoDataTypes.DataType[params.length];
-
-    if (params.length > 0) {
-      givenArgs[0] = visit(ctx, stack, params[0]);
-      if (funcName.equalsIgnoreCase("count")) {
-        paramTypes[0] = CatalogUtil.newSimpleDataType(TajoDataTypes.Type.ANY);
-      } else if (funcName.equalsIgnoreCase("row_number")) {
-        paramTypes[0] = CatalogUtil.newSimpleDataType(Type.INT8);
-      } else {
-        paramTypes[0] = givenArgs[0].getValueType();
-      }
-    } else if (funcName.equalsIgnoreCase("rank")) {
-      // rank() without parameters operates on the sort keys of the window.
-      givenArgs = sortKeys != null ? sortKeys : new EvalNode[0];
-    }
-
-    // No explicit frame is read from the window specification here, so a default frame is always chosen.
-    WindowFrame frame;
-    if (windowSpec.hasOrderBy()) {
-      frame = new WindowFrame(new WindowStartBound(WindowFrameStartBoundType.UNBOUNDED_PRECEDING),
-          new WindowEndBound(WindowFrameEndBoundType.CURRENT_ROW));
-    } else if (funcName.equalsIgnoreCase("row_number")) {
-      frame = new WindowFrame(new WindowStartBound(WindowFrameStartBoundType.UNBOUNDED_PRECEDING),
-          new WindowEndBound(WindowFrameEndBoundType.UNBOUNDED_FOLLOWING));
-    } else {
-      frame = new WindowFrame();
-    }
-
-    // TODO - containFunction and getFunction should support the function type mask which provides ORing multiple types.
-    // the below checking against WINDOW_FUNCTIONS is a workaround code for the above problem.
-    FunctionType functionType;
-    // Locale.ROOT keeps the lookup independent of the default locale (e.g. 'I' lower-cases differently
-    // under a Turkish locale, which would make "CUME_DIST" miss the set).
-    if (WINDOW_FUNCTIONS.contains(funcName.toLowerCase(java.util.Locale.ROOT))) {
-      if (distinct) {
-        // Name the function that was actually called; this branch is reached for every window function.
-        throw new NoSuchFunctionException(funcName + "() does not support distinct keyword.");
-      }
-      functionType = FunctionType.WINDOW;
-    } else {
-      functionType = distinct ? FunctionType.DISTINCT_AGGREGATION : FunctionType.AGGREGATION;
-    }
-
-    if (!catalog.containFunction(funcName, functionType, paramTypes)) {
-      throw new NoSuchFunctionException(funcName, paramTypes);
-    }
-
-    FunctionDesc funcDesc = catalog.getFunction(funcName, functionType, paramTypes);
-
-    try {
-      return new WindowFunctionEval(funcDesc, (AggFunction) funcDesc.newInstance(), givenArgs, frame);
-    } catch (InternalException e) {
-      throw new PlanningException(e);
-    }
-  }
-
-  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
-  // Literal Section
-  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-  @Override
-  public EvalNode visitDataType(Context ctx, Stack<Expr> stack, DataTypeExpr expr) throws PlanningException {
-    return super.visitDataType(ctx, stack, expr);
-  }
-
-  /**
-   * Annotates a cast. A cast of a constant is computed immediately and returned as a new constant;
-   * any other operand is wrapped in a {@link CastEval}.
-   */
-  @Override
-  public EvalNode visitCastExpr(Context ctx, Stack<Expr> stack, CastExpr expr) throws PlanningException {
-    EvalNode operand = super.visitCastExpr(ctx, stack, expr);
-
-    // Any non-constant operand has to be cast at evaluation time.
-    if (operand.getType() != EvalType.CONST) {
-      return new CastEval(operand, LogicalPlanner.convertDataType(expr.getTarget()));
-    }
-
-    // A constant operand is pre-computed and cast to a constant value.
-    ConstEval constant = (ConstEval) operand;
-    return new ConstEval(DatumFactory.cast(constant.getValue(), LogicalPlanner.convertDataType(expr.getTarget())));
-  }
-
-  /**
-   * Converts a literal into a constant evaluation node, choosing the datum by the literal's value type:
-   * <ul>
-   *   <li>Boolean - boolean datum</li>
-   *   <li>String - text datum</li>
-   *   <li>Unsigned_Integer - INT4 datum</li>
-   *   <li>Unsigned_Large_Integer - INT8 datum</li>
-   *   <li>Unsigned_Float - FLOAT8 datum</li>
-   * </ul>
-   *
-   * @throws RuntimeException if the literal's value type is none of the above
-   */
-  @Override
-  public EvalNode visitLiteral(Context ctx, Stack<Expr> stack, LiteralValue expr) throws PlanningException {
-    switch (expr.getValueType()) {
-    case Boolean:
-      return new ConstEval(DatumFactory.createBool(((BooleanLiteral) expr).isTrue()));
-    case String:
-      return new ConstEval(DatumFactory.createText(expr.getValue()));
-    case Unsigned_Integer:
-      return new ConstEval(DatumFactory.createInt4(expr.getValue()));
-    case Unsigned_Large_Integer:
-      return new ConstEval(DatumFactory.createInt8(expr.getValue()));
-    case Unsigned_Float:
-      return new ConstEval(DatumFactory.createFloat8(expr.getValue()));
-    default:
-      // Unlike the other failures in this class, this one is reported as an unchecked exception.
-      throw new RuntimeException("Unsupported type: " + expr.getValueType());
-    }
-  }
-
-  /**
-   * Converts a NULL literal into a constant evaluation node holding the shared null datum.
-   */
-  @Override
-  public EvalNode visitNullLiteral(Context ctx, Stack<Expr> stack, NullLiteral expr) throws PlanningException {
-    return new ConstEval(NullDatum.get());
-  }
-
-  /**
-   * Converts a DATE literal into a constant date datum. The textual year, month and day are validated
-   * by {@link #dateToIntArray(String, String, String)}, converted to a Julian day, expanded back into a
-   * {@link TimeMeta}, and the datum is built from the Julian day of that time meta.
-   *
-   * @throws PlanningException if a date field is out of its accepted range
-   */
-  @Override
-  public EvalNode visitDateLiteral(Context context, Stack<Expr> stack, DateLiteral expr) throws PlanningException {
-    DateValue literal = expr.getDate();
-    int[] ymd = dateToIntArray(literal.getYears(), literal.getMonths(), literal.getDays());
-    int year = ymd[0];
-    int month = ymd[1];
-    int day = ymd[2];
-
-    TimeMeta meta = new TimeMeta();
-    meta.years = year;
-    meta.monthOfYear = month;
-    meta.dayOfMonth = day;
-
-    // Round-trip through the Julian day representation before building the datum.
-    DateTimeUtil.j2date(DateTimeUtil.date2j(year, month, day), meta);
-
-    return new ConstEval(new DateDatum(DateTimeUtil.date2j(meta.years, meta.monthOfYear, meta.dayOfMonth)));
-  }
-
-  /**
-   * Converts a TIMESTAMP literal into a constant timestamp datum. Date and time fields are validated by
-   * {@link #dateToIntArray} and {@link #timeToIntArray}; the resulting timestamp is passed through
-   * {@code DateTimeUtil.toUTCTimezone} before the datum is created.
-   *
-   * @throws PlanningException if a date or time field is out of its accepted range
-   */
-  @Override
-  public EvalNode visitTimestampLiteral(Context ctx, Stack<Expr> stack, TimestampLiteral expr)
-      throws PlanningException {
-    DateValue datePart = expr.getDate();
-    TimeValue timePart = expr.getTime();
-
-    int [] ymd = dateToIntArray(datePart.getYears(), datePart.getMonths(), datePart.getDays());
-    int [] hmsf = timeToIntArray(timePart.getHours(), timePart.getMinutes(), timePart.getSeconds(),
-        timePart.getSecondsFraction());
-
-    // The parsed fraction is scaled by 1000 when the literal has one; otherwise zero is used.
-    int fraction = timePart.hasSecondsFraction() ? hmsf[3] * 1000 : 0;
-    long timestamp = DateTimeUtil.toJulianTimestamp(ymd[0], ymd[1], ymd[2], hmsf[0], hmsf[1], hmsf[2], fraction);
-
-    TimeMeta meta = new TimeMeta();
-    DateTimeUtil.toJulianTimeMeta(timestamp, meta);
-    DateTimeUtil.toUTCTimezone(meta);
-
-    return new ConstEval(new TimestampDatum(DateTimeUtil.toJulianTimestamp(meta)));
-  }
-
-  @Override
-  public EvalNode visitIntervalLiteral(Context ctx, Stack<Expr> stack, IntervalLiteral expr) throws PlanningException {
-    return new ConstEval(new IntervalDatum(expr.getExprStr()));
-  }
-
-  /**
-   * Converts a TIME literal into a constant time datum. The fields are validated by
-   * {@link #timeToIntArray}; the resulting time is passed through {@code DateTimeUtil.toUTCTimezone}
-   * before the final datum is created.
-   *
-   * @throws PlanningException if a time field is out of its accepted range
-   */
-  @Override
-  public EvalNode visitTimeLiteral(Context ctx, Stack<Expr> stack, TimeLiteral expr) throws PlanningException {
-    TimeValue literal = expr.getTime();
-    int [] hmsf = timeToIntArray(literal.getHours(), literal.getMinutes(), literal.getSeconds(),
-        literal.getSecondsFraction());
-
-    // The parsed fraction is scaled by 1000 when the literal has one; otherwise zero is used.
-    int fraction = literal.hasSecondsFraction() ? hmsf[3] * 1000 : 0;
-    long time = DateTimeUtil.toTime(hmsf[0], hmsf[1], hmsf[2], fraction);
-
-    TimeMeta meta = new TimeDatum(time).toTimeMeta();
-    DateTimeUtil.toUTCTimezone(meta);
-
-    return new ConstEval(new TimeDatum(DateTimeUtil.toTime(meta)));
-  }
-
-  /**
-   * Parses and range-checks the textual fields of a date.
-   *
-   * @param years  the year as a decimal string; must be within 1..9999
-   * @param months the month as a decimal string; must be within 1..12
-   * @param days   the day as a decimal string; must be within 1..31
-   * @return a three-element array holding year, month and day, in that order
-   * @throws PlanningException     if a field is outside its accepted range
-   * @throws NumberFormatException if a field is not a parsable integer
-   */
-  public static int [] dateToIntArray(String years, String months, String days)
-      throws PlanningException {
-    // All three fields are parsed before any of them is range-checked.
-    int year = Integer.parseInt(years);
-    int month = Integer.parseInt(months);
-    int day = Integer.parseInt(days);
-
-    if (year < 1 || year > 9999) {
-      throw new PlanningException(String.format("Years (%d) must be between 1 and 9999 integer value", year));
-    }
-
-    if (month < 1 || month > 12) {
-      throw new PlanningException(String.format("Months (%d) must be between 1 and 12 integer value", month));
-    }
-
-    if (day < 1 || day > 31) {
-      throw new PlanningException(String.format("Days (%d) must be between 1 and 31 integer value", day));
-    }
-
-    return new int[] {year, month, day};
-  }
-
-  /**
-   * Parses and range-checks the textual fields of a time of day.
-   *
-   * @param hours            the hour as a decimal string; must be within 0..23
-   * @param minutes          the minute as a decimal string; must be within 0..59
-   * @param seconds          the second as a decimal string; must be within 0..59
-   * @param fractionOfSecond the fractional part as a decimal string, within 0..999, or {@code null}
-   *                         when the literal has no fraction (treated as 0)
-   * @return a four-element array holding hour, minute, second and fraction, in that order
-   * @throws PlanningException     if a field is outside its accepted range
-   * @throws NumberFormatException if a field is not a parsable integer
-   */
-  public static int [] timeToIntArray(String hours, String minutes, String seconds, String fractionOfSecond)
-      throws PlanningException {
-    int hour = Integer.parseInt(hours);
-    int minute = Integer.parseInt(minutes);
-    int second = Integer.parseInt(seconds);
-    int fraction = fractionOfSecond != null ? Integer.parseInt(fractionOfSecond) : 0;
-
-    if (hour < 0 || hour > 23) {
-      // The message used to say "0 and 24" although 24 has always been rejected.
-      throw new PlanningException(String.format("Hours (%d) must be between 0 and 23 integer value", hour));
-    }
-
-    if (minute < 0 || minute > 59) {
-      throw new PlanningException(String.format("Minutes (%d) must be between 0 and 59 integer value", minute));
-    }
-
-    if (second < 0 || second > 59) {
-      throw new PlanningException(String.format("Seconds (%d) must be between 0 and 59 integer value", second));
-    }
-
-    // A zero fraction trivially passes, so no separate "fraction != 0" guard is needed.
-    if (fraction < 0 || fraction > 999) {
-      throw new PlanningException(
-          String.format("Fraction of seconds (%d) must be between 0 and 999 integer value", fraction));
-    }
-
-    return new int[] {hour, minute, second, fraction};
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprFinder.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprFinder.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprFinder.java
deleted file mode 100644
index dbc844d..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprFinder.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner;
-
-import org.apache.tajo.algebra.BinaryOperator;
-import org.apache.tajo.algebra.Expr;
-import org.apache.tajo.algebra.OpType;
-import org.apache.tajo.algebra.UnaryOperator;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.Stack;
-
-/**
- * Collects every expression of a given {@link OpType} from an algebra expression tree.
- */
-public class ExprFinder extends SimpleAlgebraVisitor<ExprFinder.Context, Object> {
-
-  /** Traversal state: the operator type being searched for and the matches collected so far. */
-  static class Context {
-    Set<Expr> set = new HashSet<Expr>();
-    OpType targetType;
-
-    Context(OpType type) {
-      this.targetType = type;
-    }
-  }
-
-  /**
-   * Finds all expressions of the given type reachable from {@code expr}, including {@code expr} itself.
-   *
-   * @param expr the root of the expression tree to search
-   * @param type the operator type to look for
-   * @param <T>  the expression class the caller expects; the cast is unchecked, so the caller must pick
-   *             a class that matches {@code type}
-   * @return the set of matching expressions
-   * @throws RuntimeException wrapping any {@link PlanningException} raised during the traversal
-   */
-  @SuppressWarnings("unchecked")
-  public static <T extends Expr> Set<T> finds(Expr expr, OpType type) {
-    Context context = new Context(type);
-    ExprFinder finder = new ExprFinder();
-    try {
-      // The traversal has always started from an empty stack; the extra stack that used to be
-      // pushed and popped around this call was never handed to the visitor.
-      finder.visit(context, new Stack<Expr>(), expr);
-    } catch (PlanningException e) {
-      throw new RuntimeException(e);
-    }
-    return (Set<T>) context.set;
-  }
-
-  /**
-   * Visits {@code expr} and its descendants, then records {@code expr} when its type is the target type.
-   * Unary and binary operators are dispatched to the generic operator visitors between the pre and post
-   * hooks; every other expression is handled by the parent visitor.
-   *
-   * @return always {@code null}; results are accumulated in {@code ctx}
-   */
-  public Object visit(Context ctx, Stack<Expr> stack, Expr expr) throws PlanningException {
-    if (expr instanceof UnaryOperator) {
-      preHook(ctx, stack, expr);
-      visitUnaryOperator(ctx, stack, (UnaryOperator) expr);
-      postHook(ctx, stack, expr, null);
-    } else if (expr instanceof BinaryOperator) {
-      preHook(ctx, stack, expr);
-      visitBinaryOperator(ctx, stack, (BinaryOperator) expr);
-      postHook(ctx, stack, expr, null);
-    } else {
-      super.visit(ctx, stack, expr);
-    }
-
-    // Matches are recorded after the children have been visited.
-    if (ctx.targetType == expr.getType()) {
-      ctx.set.add(expr);
-    }
-
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprNormalizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprNormalizer.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprNormalizer.java
deleted file mode 100644
index 3718056..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprNormalizer.java
+++ /dev/null
@@ -1,388 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner;
-
-import com.google.common.collect.Sets;
-import org.apache.tajo.algebra.*;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.engine.exception.NoSuchColumnException;
-import org.apache.tajo.engine.planner.nameresolver.NameResolvingMode;
-import org.apache.tajo.engine.planner.nameresolver.NameResolver;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.Stack;
-
-/**
- * ExprNormalizer performs three kinds of work:
- *
- * <h3>1. Duplication Removal.</h3>
- *
- * For example, assume a simple query as follows:
- * <pre>
- *   select price * rate as total_price, ..., order by price * rate
- * </pre>
- *
- * The expression <code>price * rate</code> is duplicated in both select list and order by clause.
- * Against those cases, ExprNormalizer removes duplicated expressions and replaces one with one reference.
- * In the case, ExprNormalizer replaces price * rate with total_price reference.
- *
- * <h3>2. Dissection of Expression</h3>
- *
- * An expression can be complex, mixing scalar and aggregation expressions.
- * For example, assume an aggregation query as follows:
- * <pre>
- *   select sum(price * rate) * (1 - avg(discount_rate)), ...
- * </pre>
- *
- * In this case, ExprNormalizer dissects the expression 'sum(price * rate) * (1 - avg(discount_rate))'
- * into the following expressions:
- * <ul>
- *   <li>$1 = price * rate</li>
- *   <li>$2 = sum($1)</li>
- *   <li>$3 = avg(discount_rate)</li>
- *   <li>$4 = $2 * (1 - $3)</li>
- * </ul>
- *
- * This has two main advantages. First, it makes complex expression evaluations easier across multiple physical
- * executors. Second, it gives more opportunities to remove duplicated expressions.
- *
- * <h3>3. Name Normalization</h3>
- *
- * Users can use qualified column names, unqualified column names or aliased column references.
- *
- * Consider the following example:
- *
- * <pre>
- *   select rate_a as total_rate, rate_a * 100, table1.rate_a, ... WHERE total_rate * 100
- * </pre>
- *
- * <code>total_rate</code>, <code>rate_a</code>, and <code>table1.rate_a</code> are all the same references. But,
- * they have different forms. Due to their different forms, duplication removal can be hard.
- *
- * In order to solve this problem, ExprNormalizer normalizes all column references to qualified names while
- * preserving what each reference points to.
- */
-class ExprNormalizer extends SimpleAlgebraVisitor<ExprNormalizer.ExprNormalizedResult, Object> {
-
-  /**
-   * Serves as the visitor context of one normalization run and, once the run completes, as its result:
-   * the rewritten base expression plus the aggregation, scalar and window expressions split out of it.
-   */
-  public static class ExprNormalizedResult {
-    private final LogicalPlan plan;
-    private final LogicalPlan.QueryBlock block;
-    private final boolean tryBinaryCommonTermsElimination;
-
-    Expr baseExpr; // outmost expressions, which can includes one or more references of the results of aggregation
-                   // function.
-    List<NamedExpr> aggExprs = new ArrayList<NamedExpr>(); // aggregation functions
-    List<NamedExpr> scalarExprs = new ArrayList<NamedExpr>(); // scalar expressions which can be referred
-    List<NamedExpr> windowAggExprs = new ArrayList<NamedExpr>(); // window expressions which can be referred
-    Set<WindowSpecReferences> windowSpecs = Sets.newLinkedHashSet();
-
-    public ExprNormalizedResult(LogicalPlanner.PlanContext context, boolean tryBinaryCommonTermsElimination) {
-      this.plan = context.plan;
-      this.block = context.queryBlock;
-      this.tryBinaryCommonTermsElimination = tryBinaryCommonTermsElimination;
-    }
-
-    /** Tells whether operands of binary operators that are already registered should become references. */
-    public boolean isBinaryCommonTermsElimination() {
-      return tryBinaryCommonTermsElimination;
-    }
-
-    @Override
-    public String toString() {
-      return baseExpr.toString() + ", agg=" + aggExprs.size() + ", scalar=" + scalarExprs.size();
-    }
-  }
-
-  /**
-   * Normalizes {@code expr} without common-term elimination.
-   *
-   * @see #normalize(LogicalPlanner.PlanContext, Expr, boolean)
-   */
-  public ExprNormalizedResult normalize(LogicalPlanner.PlanContext context, Expr expr) throws PlanningException {
-    return normalize(context, expr, false);
-  }
-
-  /**
-   * Normalizes {@code expr}, rewriting it in place.
-   *
-   * @param context            the planning context supplying the plan and the current query block
-   * @param expr               the expression to normalize; it is modified by this call
-   * @param subexprElimination whether already-registered operands of binary operators are replaced by
-   *                           references
-   * @return the result, whose base expression is {@code expr} itself after rewriting
-   * @throws PlanningException if the traversal fails
-   */
-  public ExprNormalizedResult normalize(LogicalPlanner.PlanContext context, Expr expr, boolean subexprElimination)
-      throws PlanningException {
-    ExprNormalizedResult exprNormalizedResult = new ExprNormalizedResult(context, subexprElimination);
-    // The visitors mutate the tree in place and always start from an empty stack, so the root that was
-    // passed in is also the normalized base expression.
-    visit(exprNormalizedResult, new Stack<Expr>(), expr);
-    exprNormalizedResult.baseExpr = expr;
-    return exprNormalizedResult;
-  }
-
-  /**
-   * Registers {@code aggExpr} in the block's named expression manager, records it as an aggregation
-   * expression of this run, and returns a column reference to the registered name.
-   */
-  private static ColumnReferenceExpr registerAggregation(ExprNormalizedResult ctx, Expr aggExpr) {
-    String referenceName = ctx.block.namedExprsMgr.addExpr(aggExpr);
-    ctx.aggExprs.add(new NamedExpr(aggExpr, referenceName));
-    return new ColumnReferenceExpr(referenceName);
-  }
-
-  /**
-   * Visits every condition, result and the optional else result of a CASE expression, replacing each
-   * one that is an aggregation function by a reference to it.
-   */
-  @Override
-  public Object visitCaseWhen(ExprNormalizedResult ctx, Stack<Expr> stack, CaseWhenPredicate expr)
-      throws PlanningException {
-    stack.push(expr);
-    for (CaseWhenPredicate.WhenExpr when : expr.getWhens()) {
-      visit(ctx, stack, when.getCondition());
-      visit(ctx, stack, when.getResult());
-
-      if (OpType.isAggregationFunction(when.getCondition().getType())) {
-        when.setCondition(registerAggregation(ctx, when.getCondition()));
-      }
-
-      if (OpType.isAggregationFunction(when.getResult().getType())) {
-        when.setResult(registerAggregation(ctx, when.getResult()));
-      }
-    }
-
-    if (expr.hasElseResult()) {
-      visit(ctx, stack, expr.getElseResult());
-      if (OpType.isAggregationFunction(expr.getElseResult().getType())) {
-        expr.setElseResult(registerAggregation(ctx, expr.getElseResult()));
-      }
-    }
-    stack.pop();
-    return expr;
-  }
-
-  /**
-   * Visits the operand of a unary operator and, when the operand is an aggregation function, replaces
-   * it by a reference to it.
-   */
-  @Override
-  public Expr visitUnaryOperator(ExprNormalizedResult ctx, Stack<Expr> stack, UnaryOperator expr) throws PlanningException {
-    super.visitUnaryOperator(ctx, stack, expr);
-    if (OpType.isAggregationFunction(expr.getChild().getType())) {
-      // Get an anonymous column name and replace the aggregation function by the column name
-      expr.setChild(registerAggregation(ctx, expr.getChild()));
-    }
-
-    return expr;
-  }
-
-  /**
-   * Tells whether {@code expr} should be replaced by a reference: elimination must be enabled, the
-   * expression must not be a plain column, and it must already be known to the named expression manager.
-   */
-  private boolean isBinaryCommonTermsElimination(ExprNormalizedResult ctx, Expr expr) {
-    return ctx.isBinaryCommonTermsElimination() && expr.getType() != OpType.Column
-        && ctx.block.namedExprsMgr.contains(expr);
-  }
-
-  /**
-   * Visits both operands of a binary operator. Each operand is first replaced by a reference when
-   * common-term elimination applies to it, and afterwards replaced by a reference when it is an
-   * aggregation function.
-   */
-  @Override
-  public Expr visitBinaryOperator(ExprNormalizedResult ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
-    stack.push(expr);
-
-    // Each operand is visited with a fresh stack rather than the one passed in.
-    visit(ctx, new Stack<Expr>(), expr.getLeft());
-    if (isBinaryCommonTermsElimination(ctx, expr.getLeft())) {
-      String refName = ctx.block.namedExprsMgr.addExpr(expr.getLeft());
-      expr.setLeft(new ColumnReferenceExpr(refName));
-    }
-
-    visit(ctx, new Stack<Expr>(), expr.getRight());
-    if (isBinaryCommonTermsElimination(ctx, expr.getRight())) {
-      String refName = ctx.block.namedExprsMgr.addExpr(expr.getRight());
-      expr.setRight(new ColumnReferenceExpr(refName));
-    }
-    stack.pop();
-
-    ////////////////////////
-    // For Left Term
-    ////////////////////////
-
-    if (OpType.isAggregationFunction(expr.getLeft().getType())) {
-      expr.setLeft(registerAggregation(ctx, expr.getLeft()));
-    }
-
-
-    ////////////////////////
-    // For Right Term
-    ////////////////////////
-    if (OpType.isAggregationFunction(expr.getRight().getType())) {
-      expr.setRight(registerAggregation(ctx, expr.getRight()));
-    }
-
-    return expr;
-  }
-
-  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
-  // Function Section
-  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Visits the parameters of a general function call; every parameter that is an aggregation function
-   * is recorded as an aggregation expression and replaced by a reference.
-   */
-  @Override
-  public Expr visitFunction(ExprNormalizedResult ctx, Stack<Expr> stack, FunctionExpr expr) throws PlanningException {
-    stack.push(expr);
-
-    Expr param;
-    Expr[] paramExprs = expr.getParams();
-    if (paramExprs != null) {
-      for (int i = 0; i < paramExprs.length; i++) {
-        param = paramExprs[i];
-        visit(ctx, stack, param);
-
-        if (OpType.isAggregationFunction(param.getType())) {
-          // NOTE(review): unlike the other visitors, the name here comes from the plan's unique column
-          // name generator instead of the block's named expression manager - confirm this is intended.
-          String referenceName = ctx.plan.generateUniqueColumnName(param);
-          ctx.aggExprs.add(new NamedExpr(param, referenceName));
-          expr.getParams()[i] = new ColumnReferenceExpr(referenceName);
-        }
-      }
-    }
-    stack.pop();
-
-    return expr;
-  }
-
-  /**
-   * Visits the parameters of an aggregation (set) function; every parameter that is neither a literal
-   * nor a plain column is recorded as a scalar expression and replaced by a reference.
-   */
-  @Override
-  public Expr visitGeneralSetFunction(ExprNormalizedResult ctx, Stack<Expr> stack, GeneralSetFunctionExpr expr)
-      throws PlanningException {
-    stack.push(expr);
-
-    Expr param;
-    for (int i = 0; i < expr.getParams().length; i++) {
-      param = expr.getParams()[i];
-      visit(ctx, stack, param);
-
-
-      // If parameters are all constants, we don't need to dissect an aggregation expression into two parts:
-      // function and parameter parts.
-      if (!OpType.isLiteralType(param.getType()) && param.getType() != OpType.Column) {
-        String referenceName = ctx.block.namedExprsMgr.addExpr(param);
-        ctx.scalarExprs.add(new NamedExpr(param, referenceName));
-        expr.getParams()[i] = new ColumnReferenceExpr(referenceName);
-      }
-    }
-    stack.pop();
-    return expr;
-  }
-
-  /**
-   * Registers a window function together with the reference names of its window specification, and
-   * marks the query block as containing a window function. A named window is recorded by name only;
-   * otherwise every partition key and sort key is visited and registered.
-   */
-  public Expr visitWindowFunction(ExprNormalizedResult ctx, Stack<Expr> stack, WindowFunctionExpr expr)
-      throws PlanningException {
-    stack.push(expr);
-
-    WindowSpec windowSpec = expr.getWindowSpec();
-    Expr key;
-
-    WindowSpecReferences windowSpecReferences;
-    if (windowSpec.hasWindowName()) {
-      windowSpecReferences = new WindowSpecReferences(windowSpec.getWindowName());
-    } else {
-      String [] partitionKeyReferenceNames = null;
-      if (windowSpec.hasPartitionBy()) {
-        partitionKeyReferenceNames = new String [windowSpec.getPartitionKeys().length];
-        for (int i = 0; i < windowSpec.getPartitionKeys().length; i++) {
-          key = windowSpec.getPartitionKeys()[i];
-          visit(ctx, stack, key);
-          partitionKeyReferenceNames[i] = ctx.block.namedExprsMgr.addExpr(key);
-        }
-      }
-
-      String [] orderKeyReferenceNames = null;
-      if (windowSpec.hasOrderBy()) {
-        orderKeyReferenceNames = new String[windowSpec.getSortSpecs().length];
-        for (int i = 0; i < windowSpec.getSortSpecs().length; i++) {
-          key = windowSpec.getSortSpecs()[i].getKey();
-          visit(ctx, stack, key);
-          // Every sort key is registered; only aggregation keys are additionally replaced by a reference.
-          String referenceName = ctx.block.namedExprsMgr.addExpr(key);
-          if (OpType.isAggregationFunction(key.getType())) {
-            ctx.aggExprs.add(new NamedExpr(key, referenceName));
-            windowSpec.getSortSpecs()[i].setKey(new ColumnReferenceExpr(referenceName));
-          }
-          orderKeyReferenceNames[i] = referenceName;
-        }
-      }
-      windowSpecReferences =
-          new WindowSpecReferences(partitionKeyReferenceNames,orderKeyReferenceNames);
-    }
-    ctx.windowSpecs.add(windowSpecReferences);
-
-    String funcExprRef = ctx.block.namedExprsMgr.addExpr(expr);
-    ctx.windowAggExprs.add(new NamedExpr(expr, funcExprRef));
-    stack.pop();
-
-    ctx.block.setHasWindowFunction();
-    return expr;
-  }
-
-  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
-  // Literal Section
-  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Visits the operand of a cast through the parent visitor and, when the aggregation test below
-   * succeeds, replaces the operand by a reference to it.
-   */
-  @Override
-  public Expr visitCastExpr(ExprNormalizedResult ctx, Stack<Expr> stack, CastExpr expr) throws PlanningException {
-    super.visitCastExpr(ctx, stack, expr);
-    // NOTE(review): this tests the type of the cast itself, not of its child, although the child is what
-    // gets replaced. It looks like expr.getChild().getType() was intended - confirm before changing.
-    if (OpType.isAggregationFunction(expr.getType())) {
-      expr.setChild(registerAggregation(ctx, expr.getChild()));
-    }
-    return expr;
-  }
-
-  /**
-   * Normalizes the name of a column reference. An aliased name is replaced by its original name.
-   * Otherwise, a reference without a fully qualified table name that is not a registered named
-   * expression is resolved and renamed to its qualified name.
-   */
-  @Override
-  public Expr visitColumnReference(ExprNormalizedResult ctx, Stack<Expr> stack, ColumnReferenceExpr expr)
-      throws PlanningException {
-
-    if (ctx.block.isAliasedName(expr.getCanonicalName())) {
-      String originalName = ctx.block.getOriginalName(expr.getCanonicalName());
-      expr.setName(originalName);
-      return expr;
-    }
-    // if a column reference is not qualified, it finds and sets the qualified column name.
-    if (!(expr.hasQualifier() && CatalogUtil.isFQTableName(expr.getQualifier()))) {
-      if (!ctx.block.namedExprsMgr.contains(expr.getCanonicalName()) && expr.getType() == OpType.Column) {
-        try {
-          String normalized =
-              NameResolver.resolve(ctx.plan, ctx.block, expr, NameResolvingMode.LEGACY).getQualifiedName();
-          expr.setName(normalized);
-        } catch (NoSuchColumnException ignored) {
-          // Best effort: a reference that cannot be resolved here is left exactly as it was written.
-        }
-      }
-    }
-    return expr;
-  }
-
-  /**
-   * Describes a window specification by reference names: either the name of a named window, or the
-   * reference names of its partition keys and order keys (each array may be {@code null} when absent).
-   */
-  public static class WindowSpecReferences {
-    String windowName;
-
-    String [] partitionKeys;
-    String [] orderKeys;
-
-    public WindowSpecReferences(String windowName) {
-      this.windowName = windowName;
-    }
-
-    public WindowSpecReferences(String [] partitionKeys, String [] orderKeys) {
-      this.partitionKeys = partitionKeys;
-      this.orderKeys = orderKeys;
-    }
-
-    public String getWindowName() {
-      return windowName;
-    }
-
-    public boolean hasPartitionKeys() {
-      return partitionKeys != null;
-    }
-
-    public String [] getPartitionKeys() {
-      return partitionKeys;
-    }
-
-    public boolean hasOrderBy() {
-      return orderKeys != null;
-    }
-
-    public String [] getOrderKeys() {
-      return orderKeys;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprsVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprsVerifier.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprsVerifier.java
deleted file mode 100644
index 5cb1fd9..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprsVerifier.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.engine.eval.*;
-import org.apache.tajo.engine.planner.logical.LogicalNode;
-
-import java.util.Set;
-import java.util.Stack;
-
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-import static org.apache.tajo.common.TajoDataTypes.Type;
-
-/**
- * It verifies one predicate or expression with the semantic and data type checks as follows:
- * <ul>
- *   <li>Both expressions in a binary expression are compatible with each other</li>
- *   <li>All column references of one expression are available at this node</li>
- * </ul>
- */
-public class ExprsVerifier extends BasicEvalNodeVisitor<VerificationState, EvalNode> {
-  /** Single shared instance used by {@link #verify}; the class itself declares no instance fields. */
-  private static final ExprsVerifier instance = new ExprsVerifier();
-
-  /**
-   * Verifies one expression against the node it belongs to. Type problems found while walking the
-   * expression are added to {@code state}; a column that the node's input schema does not contain
-   * aborts verification with an exception.
-   *
-   * @param state       the collector for verification messages
-   * @param currentNode the logical node the expression is evaluated at
-   * @param expression  the expression to verify
-   * @return the same {@code state} instance
-   * @throws PlanningException if the expression refers to a column missing from the node's input schema
-   */
-  public static VerificationState verify(VerificationState state, LogicalNode currentNode, EvalNode expression)
-      throws PlanningException {
-    instance.visitChild(state, expression, new Stack<EvalNode>());
-    Set<Column> referredColumns = EvalTreeUtil.findUniqueColumns(expression);
-    for (Column referredColumn : referredColumns) {
-      if (!currentNode.getInSchema().contains(referredColumn)) {
-        throw new PlanningException("Invalid State: " + referredColumn + " cannot be accessible at Node ("
-            + currentNode.getPID() + ")");
-      }
-    }
-    return state;
-  }
-
-  /**
-   * It checks the compatibility of two data types. Two types are compatible when both are numeric,
-   * both are textual, both are date/time types, or both are network address types.
-   */
-  private static boolean isCompatibleType(DataType dataType1, DataType dataType2) {
-    return (checkNumericType(dataType1) && checkNumericType(dataType2))
-        || (checkTextData(dataType1) && checkTextData(dataType2))
-        || (checkDateTime(dataType1) && checkDateTime(dataType2))
-        || (checkNetworkType(dataType1) && checkNetworkType(dataType2));
-  }
-
-  /**
-   * It checks both expressions in a comparison operator are compatible to each other.
-   */
-  private static void verifyComparisonOperator(VerificationState state, BinaryEval expr) {
-    DataType leftType = expr.getLeftExpr().getValueType();
-    DataType rightType = expr.getRightExpr().getValueType();
-    if (!isCompatibleType(leftType, rightType)) {
-      state.addVerification("No operator matches the given name and argument type(s): " + expr.toString());
-    }
-  }
-
-  @Override
-  public EvalNode visitEqual(VerificationState context, BinaryEval expr, Stack<EvalNode> stack) {
-    super.visitEqual(context, expr, stack);
-    verifyComparisonOperator(context, expr);
-    return expr;
-  }
-
-  @Override
-  public EvalNode visitNotEqual(VerificationState context, BinaryEval expr, Stack<EvalNode> stack) {
-    super.visitNotEqual(context, expr, stack);
-    verifyComparisonOperator(context, expr);
-    return expr;
-  }
-
-  @Override
-  public EvalNode visitLessThan(VerificationState context, BinaryEval expr, Stack<EvalNode> stack) {
-    super.visitLessThan(context, expr, stack);
-    verifyComparisonOperator(context, expr);
-    return expr;
-  }
-
-  @Override
-  public EvalNode visitLessThanOrEqual(VerificationState context, BinaryEval expr, Stack<EvalNode> stack) {
-    super.visitLessThanOrEqual(context, expr, stack);
-    verifyComparisonOperator(context, expr);
-    return expr;
-  }
-
-  @Override
-  public EvalNode visitGreaterThan(VerificationState context, BinaryEval expr, Stack<EvalNode> stack) {
-    super.visitGreaterThan(context, expr, stack);
-    verifyComparisonOperator(context, expr);
-    return expr;
-  }
-
-  @Override
-  public EvalNode visitGreaterThanOrEqual(VerificationState context, BinaryEval expr, Stack<EvalNode> stack) {
-    super.visitGreaterThanOrEqual(context, expr, stack);
-    verifyComparisonOperator(context, expr);
-    return expr;
-  }
-
-  /**
-   * Reports "division by zero" when the right operand is a constant whose FLOAT8 value equals zero.
-   * Non-constant right operands are not checked.
-   */
-  private static void checkDivisionByZero(VerificationState state, BinaryEval evalNode) {
-    if (evalNode.getRightExpr().getType() == EvalType.CONST) {
-      ConstEval constEval = evalNode.getRightExpr();
-      if (constEval.getValue().asFloat8() == 0) {
-        state.addVerification("division by zero");
-      }
-    }
-  }
-
-  /**
-   * Checks the operand types of an arithmetic operator. The following left/right combinations are
-   * accepted; anything else is reported unless both operands are numeric:
-   * <ul>
-   *   <li>DATE with an integer type, DATE, INTERVAL or TIME</li>
-   *   <li>INTERVAL with a numeric type, DATE, INTERVAL, TIME or TIMESTAMP</li>
-   *   <li>TIME with DATE, INTERVAL or TIME</li>
-   *   <li>TIMESTAMP with TIMESTAMP, INTERVAL or TIME</li>
-   * </ul>
-   */
-  private static void checkArithmeticOperand(VerificationState state, BinaryEval evalNode) {
-    EvalNode leftExpr = evalNode.getLeftExpr();
-    EvalNode rightExpr = evalNode.getRightExpr();
-
-    DataType leftDataType = leftExpr.getValueType();
-    DataType rightDataType = rightExpr.getValueType();
-
-    Type leftType = leftDataType.getType();
-    Type rightType = rightDataType.getType();
-
-    if (leftType == Type.DATE &&
-          (checkIntType(rightDataType) ||
-              rightType == Type.DATE || rightType == Type.INTERVAL || rightType == Type.TIME)) {
-      return;
-    }
-
-    if (leftType == Type.INTERVAL &&
-        (checkNumericType(rightDataType) ||
-            rightType == Type.DATE || rightType == Type.INTERVAL || rightType == Type.TIME ||
-            rightType == Type.TIMESTAMP)) {
-      return;
-    }
-
-    if (leftType == Type.TIME &&
-        (rightType == Type.DATE || rightType == Type.INTERVAL || rightType == Type.TIME)) {
-      return;
-    }
-
-    if (leftType == Type.TIMESTAMP &&
-        (rightType == Type.TIMESTAMP || rightType == Type.INTERVAL || rightType == Type.TIME)) {
-      return;
-    }
-
-    if (!(checkNumericType(leftDataType) && checkNumericType(rightDataType))) {
-      state.addVerification("No operator matches the given name and argument type(s): " + evalNode.toString());
-    }
-  }
-
-  /** Tells whether the type is INET4 or INET6. */
-  private static boolean checkNetworkType(DataType dataType) {
-    return dataType.getType() == Type.INET4 || dataType.getType() == Type.INET6;
-  }
-
-  /**
-   * Tells whether the type number lies above INT1 and up to INT8, inclusive of INT8.
-   */
-  private static boolean checkIntType(DataType dataType) {
-    int typeNumber = dataType.getType().getNumber();
-    // NOTE(review): the lower bound is strict, so INT1 itself is rejected, whereas checkNumericType
-    // includes INT1 - confirm whether excluding INT1 is intended.
-    return Type.INT1.getNumber() < typeNumber && typeNumber <= Type.INT8.getNumber();
-  }
-
-  /** Tells whether the type number lies between INT1 and NUMERIC, both inclusive. */
-  private static boolean checkNumericType(DataType dataType) {
-    int typeNumber = dataType.getType().getNumber();
-    return Type.INT1.getNumber() <= typeNumber && typeNumber <= Type.NUMERIC.getNumber();
-  }
-
-  /** Tells whether the type number lies between CHAR and TEXT, both inclusive. */
-  private static boolean checkTextData(DataType dataType) {
-    int typeNumber = dataType.getType().getNumber();
-    return Type.CHAR.getNumber() <= typeNumber && typeNumber <= Type.TEXT.getNumber();
-  }
-
-  /**
-   * Tells whether the type number lies between DATE and INTERVAL or between TIMEZ and TIMESTAMPZ,
-   * all bounds inclusive.
-   */
-  private static boolean checkDateTime(DataType dataType) {
-    int typeNumber = dataType.getType().getNumber();
-    return (Type.DATE.getNumber() <= typeNumber && typeNumber <= Type.INTERVAL.getNumber()) ||
-        (Type.TIMEZ.getNumber() <= typeNumber && typeNumber <= Type.TIMESTAMPZ.getNumber());
-  }
-
-  @Override
-  public EvalNode visitPlus(VerificationState context, BinaryEval evalNode, Stack<EvalNode> stack) {
-    super.visitPlus(context, evalNode, stack);
-    checkArithmeticOperand(context, evalNode);
-    return evalNode;
-  }
-
-  @Override
-  public EvalNode visitMinus(VerificationState context, BinaryEval evalNode, Stack<EvalNode> stack) {
-    super.visitMinus(context, evalNode, stack);
-    checkArithmeticOperand(context, evalNode);
-    return evalNode;
-  }
-
-  @Override
-  public EvalNode visitMultiply(VerificationState context, BinaryEval evalNode, Stack<EvalNode> stack) {
-    super.visitMultiply(context, evalNode, stack);
-    checkArithmeticOperand(context, evalNode);
-    return evalNode;
-  }
-
-  @Override
-  public EvalNode visitDivide(VerificationState context, BinaryEval evalNode, Stack<EvalNode> stack) {
-    super.visitDivide(context, evalNode, stack);
-    checkArithmeticOperand(context, evalNode);
-    checkDivisionByZero(context, evalNode);
-    return evalNode;
-  }
-
-  @Override
-  public EvalNode visitModular(VerificationState context, BinaryEval evalNode, Stack<EvalNode> stack) {
-    // Delegate to the parent's modulo handler; this used to call super.visitDivide by mistake.
-    super.visitModular(context, evalNode, stack);
-    checkArithmeticOperand(context, evalNode);
-    checkDivisionByZero(context, evalNode);
-    return evalNode;
-  }
-
-  @Override
-  public EvalNode visitFuncCall(VerificationState context, GeneralFunctionEval evalNode, Stack<EvalNode> stack) {
-    super.visitFuncCall(context, evalNode, stack);
-    // NOTE(review): if the parent implementation already walks the arguments, they are visited twice
-    // here and may produce duplicate messages - confirm against BasicEvalNodeVisitor.
-    if (evalNode.getArgs() != null) {
-      for (EvalNode param : evalNode.getArgs()) {
-        visitChild(context, param, stack);
-      }
-    }
-    return evalNode;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/GroupElement.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/GroupElement.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/GroupElement.java
deleted file mode 100644
index 3fb05c2..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/GroupElement.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner;
-
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.annotations.Expose;
-import org.apache.tajo.algebra.Aggregation;
-import org.apache.tajo.engine.eval.EvalNode;
-
-/**
- * Pairs a grouping type with the expressions that form its grouping sets.
- * Both fields are exposed to gson, which is also used to render {@link #toString()}.
- */
-public class GroupElement implements Cloneable {
-  @Expose private Aggregation.GroupType type;
-  @Expose private EvalNode [] groupingSets;
-
-  /**
-   * No-argument constructor required by gson; leaves both fields unset.
-   */
-  @SuppressWarnings("unused")
-  public GroupElement() {
-    // for gson
-  }
-
-  /**
-   * @param type the kind of grouping this element represents
-   * @param groupingSets the grouping expressions; the array is stored as-is, not copied
-   */
-  public GroupElement(Aggregation.GroupType type, EvalNode [] groupingSets) {
-    this.type = type;
-    this.groupingSets = groupingSets;
-  }
-
-  /**
-   * @return the grouping type, or {@code null} if it was never set
-   */
-  public Aggregation.GroupType getType() {
-    return this.type;
-  }
-
-  /**
-   * @return the internal grouping expression array (not a copy), or {@code null} if never set
-   */
-  public EvalNode [] getGroupingSets() {
-    return this.groupingSets;
-  }
-
-  /**
-   * @return a pretty-printed JSON rendering of the {@code @Expose}-annotated fields
-   */
-  @Override
-  public String toString() {
-    Gson gson = new GsonBuilder().excludeFieldsWithoutExposeAnnotation()
-        .setPrettyPrinting().create();
-    return gson.toJson(this);
-  }
-
-  /**
-   * Creates a copy whose grouping expressions are cloned element by element.
-   *
-   * @return a new {@code GroupElement} with the same type and cloned grouping expressions
-   * @throws CloneNotSupportedException if an element of the grouping sets cannot be cloned
-   */
-  @Override
-  public Object clone() throws CloneNotSupportedException {
-    // super.clone() copies 'type' and the array reference field-by-field; only the array
-    // contents need to be replaced with independent clones.
-    GroupElement copy = (GroupElement) super.clone();
-
-    // The gson constructor may leave groupingSets unset; keep it null in the copy.
-    if (groupingSets != null) {
-      copy.groupingSets = new EvalNode [groupingSets.length];
-      // The index is advanced only by the loop header. Incrementing it again inside the
-      // assignment skipped slots and read one element past the intended source.
-      for (int i = 0; i < groupingSets.length; i++) {
-        copy.groupingSets[i] = (EvalNode) groupingSets[i].clone();
-      }
-    }
-    return copy;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
deleted file mode 100644
index a43cc1a..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
+++ /dev/null
@@ -1,311 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.tajo.SessionVars;
-import org.apache.tajo.algebra.JoinType;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.engine.eval.AlgebraicUtil;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.planner.graph.DirectedGraphCursor;
-import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.engine.planner.logical.join.FoundJoinOrder;
-import org.apache.tajo.engine.planner.logical.join.GreedyHeuristicJoinOrderAlgorithm;
-import org.apache.tajo.engine.planner.logical.join.JoinGraph;
-import org.apache.tajo.engine.planner.logical.join.JoinOrderAlgorithm;
-import org.apache.tajo.engine.planner.rewrite.*;
-import org.apache.tajo.engine.query.QueryContext;
-
-import java.util.LinkedHashSet;
-import java.util.Set;
-import java.util.Stack;
-
-import static org.apache.tajo.engine.planner.LogicalPlan.BlockEdge;
-import static org.apache.tajo.engine.planner.logical.join.GreedyHeuristicJoinOrderAlgorithm.getCost;
-
-/**
- * This class optimizes a logical plan.
- */
-@InterfaceStability.Evolving
-public class LogicalOptimizer {
-  private static final Log LOG = LogFactory.getLog(LogicalOptimizer.class.getName());
-
-  private BasicQueryRewriteEngine rulesBeforeJoinOpt;
-  private BasicQueryRewriteEngine rulesAfterToJoinOpt;
-  private JoinOrderAlgorithm joinOrderAlgorithm = new GreedyHeuristicJoinOrderAlgorithm();
-
-  /**
-   * Builds the rewrite engines that run before and after join ordering.
-   *
-   * The pre-join engine receives {@code FilterPushDownRule} only when
-   * {@code ConfVars.$TEST_FILTER_PUSHDOWN_ENABLED} is true. The post-join engine always receives
-   * {@code ProjectionPushDownRule} and {@code PartitionedTableRewriter}, followed by any rules
-   * named in the comma-separated {@code tajo.plan.rewriter.classes} setting. A rule class that
-   * cannot be loaded or instantiated is logged and skipped.
-   *
-   * @param systemConf configuration consulted for the rule switches and extra rule classes
-   */
-  public LogicalOptimizer(TajoConf systemConf) {
-    rulesBeforeJoinOpt = new BasicQueryRewriteEngine();
-    if (systemConf.getBoolVar(ConfVars.$TEST_FILTER_PUSHDOWN_ENABLED)) {
-      rulesBeforeJoinOpt.addRewriteRule(new FilterPushDownRule());
-    }
-
-    rulesAfterToJoinOpt = new BasicQueryRewriteEngine();
-    rulesAfterToJoinOpt.addRewriteRule(new ProjectionPushDownRule());
-    rulesAfterToJoinOpt.addRewriteRule(new PartitionedTableRewriter(systemConf));
-
-    // Currently, it is only used for some test cases to inject exception manually.
-    String userDefinedRewriterClass = systemConf.get("tajo.plan.rewriter.classes");
-    if (userDefinedRewriterClass != null && !userDefinedRewriterClass.isEmpty()) {
-      for (String eachRewriterClass : userDefinedRewriterClass.split(",")) {
-        // Tolerate "a.B, c.D" and stray commas: whitespace around a name would otherwise
-        // make Class.forName fail, and an empty name is never a valid class.
-        String rewriterClassName = eachRewriterClass.trim();
-        if (rewriterClassName.isEmpty()) {
-          continue;
-        }
-        try {
-          RewriteRule rule = (RewriteRule) Class.forName(rewriterClassName).newInstance();
-          rulesAfterToJoinOpt.addRewriteRule(rule);
-        } catch (Exception e) {
-          // Best effort: a broken user rule must not prevent the optimizer from starting.
-          LOG.error("Can't initiate a Rewriter object: " + rewriterClassName, e);
-        }
-      }
-    }
-  }
-
-  /**
-   * Optimizes a plan without a query context; equivalent to calling
-   * {@code optimize(null, plan)}.
-   *
-   * @param plan the logical plan to optimize
-   * @return the root node of the plan's root block after optimization
-   * @throws PlanningException if the context-aware overload fails
-   */
-  public LogicalNode optimize(LogicalPlan plan) throws PlanningException {
-    final QueryContext noContext = null;
-    return optimize(noContext, plan);
-  }
-
-  public LogicalNode optimize(QueryContext context, LogicalPlan plan) throws PlanningException {
-    rulesBeforeJoinOpt.rewrite(plan);
-
-    DirectedGraphCursor<String, BlockEdge> blockCursor =
-        new DirectedGraphCursor<String, BlockEdge>(plan.getQueryBlockGraph(), plan.getRootBlock().getName());
-
-    if (context == null || context.getBool(SessionVars.TEST_JOIN_OPT_ENABLED)) {
-      // default is true
-      while (blockCursor.hasNext()) {
-        optimizeJoinOrder(plan, blockCursor.nextBlock());
-      }
-    } else {
-      LOG.info("Skip Join Optimized.");
-    }
-    rulesAfterToJoinOpt.rewrite(plan);
-    return plan.getRootBlock().getRoot();
-  }
-
-  /**
-   * Reorders the joins of one query block and records the before/after order in the block's
-   * plan history. Blocks without a join node are left untouched.
-   *
-   * @param plan the logical plan owning the block
-   * @param blockName name of the query block to process
-   * @throws PlanningException if any of the plan visitors or the join order algorithm fails
-   */
-  private void optimizeJoinOrder(LogicalPlan plan, String blockName) throws PlanningException {
-    LogicalPlan.QueryBlock block = plan.getBlock(blockName);
-    if (!block.hasNode(NodeType.JOIN)) {
-      return;
-    }
-
-    // Snapshot of the join order and its cost before anything is changed.
-    String orderBefore = JoinOrderStringBuilder.buildJoinOrderString(plan, block);
-    double costBefore = JoinCostComputer.computeCost(plan, block);
-
-    // finding relations and filter expressions
-    JoinGraphContext graphContext = JoinGraphBuilder.buildJoinGraph(plan, block);
-
-    // finding join order and restore remain filter order
-    FoundJoinOrder found = joinOrderAlgorithm.findBestOrder(plan, block,
-        graphContext.joinGraph, graphContext.relationsForProduct);
-
-    // replace join node with FoundJoinOrder.
-    JoinNode reorderedJoin = found.getOrderedJoin();
-    JoinNode previousJoin = PlannerUtil.findTopNode(block.getRoot(), NodeType.JOIN);
-
-    // Gather the targets declared on the join nodes of the old tree, in encounter order.
-    Set<Target> collectedTargets = new LinkedHashSet<Target>();
-    JoinTargetCollector targetCollector = new JoinTargetCollector();
-    targetCollector.visitJoin(collectedTargets, plan, block, previousJoin, new Stack<LogicalNode>());
-
-    if (collectedTargets.isEmpty()) {
-      // No join carried explicit targets: fall back to the old join's output schema.
-      reorderedJoin.setTargets(PlannerUtil.schemaToTargets(previousJoin.getOutSchema()));
-    } else {
-      reorderedJoin.setTargets(collectedTargets.toArray(new Target[collectedTargets.size()]));
-    }
-    PlannerUtil.replaceNode(plan, block.getRoot(), previousJoin, reorderedJoin);
-    // End of replacement logic
-
-    String orderAfter = JoinOrderStringBuilder.buildJoinOrderString(plan, block);
-    block.addPlanHistory("Non-optimized join order: " + orderBefore + " (cost: " + costBefore + ")");
-    block.addPlanHistory("Optimized join order    : " + orderAfter + " (cost: " + found.getCost() + ")");
-  }
-
-  /**
-   * Plan visitor that adds the targets of every visited join node that has targets
-   * to the set supplied as visitor context.
-   */
-  private static class JoinTargetCollector extends BasicLogicalPlanVisitor<Set<Target>, LogicalNode> {
-    /**
-     * Lets the superclass handle the join first, then adds this join's own targets to {@code ctx}.
-     *
-     * @return the same {@code node} that was passed in
-     */
-    @Override
-    public LogicalNode visitJoin(Set<Target> ctx, LogicalPlan plan, LogicalPlan.QueryBlock block, JoinNode node,
-                                 Stack<LogicalNode> stack)
-        throws PlanningException {
-      super.visitJoin(ctx, plan, block, node, stack);
-
-      if (!node.hasTargets()) {
-        return node;
-      }
-
-      for (Target joinTarget : node.getTargets()) {
-        ctx.add(joinTarget);
-      }
-      return node;
-    }
-  }
-
-  /**
-   * Mutable accumulator filled by {@code JoinGraphBuilder} while it walks a query block.
-   */
-  private static class JoinGraphContext {
-    // Receives every visited join node that has a join qualifier (via addJoin).
-    JoinGraph joinGraph = new JoinGraph();
-    // Receives the elements that AlgebraicUtil.toConjunctiveNormalFormArray returns for each
-    // visited filter's qualifier. Not read anywhere in this class.
-    Set<EvalNode> quals = Sets.newHashSet();
-    // Canonical names of relation nodes that are direct children of a join without a qualifier.
-    Set<String> relationsForProduct = Sets.newHashSet();
-  }
-
-  /**
-   * Plan visitor that fills a {@code JoinGraphContext} from the joins and filters of a query block.
-   */
-  private static class JoinGraphBuilder extends BasicLogicalPlanVisitor<JoinGraphContext, LogicalNode> {
-    private static final JoinGraphBuilder instance = new JoinGraphBuilder();
-
-    /**
-     * This is based on the assumption that all join and filter conditions are already placed on
-     * the appropriate join and scan operators. In other words, filter push down must be performed
-     * before this method. Otherwise, this method may build an incorrect join graph.
-     *
-     * @param plan the logical plan owning the block
-     * @param block the query block to walk
-     * @return a freshly created context populated by the walk
-     * @throws PlanningException if visiting the block fails
-     */
-    public static JoinGraphContext buildJoinGraph(LogicalPlan plan, LogicalPlan.QueryBlock block)
-        throws PlanningException {
-      JoinGraphContext joinGraphContext = new JoinGraphContext();
-      instance.visit(joinGraphContext, plan, block);
-      return joinGraphContext;
-    }
-
-    /**
-     * Lets the superclass handle the filter, then adds the elements returned by
-     * {@code AlgebraicUtil.toConjunctiveNormalFormArray} for its qualifier to {@code context.quals}.
-     *
-     * @return the same {@code node} that was passed in
-     */
-    @Override
-    public LogicalNode visitFilter(JoinGraphContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
-                                   SelectionNode node, Stack<LogicalNode> stack) throws PlanningException {
-      super.visitFilter(context, plan, block, node, stack);
-      context.quals.addAll(Lists.newArrayList(AlgebraicUtil.toConjunctiveNormalFormArray(node.getQual())));
-      return node;
-    }
-
-    /**
-     * Lets the superclass handle the join, then either adds the join to the join graph (when it
-     * has a join qualifier) or records its relation children as candidates for a product.
-     *
-     * @return the same {@code joinNode} that was passed in
-     */
-    @Override
-    public LogicalNode visitJoin(JoinGraphContext joinGraphContext, LogicalPlan plan, LogicalPlan.QueryBlock block,
-                                 JoinNode joinNode, Stack<LogicalNode> stack)
-        throws PlanningException {
-      super.visitJoin(joinGraphContext, plan, block, joinNode, stack);
-      if (joinNode.hasJoinQual()) {
-        joinGraphContext.joinGraph.addJoin(plan, block, joinNode);
-      } else {
-        addRelationForProduct(joinGraphContext, joinNode.getLeftChild());
-        addRelationForProduct(joinGraphContext, joinNode.getRightChild());
-      }
-      return joinNode;
-    }
-
-    /**
-     * Records the canonical name of {@code child} in {@code relationsForProduct} when the child
-     * is a relation node; any other kind of node is ignored.
-     */
-    private static void addRelationForProduct(JoinGraphContext joinGraphContext, LogicalNode child) {
-      if (child instanceof RelationNode) {
-        RelationNode rel = (RelationNode) child;
-        joinGraphContext.relationsForProduct.add(rel.getCanonicalName());
-      }
-    }
-  }
-
-  /**
-   * Plan visitor that renders the join tree of a query block as a parenthesized string such as
-   * {@code (a <symbol> b)}, where the symbol depends on the join type.
-   */
-  public static class JoinOrderStringBuilder extends BasicLogicalPlanVisitor<StringBuilder, LogicalNode> {
-    private static final JoinOrderStringBuilder instance = new JoinOrderStringBuilder();
-
-    /**
-     * @return the shared instance of this visitor
-     */
-    public static JoinOrderStringBuilder getInstance() {
-      return instance;
-    }
-
-    /**
-     * Walks {@code block} and returns the text accumulated by this visitor.
-     *
-     * @param plan the logical plan owning the block
-     * @param block the query block whose join order is rendered
-     * @return the rendered join order; empty if nothing appended any text
-     * @throws PlanningException if visiting the block fails
-     */
-    public static String buildJoinOrderString(LogicalPlan plan, LogicalPlan.QueryBlock block) throws PlanningException {
-      StringBuilder originalOrder = new StringBuilder();
-      instance.visit(originalOrder, plan, block);
-      return originalOrder.toString();
-    }
-
-    /**
-     * Appends {@code (left <symbol> right)} for the join, visiting each child in between so that
-     * nested joins and leaf relations contribute their own text.
-     *
-     * @return the same {@code joinNode} that was passed in
-     */
-    @Override
-    public LogicalNode visitJoin(StringBuilder sb, LogicalPlan plan, LogicalPlan.QueryBlock block, JoinNode joinNode,
-                                 Stack<LogicalNode> stack)
-        throws PlanningException {
-      stack.push(joinNode);
-      sb.append("(");
-      visit(sb, plan, block, joinNode.getLeftChild(), stack);
-      sb.append(" ").append(getJoinNotation(joinNode.getJoinType())).append(" ");
-      visit(sb, plan, block, joinNode.getRightChild(), stack);
-      sb.append(")");
-      stack.pop();
-      return joinNode;
-    }
-
-    /**
-     * Maps a join type to the symbol used in the rendered join order.
-     *
-     * @return the symbol for the listed join types, or {@code ","} for any other type
-     */
-    private static String getJoinNotation(JoinType joinType) {
-      switch (joinType) {
-      case CROSS: return "⋈";
-      case INNER: return "⋈θ";
-      case LEFT_OUTER: return "⟕";
-      case RIGHT_OUTER: return "⟖";
-      case FULL_OUTER: return "⟗";
-      case LEFT_SEMI: return "⋉";
-      case RIGHT_SEMI: return "⋊";
-      case LEFT_ANTI: return "▷";
-      default: return ","; // join types without a dedicated symbol
-      }
-    }
-
-    /**
-     * Appends the table name of the sub query node; the sub query itself is not descended into.
-     *
-     * @return the same {@code node} that was passed in
-     */
-    @Override
-    public LogicalNode visitTableSubQuery(StringBuilder sb, LogicalPlan plan, LogicalPlan.QueryBlock block,
-                                          TableSubQueryNode node, Stack<LogicalNode> stack) {
-      sb.append(node.getTableName());
-      return node;
-    }
-
-    /**
-     * Appends the table name of the scan node.
-     *
-     * @return the same {@code node} that was passed in
-     */
-    @Override
-    public LogicalNode visitScan(StringBuilder sb, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode node,
-                                 Stack<LogicalNode> stack) {
-      sb.append(node.getTableName());
-      return node;
-    }
-  }
-
-  /**
-   * Mutable accumulator used as the visitor context of {@code JoinCostComputer}.
-   */
-  private static class CostContext {
-    // Running cost; rewritten by JoinCostComputer.visitJoin and returned by computeCost.
-    double accumulatedCost = 0;
-  }
-
-  /**
-   * Plan visitor that computes a cost figure for the join tree of a query block as it stands.
-   */
-  public static class JoinCostComputer extends BasicLogicalPlanVisitor<CostContext, LogicalNode> {
-    private static final JoinCostComputer instance = new JoinCostComputer();
-
-    /**
-     * Walks {@code block} with a fresh cost context and returns the accumulated cost.
-     *
-     * @param plan the logical plan owning the block
-     * @param block the query block whose joins are costed
-     * @return the accumulated cost; {@code 0} if no join updated the context
-     * @throws PlanningException if visiting the block fails
-     */
-    public static double computeCost(LogicalPlan plan, LogicalPlan.QueryBlock block) throws PlanningException {
-      CostContext freshContext = new CostContext();
-      instance.visit(freshContext, plan, block);
-      return freshContext.accumulatedCost;
-    }
-
-    /**
-     * Updates the running cost for one join after the superclass has handled it.
-     *
-     * When the left child is a relation node, the running cost is reset to
-     * {@code cost(left) * cost(right) * filterFactor}; otherwise
-     * {@code accumulated * cost(right) * filterFactor} is added to it. The filter factor is
-     * {@code DEFAULT_SELECTION_FACTOR} raised to the number of elements that
-     * {@code toConjunctiveNormalFormArray} returns for the join qualifier, or {@code 1} when the
-     * join has no qualifier.
-     *
-     * @return the same {@code joinNode} that was passed in
-     */
-    @Override
-    public LogicalNode visitJoin(CostContext joinGraphContext, LogicalPlan plan, LogicalPlan.QueryBlock block,
-                                 JoinNode joinNode, Stack<LogicalNode> stack)
-        throws PlanningException {
-      super.visitJoin(joinGraphContext, plan, block, joinNode, stack);
-
-      final CostContext cost = joinGraphContext;
-
-      double filterFactor = 1;
-      if (joinNode.hasJoinQual()) {
-        int qualCount = AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual()).length;
-        filterFactor = Math.pow(GreedyHeuristicJoinOrderAlgorithm.DEFAULT_SELECTION_FACTOR, qualCount);
-      }
-
-      boolean leftIsRelation = joinNode.getLeftChild() instanceof RelationNode;
-      if (!leftIsRelation) {
-        // The left side is not a plain relation: grow the cost accumulated so far.
-        cost.accumulatedCost += cost.accumulatedCost * getCost(joinNode.getRightChild()) * filterFactor;
-      } else {
-        // The left side is a plain relation: start over from the two input costs.
-        cost.accumulatedCost = getCost(joinNode.getLeftChild()) * getCost(joinNode.getRightChild())
-            * filterFactor;
-      }
-
-      return joinNode;
-    }
-  }
-}
\ No newline at end of file