You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 13:44:39 UTC

[36/57] [abbrv] [partial] TAJO-752: Escalate sub modules in tajo-core into the top-level modules. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
new file mode 100644
index 0000000..1ee0878
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import com.google.common.collect.ObjectArrays;
+import org.apache.tajo.algebra.*;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.util.TUtil;
+
+import java.util.Arrays;
+import java.util.Set;
+import java.util.Stack;
+
+public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <PreLogicalPlanVerifier.Context, Expr> {
+  private CatalogService catalog;
+
+  public PreLogicalPlanVerifier(CatalogService catalog) {
+    this.catalog = catalog;
+  }
+
+  public static class Context {
+    Session session;
+    VerificationState state;
+
+    public Context(Session session, VerificationState state) {
+      this.session = session;
+      this.state = state;
+    }
+  }
+
+  public VerificationState verify(Session session, VerificationState state, Expr expr) throws PlanningException {
+    Context context = new Context(session, state);
+    visit(context, new Stack<Expr>(), expr);
+    return context.state;
+  }
+
+  public Expr visitProjection(Context context, Stack<Expr> stack, Projection expr) throws PlanningException {
+    super.visitProjection(context, stack, expr);
+
+    Set<String> names = TUtil.newHashSet();
+    Expr [] distinctValues = null;
+
+    for (NamedExpr namedExpr : expr.getNamedExprs()) {
+
+      if (namedExpr.hasAlias()) {
+        if (names.contains(namedExpr.getAlias())) {
+          context.state.addVerification(String.format("column name \"%s\" specified more than once",
+              namedExpr.getAlias()));
+        } else {
+          names.add(namedExpr.getAlias());
+        }
+      }
+
+      // no two aggregations can have different DISTINCT columns.
+      //
+      // For example, the following query will work
+      // SELECT count(DISTINCT col1) and sum(DISTINCT col1) ..
+      //
+      // But, the following query will not work in this time
+      //
+      // SELECT count(DISTINCT col1) and SUM(DISTINCT col2) ..
+      Set<GeneralSetFunctionExpr> exprs = ExprFinder.finds(namedExpr.getExpr(), OpType.GeneralSetFunction);
+      if (exprs.size() > 0) {
+        for (GeneralSetFunctionExpr setFunction : exprs) {
+          if (distinctValues == null && setFunction.isDistinct()) {
+            distinctValues = setFunction.getParams();
+          } else if (distinctValues != null && setFunction.isDistinct()) {
+            if (!Arrays.equals(distinctValues, setFunction.getParams())) {
+              Expr [] differences = ObjectArrays.concat(distinctValues, setFunction.getParams(), Expr.class);
+              throw new PlanningException("different DISTINCT columns are not supported yet: "
+                  + TUtil.arrayToString(differences));
+            }
+          }
+        }
+      }
+
+      // Currently, avg functions with distinct aggregation are not supported.
+      // This code does not allow users to use avg functions with distinct aggregation.
+      if (distinctValues != null) {
+        for (GeneralSetFunctionExpr setFunction : exprs) {
+          if (setFunction.getSignature().equalsIgnoreCase("avg")) {
+            if (setFunction.isDistinct()) {
+              throw new PlanningException("avg(distinct) function is not supported yet.");
+            } else {
+              throw new PlanningException("avg() function with distinct aggregation functions is not supported yet.");
+            }
+          }
+        }
+      }
+    }
+    return expr;
+  }
+
+  @Override
+  public Expr visitGroupBy(Context context, Stack<Expr> stack, Aggregation expr) throws PlanningException {
+    super.visitGroupBy(context, stack, expr);
+
+    // Enforcer only ordinary grouping set.
+    for (Aggregation.GroupElement groupingElement : expr.getGroupSet()) {
+      if (groupingElement.getType() != Aggregation.GroupType.OrdinaryGroup) {
+        context.state.addVerification(groupingElement.getType() + " is not supported yet");
+      }
+    }
+
+    Projection projection = null;
+    for (Expr parent : stack) {
+      if (parent.getType() == OpType.Projection) {
+        projection = (Projection) parent;
+        break;
+      }
+    }
+
+    if (projection == null) {
+      throw new PlanningException("No Projection");
+    }
+
+    return expr;
+  }
+
+  @Override
+  public Expr visitRelation(Context context, Stack<Expr> stack, Relation expr) throws PlanningException {
+    assertRelationExistence(context, expr.getName());
+    return expr;
+  }
+
+  private boolean assertRelationExistence(Context context, String tableName) {
+    String qualifiedName;
+
+    if (CatalogUtil.isFQTableName(tableName)) {
+      qualifiedName = tableName;
+    } else {
+      qualifiedName = CatalogUtil.buildFQName(context.session.getCurrentDatabase(), tableName);
+    }
+
+    if (!catalog.existsTable(qualifiedName)) {
+      context.state.addVerification(String.format("relation \"%s\" does not exist", qualifiedName));
+      return false;
+    }
+    return true;
+  }
+
+  private boolean assertRelationNoExistence(Context context, String tableName) {
+    String qualifiedName;
+
+    if (CatalogUtil.isFQTableName(tableName)) {
+      qualifiedName = tableName;
+    } else {
+      qualifiedName = CatalogUtil.buildFQName(context.session.getCurrentDatabase(), tableName);
+    }
+    if (catalog.existsTable(qualifiedName)) {
+      context.state.addVerification(String.format("relation \"%s\" already exists", qualifiedName));
+      return false;
+    }
+    return true;
+  }
+
+  private boolean assertUnsupportedStoreType(VerificationState state, String name) {
+    if (name != null && name.equals(CatalogProtos.StoreType.RAW.name())) {
+      state.addVerification(String.format("Unsupported store type :%s", name));
+      return false;
+    }
+    return true;
+  }
+
+  private boolean assertDatabaseExistence(VerificationState state, String name) {
+    if (!catalog.existDatabase(name)) {
+      state.addVerification(String.format("database \"%s\" does not exist", name));
+      return false;
+    }
+    return true;
+  }
+
+  private boolean assertDatabaseNoExistence(VerificationState state, String name) {
+    if (catalog.existDatabase(name)) {
+      state.addVerification(String.format("database \"%s\" already exists", name));
+      return false;
+    }
+    return true;
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Data Definition Language Section
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+
+  @Override
+  public Expr visitCreateDatabase(Context context, Stack<Expr> stack, CreateDatabase expr)
+      throws PlanningException {
+    super.visitCreateDatabase(context, stack, expr);
+    if (!expr.isIfNotExists()) {
+      assertDatabaseNoExistence(context.state, expr.getDatabaseName());
+    }
+    return expr;
+  }
+
+  @Override
+  public Expr visitDropDatabase(Context context, Stack<Expr> stack, DropDatabase expr) throws PlanningException {
+    super.visitDropDatabase(context, stack, expr);
+    if (!expr.isIfExists()) {
+      assertDatabaseExistence(context.state, expr.getDatabaseName());
+    }
+    return expr;
+  }
+
+  @Override
+  public Expr visitCreateTable(Context context, Stack<Expr> stack, CreateTable expr) throws PlanningException {
+    super.visitCreateTable(context, stack, expr);
+    if (!expr.isIfNotExists()) {
+      assertRelationNoExistence(context, expr.getTableName());
+    }
+    assertUnsupportedStoreType(context.state, expr.getStorageType());
+    return expr;
+  }
+
+  @Override
+  public Expr visitDropTable(Context context, Stack<Expr> stack, DropTable expr) throws PlanningException {
+    super.visitDropTable(context, stack, expr);
+    if (!expr.isIfExists()) {
+      assertRelationExistence(context, expr.getTableName());
+    }
+    return expr;
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Insert or Update Section
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  public Expr visitInsert(Context context, Stack<Expr> stack, Insert expr) throws PlanningException {
+    Expr child = super.visitInsert(context, stack, expr);
+
+    if (!expr.isOverwrite()) {
+      context.state.addVerification("INSERT INTO statement is not supported yet.");
+    }
+
+    if (expr.hasTableName()) {
+      assertRelationExistence(context, expr.getTableName());
+    }
+
+    if (child != null && child.getType() == OpType.Projection) {
+      if (expr.hasTargetColumns()) {
+        Projection projection = (Projection) child;
+        int projectColumnNum = projection.getNamedExprs().length;
+        int targetColumnNum = expr.getTargetColumns().length;
+
+        if (targetColumnNum > projectColumnNum)  {
+          context.state.addVerification("INSERT has more target columns than expressions");
+        } else if (targetColumnNum < projectColumnNum) {
+          context.state.addVerification("INSERT has more expressions than target columns");
+        }
+      }
+    }
+
+    return expr;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java
new file mode 100644
index 0000000..161d39b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.storage.Tuple;
+
+public class Projector {
+  private final Schema inSchema;
+
+  // for projection
+  private final int targetNum;
+  private final EvalNode[] evals;
+
+  public Projector(Schema inSchema, Schema outSchema, Target [] targets) {
+    this.inSchema = inSchema;
+    if (targets == null) {
+      targets = PlannerUtil.schemaToTargets(outSchema);
+    }
+    this.targetNum = targets.length;
+    evals = new EvalNode[targetNum];
+    for (int i = 0; i < targetNum; i++) {
+      evals[i] = targets[i].getEvalTree();
+    }
+  }
+
+  public void eval(Tuple in, Tuple out) {
+    if (targetNum > 0) {
+      for (int i = 0; i < evals.length; i++) {
+        out.put(i, evals[i].eval(inSchema, in));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
new file mode 100644
index 0000000..a3522c7
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleRange;
+
+import java.math.BigDecimal;
+
+public abstract class RangePartitionAlgorithm {
+  protected SortSpec [] sortSpecs;
+  protected TupleRange range;
+  protected final BigDecimal totalCard;
+  /** true if the end of the range is inclusive. Otherwise, it should be false. */
+  protected final boolean inclusive;
+
+  /**
+   *
+   * @param sortSpecs The array of sort keys
+   * @param totalRange The total range to be partition
+   * @param inclusive true if the end of the range is inclusive. Otherwise, false.
+   */
+  public RangePartitionAlgorithm(SortSpec [] sortSpecs, TupleRange totalRange, boolean inclusive) {
+    this.sortSpecs = sortSpecs;
+    this.range = totalRange;
+    this.inclusive = inclusive;
+    this.totalCard = computeCardinalityForAllColumns(sortSpecs, totalRange, inclusive);
+  }
+
+  /**
+   * It computes the value cardinality of a tuple range.
+   *
+   * @param dataType
+   * @param start
+   * @param end
+   * @return
+   */
+  public static BigDecimal computeCardinality(DataType dataType, Datum start, Datum end,
+                                              boolean inclusive, boolean isAscending) {
+    BigDecimal columnCard;
+
+    switch (dataType.getType()) {
+      case BOOLEAN:
+        columnCard = new BigDecimal(2);
+        break;
+      case CHAR:
+        if (isAscending) {
+          columnCard = new BigDecimal(end.asChar() - start.asChar());
+        } else {
+          columnCard = new BigDecimal(start.asChar() - end.asChar());
+        }
+        break;
+      case BIT:
+        if (isAscending) {
+          columnCard = new BigDecimal(end.asByte() - start.asByte());
+        } else {
+          columnCard = new BigDecimal(start.asByte() - end.asByte());
+        }
+        break;
+      case INT2:
+        if (isAscending) {
+          columnCard = new BigDecimal(end.asInt2() - start.asInt2());
+        } else {
+          columnCard = new BigDecimal(start.asInt2() - end.asInt2());
+        }
+        break;
+      case INT4:
+        if (isAscending) {
+          columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+        } else {
+          columnCard = new BigDecimal(start.asInt4() - end.asInt4());
+        }
+        break;
+      case INT8:
+        if (isAscending) {
+          columnCard = new BigDecimal(end.asInt8() - start.asInt8());
+        } else {
+          columnCard = new BigDecimal(start.asInt8() - end.asInt8());
+        }
+        break;
+      case FLOAT4:
+        if (isAscending) {
+          columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+        } else {
+          columnCard = new BigDecimal(start.asInt4() - end.asInt4());
+        }
+        break;
+      case FLOAT8:
+        if (isAscending) {
+          columnCard = new BigDecimal(end.asInt8() - start.asInt8());
+        } else {
+          columnCard = new BigDecimal(start.asInt8() - end.asInt8());
+        }
+        break;
+      case TEXT:
+        final char textStart =  (start instanceof NullDatum || start.size() == 0) ? '0' : start.asChars().charAt(0);
+        final char textEnd = (end instanceof NullDatum || start.size() == 0) ? '0' : end.asChars().charAt(0);
+        if (isAscending) {
+          columnCard = new BigDecimal(textEnd - textStart);
+        } else {
+          columnCard = new BigDecimal(textStart - textEnd);
+        }
+        break;
+      case DATE:
+        if (isAscending) {
+          columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+        } else {
+          columnCard = new BigDecimal(start.asInt4() - end.asInt4());
+        }
+        break;
+      case TIME:
+      case TIMESTAMP:
+        if (isAscending) {
+          columnCard = new BigDecimal(end.asInt8() - start.asInt8());
+        } else {
+          columnCard = new BigDecimal(start.asInt8() - end.asInt8());
+        }
+        break;
+      case INET4:
+        if (isAscending) {
+          columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+        } else {
+          columnCard = new BigDecimal(start.asInt4() - end.asInt4());
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException(dataType + " is not supported yet");
+    }
+
+    return inclusive ? columnCard.add(new BigDecimal(1)).abs() : columnCard.abs();
+  }
+
+  /**
+   * It computes the value cardinality of a tuple range.
+   * @return
+   */
+  public static BigDecimal computeCardinalityForAllColumns(SortSpec[] sortSpecs, TupleRange range, boolean inclusive) {
+    Tuple start = range.getStart();
+    Tuple end = range.getEnd();
+    Column col;
+
+    BigDecimal cardinality = new BigDecimal(1);
+    BigDecimal columnCard;
+    for (int i = 0; i < sortSpecs.length; i++) {
+      columnCard = computeCardinality(sortSpecs[i].getSortKey().getDataType(), start.get(i), end.get(i), inclusive,
+          sortSpecs[i].isAscending());
+
+      if (new BigDecimal(0).compareTo(columnCard) < 0) {
+        cardinality = cardinality.multiply(columnCard);
+      }
+    }
+
+    return cardinality;
+  }
+
+  public BigDecimal getTotalCardinality() {
+    return totalCard;
+  }
+
+  /**
+   *
+   * @param partNum the number of desired partitions, but it may return the less partitions.
+   * @return the end of intermediate ranges are exclusive, and the end of final range is inclusive.
+   */
+  public abstract TupleRange[] partition(int partNum);
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/SimpleAlgebraVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/SimpleAlgebraVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/SimpleAlgebraVisitor.java
new file mode 100644
index 0000000..bae6e4a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/SimpleAlgebraVisitor.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.apache.tajo.algebra.*;
+
+import java.util.Stack;
+
+/**
+ * <code>SimpleAlgebraVisitor</code> provides a simple and fewer visit methods. It makes building concrete class easier.
+ */
+public abstract class SimpleAlgebraVisitor<CONTEXT, RESULT> extends BaseAlgebraVisitor<CONTEXT, RESULT> {
+
+  public RESULT visit(CONTEXT ctx, Stack<Expr> stack, Expr expr) throws PlanningException {
+    RESULT result = null;
+    if (expr instanceof UnaryOperator) {
+      preHook(ctx, stack, expr);
+      result = visitUnaryOperator(ctx, stack, (UnaryOperator) expr);
+      postHook(ctx, stack, expr, result);
+    } else if (expr instanceof BinaryOperator) {
+      preHook(ctx, stack, expr);
+      result = visitBinaryOperator(ctx, stack, (BinaryOperator) expr);
+      postHook(ctx, stack, expr, result);
+    } else {
+      result = super.visit(ctx, stack, expr);
+    }
+
+    return result;
+  }
+
+  public RESULT visitUnaryOperator(CONTEXT ctx, Stack<Expr> stack, UnaryOperator expr) throws PlanningException {
+    stack.push(expr);
+    RESULT result = visit(ctx, stack, expr.getChild());
+    stack.pop();
+    return result;
+  }
+
+  public RESULT visitBinaryOperator(CONTEXT ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
+    stack.push(expr);
+    visit(ctx, stack, expr.getLeft());
+    RESULT result = visit(ctx, stack, expr.getRight());
+    stack.pop();
+    return result;
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Relational Operator Section
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  @Override
+  public RESULT visitProjection(CONTEXT ctx, Stack<Expr> stack, Projection expr) throws PlanningException {
+    return super.visitProjection(ctx, stack, expr);
+  }
+
+  @Override
+  public RESULT visitLimit(CONTEXT ctx, Stack<Expr> stack, Limit expr) throws PlanningException {
+    return super.visitLimit(ctx, stack, expr);
+  }
+
+  @Override
+  public RESULT visitSort(CONTEXT ctx, Stack<Expr> stack, Sort expr) throws PlanningException {
+    return super.visitSort(ctx, stack, expr);
+  }
+
+  @Override
+  public RESULT visitHaving(CONTEXT ctx, Stack<Expr> stack, Having expr) throws PlanningException {
+    return super.visitHaving(ctx, stack, expr);
+  }
+
+  @Override
+  public RESULT visitGroupBy(CONTEXT ctx, Stack<Expr> stack, Aggregation expr) throws PlanningException {
+    return super.visitGroupBy(ctx, stack, expr);
+  }
+
+  public RESULT visitFilter(CONTEXT ctx, Stack<Expr> stack, Selection expr) throws PlanningException {
+    return super.visitFilter(ctx, stack, expr);
+  }
+
+  @Override
+  public RESULT visitJoin(CONTEXT ctx, Stack<Expr> stack, Join expr) throws PlanningException {
+    return super.visitJoin(ctx, stack, expr);
+  }
+
+  @Override
+  public RESULT visitTableSubQuery(CONTEXT ctx, Stack<Expr> stack, TablePrimarySubQuery expr) throws PlanningException {
+    return super.visitTableSubQuery(ctx, stack, expr);
+  }
+
+  @Override
+  public RESULT visitRelationList(CONTEXT ctx, Stack<Expr> stack, RelationList expr) throws PlanningException {
+    return super.visitRelationList(ctx, stack, expr);
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Data Definition Language Section
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  @Override
+  public RESULT visitCreateTable(CONTEXT ctx, Stack<Expr> stack, CreateTable expr) throws PlanningException {
+    return super.visitCreateTable(ctx, stack, expr);
+  }
+
+  @Override
+  public RESULT visitDropTable(CONTEXT ctx, Stack<Expr> stack, DropTable expr) throws PlanningException {
+    return super.visitDropTable(ctx, stack, expr);
+  }
+
+  @Override
+  public RESULT visitAlterTable(CONTEXT ctx, Stack<Expr> stack, AlterTable expr) throws PlanningException {
+    return super.visitAlterTable(ctx, stack, expr);
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Insert or Update Section
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  @Override
+  public RESULT visitInsert(CONTEXT ctx, Stack<Expr> stack, Insert expr) throws PlanningException {
+    return super.visitInsert(ctx, stack, expr);
+  }
+
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Other Predicates Section
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  @Override
+  public RESULT visitBetween(CONTEXT ctx, Stack<Expr> stack, BetweenPredicate expr) throws PlanningException {
+    return super.visitBetween(ctx, stack, expr);
+  }
+
+  @Override
+  public RESULT visitCaseWhen(CONTEXT ctx, Stack<Expr> stack, CaseWhenPredicate expr) throws PlanningException {
+    return super.visitCaseWhen(ctx, stack, expr);
+  }
+
+  @Override
+  public RESULT visitValueListExpr(CONTEXT ctx, Stack<Expr> stack, ValueListExpr expr) throws PlanningException {
+    return super.visitValueListExpr(ctx, stack, expr);
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Other Expressions
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  @Override
+  public RESULT visitFunction(CONTEXT ctx, Stack<Expr> stack, FunctionExpr expr) throws PlanningException {
+    return super.visitFunction(ctx, stack, expr);
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // General Set Section
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  @Override
+  public RESULT visitCountRowsFunction(CONTEXT ctx, Stack<Expr> stack, CountRowsFunctionExpr expr)
+      throws PlanningException {
+    return super.visitCountRowsFunction(ctx, stack, expr);
+  }
+
+  @Override
+  public RESULT visitGeneralSetFunction(CONTEXT ctx, Stack<Expr> stack, GeneralSetFunctionExpr expr)
+      throws PlanningException {
+    return super.visitGeneralSetFunction(ctx, stack, expr);
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Literal Section
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  @Override
+  public RESULT visitDataType(CONTEXT ctx, Stack<Expr> stack, DataTypeExpr expr) throws PlanningException {
+    return super.visitDataType(ctx, stack, expr);
+  }
+
+  @Override
+  public RESULT visitCastExpr(CONTEXT ctx, Stack<Expr> stack, CastExpr expr) throws PlanningException {
+    return super.visitCastExpr(ctx, stack, expr);
+  }
+
+  @Override
+  public RESULT visitLiteral(CONTEXT ctx, Stack<Expr> stack, LiteralValue expr) throws PlanningException {
+    return super.visitLiteral(ctx, stack, expr);
+  }
+
+  @Override
+  public RESULT visitNullLiteral(CONTEXT ctx, Stack<Expr> stack, NullLiteral expr) throws PlanningException {
+    return super.visitNullLiteral(ctx, stack, expr);
+  }
+
+  @Override
+  public RESULT visitTimestampLiteral(CONTEXT ctx, Stack<Expr> stack, TimestampLiteral expr) throws PlanningException {
+    return super.visitTimestampLiteral(ctx, stack, expr);
+  }
+
+  @Override
+  public RESULT visitTimeLiteral(CONTEXT ctx, Stack<Expr> stack, TimeLiteral expr) throws PlanningException {
+    return super.visitTimeLiteral(ctx, stack, expr);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/Target.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/Target.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/Target.java
new file mode 100644
index 0000000..6a16d3c
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/Target.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.eval.FieldEval;
+import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.json.GsonObject;
+import org.apache.tajo.util.TUtil;
+
+/**
+ * A Target contains how to evaluate an expression and its alias name.
+ */
+public class Target implements Cloneable, GsonObject {
+  @Expose private EvalNode expr;
+  @Expose private Column column;
+  @Expose private String alias = null;
+
+  public Target(FieldEval fieldEval) {
+    this.expr = fieldEval;
+    this.column = fieldEval.getColumnRef();
+  }
+
+  public Target(final EvalNode eval, final String alias) {
+    this.expr = eval;
+    // force lower case
+    String normalized = alias;
+
+    // If an expr is a column reference and its alias is equivalent to column name, ignore a given alias.
+    if (eval instanceof FieldEval && eval.getName().equals(normalized)) {
+      column = ((FieldEval) eval).getColumnRef();
+    } else {
+      column = new Column(normalized, eval.getValueType());
+      setAlias(alias);
+    }
+  }
+
+  public String getCanonicalName() {
+    return !hasAlias() ? column.getQualifiedName() : alias;
+  }
+
+  public final void setExpr(EvalNode expr) {
+    this.expr = expr;
+  }
+
+  public final void setAlias(String alias) {
+    this.alias = alias;
+    this.column = new Column(alias, expr.getValueType());
+  }
+
+  public final String getAlias() {
+    return alias;
+  }
+
+  public final boolean hasAlias() {
+    return alias != null;
+  }
+
+  public DataType getDataType() {
+    return column.getDataType();
+  }
+
+  public <T extends EvalNode> T getEvalTree() {
+    return (T) this.expr;
+  }
+
+  public Column getNamedColumn() {
+    return this.column;
+  }
+
+  public String toString() {
+    StringBuilder sb = new StringBuilder(expr.toString());
+    if(hasAlias()) {
+      sb.append(" as ").append(alias);
+    }
+    return sb.toString();
+  }
+
+  public boolean equals(Object obj) {
+    if(obj instanceof Target) {
+      Target other = (Target) obj;
+
+      boolean b1 = expr.equals(other.expr);
+      boolean b2 = column.equals(other.column);
+      boolean b3 = TUtil.checkEquals(alias, other.alias);
+
+      return b1 && b2 && b3;
+    } else {
+      return false;
+    }
+  }
+
+  public int hashCode() {
+    return this.expr.getName().hashCode();
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    Target target = (Target) super.clone();
+    target.expr = (EvalNode) expr.clone();
+    target.column = column;
+    target.alias = alias != null ? alias : null;
+
+    return target;
+  }
+
+  public String toJson() {
+    return CoreGsonHelper.toJson(this, Target.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
new file mode 100644
index 0000000..f6922ed
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.exception.RangeOverflowException;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleRange;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.Bytes;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.List;
+
+
+public class UniformRangePartition extends RangePartitionAlgorithm {
+  private int variableId;
+  private BigDecimal[] cardForEachDigit;
+  private BigDecimal[] colCards;
+
+  /**
+   *
+   * @param totalRange
+   * @param sortSpecs The description of sort keys
+   * @param inclusive true if the end of the range is inclusive
+   */
+  public UniformRangePartition(TupleRange totalRange, SortSpec[] sortSpecs, boolean inclusive) {
+    super(sortSpecs, totalRange, inclusive);
+    colCards = new BigDecimal[sortSpecs.length];
+    for (int i = 0; i < sortSpecs.length; i++) {
+      colCards[i] = computeCardinality(sortSpecs[i].getSortKey().getDataType(), totalRange.getStart().get(i),
+          totalRange.getEnd().get(i), inclusive, sortSpecs[i].isAscending());
+    }
+
+    cardForEachDigit = new BigDecimal[colCards.length];
+    for (int i = 0; i < colCards.length ; i++) {
+      if (i == 0) {
+        cardForEachDigit[i] = colCards[i];
+      } else {
+        cardForEachDigit[i] = cardForEachDigit[i - 1].multiply(colCards[i]);
+      }
+    }
+  }
+
+  public UniformRangePartition(TupleRange range, SortSpec [] sortSpecs) {
+    this(range, sortSpecs, true);
+  }
+
+  @Override
+  public TupleRange[] partition(int partNum) {
+    Preconditions.checkArgument(partNum > 0,
+        "The number of partitions must be positive, but the given number: "
+            + partNum);
+    Preconditions.checkArgument(totalCard.compareTo(new BigDecimal(partNum)) >= 0,
+        "the number of partition cannot exceed total cardinality (" + totalCard + ")");
+
+    int varId;
+    for (varId = 0; varId < cardForEachDigit.length; varId++) {
+      if (cardForEachDigit[varId].compareTo(new BigDecimal(partNum)) >= 0)
+        break;
+    }
+    this.variableId = varId;
+
+    BigDecimal [] reverseCardsForDigit = new BigDecimal[variableId+1];
+    for (int i = variableId; i >= 0; i--) {
+      if (i == variableId) {
+        reverseCardsForDigit[i] = colCards[i];
+      } else {
+        reverseCardsForDigit[i] = reverseCardsForDigit[i+1].multiply(colCards[i]);
+      }
+    }
+
+    List<TupleRange> ranges = Lists.newArrayList();
+    BigDecimal term = reverseCardsForDigit[0].divide(
+        new BigDecimal(partNum), RoundingMode.CEILING);
+    BigDecimal reminder = reverseCardsForDigit[0];
+    Tuple last = range.getStart();
+    while(reminder.compareTo(new BigDecimal(0)) > 0) {
+      if (reminder.compareTo(term) <= 0) { // final one is inclusive
+        ranges.add(new TupleRange(sortSpecs, last, range.getEnd()));
+      } else {
+        Tuple next = increment(last, term.longValue(), variableId);
+        ranges.add(new TupleRange(sortSpecs, last, next));
+      }
+      last = ranges.get(ranges.size() - 1).getEnd();
+      reminder = reminder.subtract(term);
+    }
+
+    return ranges.toArray(new TupleRange[ranges.size()]);
+  }
+
+  /**
+  *  Check whether an overflow occurs or not.
+   *
+   * @param colId The column id to be checked
+   * @param last
+   * @param inc
+   * @param sortSpecs
+   * @return
+   */
+  public boolean isOverflow(int colId, Datum last, BigDecimal inc, SortSpec [] sortSpecs) {
+    Column column = sortSpecs[colId].getSortKey();
+    BigDecimal candidate;
+    boolean overflow = false;
+
+    switch (column.getDataType().getType()) {
+      case BIT: {
+        if (sortSpecs[colId].isAscending()) {
+          candidate = inc.add(new BigDecimal(last.asByte()));
+          return new BigDecimal(range.getEnd().get(colId).asByte()).compareTo(candidate) < 0;
+        } else {
+          candidate = new BigDecimal(last.asByte()).subtract(inc);
+          return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asByte())) < 0;
+        }
+      }
+      case CHAR: {
+        if (sortSpecs[colId].isAscending()) {
+          candidate = inc.add(new BigDecimal((int)last.asChar()));
+          return new BigDecimal((int)range.getEnd().get(colId).asChar()).compareTo(candidate) < 0;
+        } else {
+          candidate = new BigDecimal((int)last.asChar()).subtract(inc);
+          return candidate.compareTo(new BigDecimal((int)range.getEnd().get(colId).asChar())) < 0;
+        }
+      }
+      case INT2: {
+        if (sortSpecs[colId].isAscending()) {
+          candidate = inc.add(new BigDecimal(last.asInt2()));
+          return new BigDecimal(range.getEnd().get(colId).asInt2()).compareTo(candidate) < 0;
+        } else {
+          candidate = new BigDecimal(last.asInt2()).subtract(inc);
+          return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asInt2())) < 0;
+        }
+      }
+      case DATE:
+      case INT4: {
+        if (sortSpecs[colId].isAscending()) {
+          candidate = inc.add(new BigDecimal(last.asInt4()));
+          return new BigDecimal(range.getEnd().get(colId).asInt4()).compareTo(candidate) < 0;
+        } else {
+          candidate = new BigDecimal(last.asInt4()).subtract(inc);
+          return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asInt4())) < 0;
+        }
+      }
+      case TIME:
+      case TIMESTAMP:
+      case INT8: {
+        if (sortSpecs[colId].isAscending()) {
+          candidate = inc.add(new BigDecimal(last.asInt8()));
+          return new BigDecimal(range.getEnd().get(colId).asInt8()).compareTo(candidate) < 0;
+        } else {
+          candidate = new BigDecimal(last.asInt8()).subtract(inc);
+          return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asInt8())) < 0;
+        }
+      }
+      case FLOAT4: {
+        if (sortSpecs[colId].isAscending()) {
+          candidate = inc.add(new BigDecimal(last.asFloat4()));
+          return new BigDecimal(range.getEnd().get(colId).asFloat4()).compareTo(candidate) < 0;
+        } else {
+          candidate = new BigDecimal(last.asFloat4()).subtract(inc);
+          return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asFloat4())) < 0;
+        }
+      }
+      case FLOAT8: {
+        if (sortSpecs[colId].isAscending()) {
+          candidate = inc.add(new BigDecimal(last.asFloat8()));
+          return new BigDecimal(range.getEnd().get(colId).asFloat8()).compareTo(candidate) < 0;
+        } else {
+          candidate = new BigDecimal(last.asFloat8()).subtract(inc);
+          return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asFloat8())) < 0;
+        }
+
+      }
+      case TEXT: {
+        if (sortSpecs[colId].isAscending()) {
+          candidate = inc.add(new BigDecimal((int)(last instanceof NullDatum ? '0' : last.asChars().charAt(0))));
+          return new BigDecimal(range.getEnd().get(colId).asChars().charAt(0)).compareTo(candidate) < 0;
+        } else {
+          candidate = new BigDecimal((int)(last.asChars().charAt(0))).subtract(inc);
+          return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asChars().charAt(0))) < 0;
+        }
+      }
+      case INET4: {
+        int candidateIntVal;
+        byte[] candidateBytesVal = new byte[4];
+        if (sortSpecs[colId].isAscending()) {
+          candidateIntVal = inc.intValue() + last.asInt4();
+          if (candidateIntVal - inc.intValue() != last.asInt4()) {
+            return true;
+          }
+          Bytes.putInt(candidateBytesVal, 0, candidateIntVal);
+          return Bytes.compareTo(range.getEnd().get(colId).asByteArray(), candidateBytesVal) < 0;
+        } else {
+          candidateIntVal = last.asInt4() - inc.intValue();
+          if (candidateIntVal + inc.intValue() != last.asInt4()) {
+            return true;
+          }
+          Bytes.putInt(candidateBytesVal, 0, candidateIntVal);
+          return Bytes.compareTo(candidateBytesVal, range.getEnd().get(colId).asByteArray()) < 0;
+        }
+      }
+    }
+    return overflow;
+  }
+
+  public long incrementAndGetReminder(int colId, Datum last, long inc) {
+    Column column = sortSpecs[colId].getSortKey();
+    long reminder = 0;
+    switch (column.getDataType().getType()) {
+      case BIT: {
+        long candidate = last.asByte() + inc;
+        byte end = range.getEnd().get(colId).asByte();
+        reminder = candidate - end;
+        break;
+      }
+      case CHAR: {
+        long candidate = last.asChar() + inc;
+        char end = range.getEnd().get(colId).asChar();
+        reminder = candidate - end;
+        break;
+      }
+      case DATE:
+      case INT4: {
+        int candidate = (int) (last.asInt4() + inc);
+        int end = range.getEnd().get(colId).asInt4();
+        reminder = candidate - end;
+        break;
+      }
+      case TIME:
+      case TIMESTAMP:
+      case INT8:
+      case INET4: {
+        long candidate = last.asInt8() + inc;
+        long end = range.getEnd().get(colId).asInt8();
+        reminder = candidate - end;
+        break;
+      }
+      case FLOAT4: {
+        float candidate = last.asFloat4() + inc;
+        float end = range.getEnd().get(colId).asFloat4();
+        reminder = (long) (candidate - end);
+        break;
+      }
+      case FLOAT8: {
+        double candidate = last.asFloat8() + inc;
+        double end = range.getEnd().get(colId).asFloat8();
+        reminder = (long) Math.ceil(candidate - end);
+        break;
+      }
+      case TEXT: {
+        char candidate = ((char)(last.asChars().charAt(0) + inc));
+        char end = range.getEnd().get(colId).asChars().charAt(0);
+        reminder = (char) (candidate - end);
+        break;
+      }
+    }
+
+    // including zero
+    return reminder - 1;
+  }
+
+  /**
+   *
+   * @param last
+   * @param inc
+   * @return
+   */
+  public Tuple increment(final Tuple last, final long inc, final int baseDigit) {
+    BigDecimal [] incs = new BigDecimal[last.size()];
+    boolean [] overflowFlag = new boolean[last.size()];
+    BigDecimal [] result;
+    BigDecimal value = new BigDecimal(inc);
+
+    BigDecimal [] reverseCardsForDigit = new BigDecimal[baseDigit + 1];
+    for (int i = baseDigit; i >= 0; i--) {
+      if (i == baseDigit) {
+        reverseCardsForDigit[i] = colCards[i];
+      } else {
+        reverseCardsForDigit[i] = reverseCardsForDigit[i+1].multiply(colCards[i]);
+      }
+    }
+
+    for (int i = 0; i < baseDigit; i++) {
+      result = value.divideAndRemainder(reverseCardsForDigit[i + 1]);
+      incs[i] = result[0];
+      value = result[1];
+    }
+    int finalId = baseDigit;
+    incs[finalId] = value;
+    for (int i = finalId; i >= 0; i--) {
+      if (isOverflow(i, last.get(i), incs[i], sortSpecs)) {
+        if (i == 0) {
+          throw new RangeOverflowException(range, last, incs[i].longValue());
+        }
+        long rem = incrementAndGetReminder(i, last.get(i), value.longValue());
+        incs[i] = new BigDecimal(rem);
+        incs[i - 1] = incs[i-1].add(new BigDecimal(1));
+        overflowFlag[i] = true;
+      } else {
+        if (i > 0) {
+          incs[i] = value;
+          break;
+        }
+      }
+    }
+
+    for (int i = 0; i < incs.length; i++) {
+      if (incs[i] == null) {
+        incs[i] = new BigDecimal(0);
+      }
+    }
+
+    Tuple end = new VTuple(sortSpecs.length);
+    Column column;
+    for (int i = 0; i < last.size(); i++) {
+      column = sortSpecs[i].getSortKey();
+      switch (column.getDataType().getType()) {
+        case CHAR:
+          if (overflowFlag[i]) {
+            end.put(i, DatumFactory.createChar((char) (range.getStart().get(i).asChar() + incs[i].longValue())));
+          } else {
+            end.put(i, DatumFactory.createChar((char) (last.get(i).asChar() + incs[i].longValue())));
+          }
+          break;
+        case BIT:
+          if (overflowFlag[i]) {
+            end.put(i, DatumFactory.createBit(
+                (byte) (range.getStart().get(i).asByte() + incs[i].longValue())));
+          } else {
+            end.put(i, DatumFactory.createBit((byte) (last.get(i).asByte() + incs[i].longValue())));
+          }
+          break;
+        case INT2:
+          if (overflowFlag[i]) {
+            end.put(i, DatumFactory.createInt2(
+                (short) (range.getStart().get(i).asInt2() + incs[i].longValue())));
+          } else {
+            end.put(i, DatumFactory.createInt2((short) (last.get(i).asInt2() + incs[i].longValue())));
+          }
+          break;
+        case INT4:
+          if (overflowFlag[i]) {
+            end.put(i, DatumFactory.createInt4(
+                (int) (range.getStart().get(i).asInt4() + incs[i].longValue())));
+          } else {
+            if (sortSpecs[i].isAscending()) {
+              end.put(i, DatumFactory.createInt4((int) (last.get(i).asInt4() + incs[i].longValue())));
+            } else {
+              end.put(i, DatumFactory.createInt4((int) (last.get(i).asInt4() - incs[i].longValue())));
+            }
+          }
+          break;
+        case INT8:
+          if (overflowFlag[i]) {
+            end.put(i, DatumFactory.createInt8(
+                range.getStart().get(i).asInt8() + incs[i].longValue()));
+          } else {
+            end.put(i, DatumFactory.createInt8(last.get(i).asInt8() + incs[i].longValue()));
+          }
+          break;
+        case FLOAT4:
+          if (overflowFlag[i]) {
+            end.put(i, DatumFactory.createFloat4(
+                range.getStart().get(i).asFloat4() + incs[i].longValue()));
+          } else {
+            end.put(i, DatumFactory.createFloat4(last.get(i).asFloat4() + incs[i].longValue()));
+          }
+          break;
+        case FLOAT8:
+          if (overflowFlag[i]) {
+            end.put(i, DatumFactory.createFloat8(
+                range.getStart().get(i).asFloat8() + incs[i].longValue()));
+          } else {
+            end.put(i, DatumFactory.createFloat8(last.get(i).asFloat8() + incs[i].longValue()));
+          }
+          break;
+        case TEXT:
+          if (overflowFlag[i]) {
+            end.put(i, DatumFactory.createText(((char) (range.getStart().get(i).asChars().charAt(0)
+                + incs[i].longValue())) + ""));
+          } else {
+            end.put(i, DatumFactory.createText(
+                ((char) ((last.get(i) instanceof NullDatum ? '0': last.get(i).asChars().charAt(0)) + incs[i].longValue())) + ""));
+          }
+          break;
+        case DATE:
+          if (overflowFlag[i]) {
+            end.put(i, DatumFactory.createDate((int) (range.getStart().get(i).asInt4() + incs[i].longValue())));
+          } else {
+            end.put(i, DatumFactory.createDate((int) (last.get(i).asInt4() + incs[i].longValue())));
+          }
+          break;
+        case TIME:
+          if (overflowFlag[i]) {
+            end.put(i, DatumFactory.createTime(range.getStart().get(i).asInt8() + incs[i].longValue()));
+          } else {
+            end.put(i, DatumFactory.createTime(last.get(i).asInt8() + incs[i].longValue()));
+          }
+          break;
+        case TIMESTAMP:
+          if (overflowFlag[i]) {
+            end.put(i, DatumFactory.createTimeStampFromMillis(
+                range.getStart().get(i).asInt8() + incs[i].longValue()));
+          } else {
+            end.put(i, DatumFactory.createTimeStampFromMillis(last.get(i).asInt8() + incs[i].longValue()));
+          }
+          break;
+        case INET4:
+          byte[] ipBytes;
+          if (overflowFlag[i]) {
+            ipBytes = range.getStart().get(i).asByteArray();
+            assert ipBytes.length == 4;
+            end.put(i, DatumFactory.createInet4(ipBytes));
+          } else {
+            int lastVal = last.get(i).asInt4() + incs[i].intValue();
+            ipBytes = new byte[4];
+            Bytes.putInt(ipBytes, 0, lastVal);
+            end.put(i, DatumFactory.createInet4(ipBytes));
+          }
+          break;
+        default:
+          throw new UnsupportedOperationException(column.getDataType() + " is not supported yet");
+      }
+    }
+
+    return end;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/VerificationState.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/VerificationState.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/VerificationState.java
new file mode 100644
index 0000000..18882b8
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/VerificationState.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.util.TUtil;
+
+import java.util.List;
+
+public class VerificationState {
+  private static final Log LOG = LogFactory.getLog(VerificationState.class);
+  List<String> errorMessages = Lists.newArrayList();
+
+  public void addVerification(String error) {
+    LOG.warn(TUtil.getCurrentCodePoint(1) + " causes: " + error);
+    errorMessages.add(error);
+  }
+
+  public boolean verified() {
+    return errorMessages.size() == 0;
+  }
+
+  public List<String> getErrorMessages() {
+    return errorMessages;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
new file mode 100644
index 0000000..91190f6
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.enforce;
+
+
+import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.util.TUtil;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ColumnPartitionEnforcer.ColumnPartitionAlgorithm;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.GroupbyEnforce.GroupbyAlgorithm;
+
+public class Enforcer implements ProtoObject<EnforcerProto> {
+  Map<EnforceType, List<EnforceProperty>> properties;
+  private EnforcerProto proto;
+
+  @SuppressWarnings("unused")
+  public Enforcer() {
+    properties = TUtil.newHashMap();
+  }
+
+  public Enforcer(EnforcerProto proto) {
+    this.proto = proto;
+  }
+
+  private EnforceProperty.Builder newProperty() {
+    return EnforceProperty.newBuilder();
+  }
+
+  private void initProperties() {
+    if (properties == null) {
+      properties = TUtil.newHashMap();
+      for (EnforceProperty property : proto.getPropertiesList()) {
+        TUtil.putToNestedList(properties, property.getType(), property);
+      }
+    }
+  }
+
+  public boolean hasEnforceProperty(EnforceType type) {
+    initProperties();
+    return properties.containsKey(type);
+  }
+
+  public List<EnforceProperty> getEnforceProperties(EnforceType type) {
+    initProperties();
+    return properties.get(type);
+  }
+
+  public void addSortedInput(String tableName, SortSpec[] sortSpecs) {
+    EnforceProperty.Builder builder = newProperty();
+    SortedInputEnforce.Builder enforce = SortedInputEnforce.newBuilder();
+    enforce.setTableName(tableName);
+    for (SortSpec sortSpec : sortSpecs) {
+      enforce.addSortSpecs(sortSpec.getProto());
+    }
+
+    builder.setType(EnforceType.SORTED_INPUT);
+    builder.setSortedInput(enforce.build());
+    TUtil.putToNestedList(properties, builder.getType(), builder.build());
+  }
+
+  public void addOutputDistinct() {
+    EnforceProperty.Builder builder = newProperty();
+    OutputDistinctEnforce.Builder enforce = OutputDistinctEnforce.newBuilder();
+
+    builder.setType(EnforceType.OUTPUT_DISTINCT);
+    builder.setOutputDistinct(enforce.build());
+    TUtil.putToNestedList(properties, builder.getType(), builder.build());
+  }
+
+  public void enforceJoinAlgorithm(int pid, JoinEnforce.JoinAlgorithm algorithm) {
+    EnforceProperty.Builder builder = newProperty();
+    JoinEnforce.Builder enforce = JoinEnforce.newBuilder();
+    enforce.setPid(pid);
+    enforce.setAlgorithm(algorithm);
+
+    builder.setType(EnforceType.JOIN);
+    builder.setJoin(enforce.build());
+    TUtil.putToNestedList(properties, builder.getType(), builder.build());
+  }
+
+  public void enforceSortAggregation(int pid, @Nullable SortSpec[] sortSpecs) {
+    EnforceProperty.Builder builder = newProperty();
+    GroupbyEnforce.Builder enforce = GroupbyEnforce.newBuilder();
+    enforce.setPid(pid);
+    enforce.setAlgorithm(GroupbyAlgorithm.SORT_AGGREGATION);
+    if (sortSpecs != null) {
+      for (SortSpec sortSpec : sortSpecs) {
+        enforce.addSortSpecs(sortSpec.getProto());
+      }
+    }
+
+    builder.setType(EnforceType.GROUP_BY);
+    builder.setGroupby(enforce.build());
+    TUtil.putToNestedList(properties, builder.getType(), builder.build());
+  }
+
+  public void enforceHashAggregation(int pid) {
+    EnforceProperty.Builder builder = newProperty();
+    GroupbyEnforce.Builder enforce = GroupbyEnforce.newBuilder();
+    enforce.setPid(pid);
+    enforce.setAlgorithm(GroupbyAlgorithm.HASH_AGGREGATION);
+
+    builder.setType(EnforceType.GROUP_BY);
+    builder.setGroupby(enforce.build());
+    TUtil.putToNestedList(properties, builder.getType(), builder.build());
+  }
+
+  public void enforceSortAlgorithm(int pid, SortEnforce.SortAlgorithm algorithm) {
+    EnforceProperty.Builder builder = newProperty();
+    SortEnforce.Builder enforce = SortEnforce.newBuilder();
+    enforce.setPid(pid);
+    enforce.setAlgorithm(algorithm);
+
+    builder.setType(EnforceType.SORT);
+    builder.setSort(enforce.build());
+    TUtil.putToNestedList(properties, builder.getType(), builder.build());
+  }
+
+  public void addBroadcast(String tableName) {
+    EnforceProperty.Builder builder = newProperty();
+    BroadcastEnforce.Builder enforce = BroadcastEnforce.newBuilder();
+    enforce.setTableName(tableName);
+
+    builder.setType(EnforceType.BROADCAST);
+    builder.setBroadcast(enforce);
+    TUtil.putToNestedList(properties, builder.getType(), builder.build());
+  }
+
+  public void enforceColumnPartitionAlgorithm(int pid, ColumnPartitionAlgorithm algorithm) {
+    EnforceProperty.Builder builder = newProperty();
+    ColumnPartitionEnforcer.Builder enforce = ColumnPartitionEnforcer.newBuilder();
+    enforce.setPid(pid);
+    enforce.setAlgorithm(algorithm);
+
+    builder.setType(EnforceType.COLUMN_PARTITION);
+    builder.setColumnPartition(enforce);
+    TUtil.putToNestedList(properties, builder.getType(), builder.build());
+  }
+
+  public Collection<EnforceProperty> getProperties() {
+    if (proto != null) {
+      return proto.getPropertiesList();
+    } else {
+      List<EnforceProperty> list = TUtil.newList();
+      for (List<EnforceProperty> propertyList : properties.values()) {
+        list.addAll(propertyList);
+      }
+      return list;
+    }
+  }
+
+  public String toString() {
+    StringBuilder sb = new StringBuilder("Enforce ").append(properties.size()).append(" properties: ");
+    boolean first = true;
+    for (EnforceType enforceType : properties.keySet()) {
+      if (first) {
+        first = false;
+      } else {
+        sb.append(", ");
+      }
+      sb.append(enforceType);
+    }
+    return sb.toString();
+  }
+
+  @Override
+  public EnforcerProto getProto() {
+    EnforcerProto.Builder builder = EnforcerProto.newBuilder();
+    builder.addAllProperties(getProperties());
+    return builder.build();
+  }
+
+  public static String toString(EnforceProperty property) {
+    StringBuilder sb = new StringBuilder();
+    switch (property.getType()) {
+    case GROUP_BY:
+      GroupbyEnforce groupby = property.getGroupby();
+      sb.append("type=GroupBy,alg=");
+      if (groupby.getAlgorithm() == GroupbyAlgorithm.HASH_AGGREGATION) {
+        sb.append("hash");
+      } else {
+        sb.append("sort");
+        sb.append(",keys=");
+        boolean first = true;
+        for (CatalogProtos.SortSpecProto sortSpec : groupby.getSortSpecsList()) {
+          if (first == true) {
+            first = false;
+          } else {
+            sb.append(", ");
+          }
+          sb.append(sortSpec.getColumn().getName());
+          sb.append(" (").append(sortSpec.getAscending() ? "asc":"desc").append(")");
+        }
+      }
+      break;
+    case BROADCAST:
+      BroadcastEnforce broadcast = property.getBroadcast();
+      sb.append("type=Broadcast, tables=").append(broadcast.getTableName());
+      break;
+    case COLUMN_PARTITION:
+      ColumnPartitionEnforcer columnPartition = property.getColumnPartition();
+      sb.append("type=ColumnPartition, alg=");
+      if (columnPartition.getAlgorithm() == ColumnPartitionAlgorithm.SORT_PARTITION) {
+        sb.append("sort");
+      } else {
+        sb.append("hash");
+      }
+      break;
+    case JOIN:
+      JoinEnforce join = property.getJoin();
+      sb.append("type=Join,alg=");
+      if (join.getAlgorithm() == JoinEnforce.JoinAlgorithm.MERGE_JOIN) {
+        sb.append("merge_join");
+      } else if (join.getAlgorithm() == JoinEnforce.JoinAlgorithm.NESTED_LOOP_JOIN) {
+        sb.append("nested_loop");
+      } else if (join.getAlgorithm() == JoinEnforce.JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN) {
+        sb.append("block_nested_loop");
+      } else if (join.getAlgorithm() == JoinEnforce.JoinAlgorithm.IN_MEMORY_HASH_JOIN) {
+        sb.append("in_memory_hash");
+      }
+      break;
+    case OUTPUT_DISTINCT:
+    case SORT:
+      SortEnforce sort = property.getSort();
+      sb.append("type=Sort,alg=");
+      if (sort.getAlgorithm() == SortEnforce.SortAlgorithm.IN_MEMORY_SORT) {
+        sb.append("in-memory");
+      } else {
+        sb.append("external");
+      }
+      break;
+    case SORTED_INPUT:
+    }
+
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
new file mode 100644
index 0000000..b3b5bb0
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.util.TUtil;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+
+public class DataChannel {
+  private ExecutionBlockId srcId;
+  private ExecutionBlockId targetId;
+  private TransmitType transmitType = TransmitType.PULL_TRANSMIT;
+  private ShuffleType shuffleType;
+  private Integer numOutputs = 1;
+  private Column[] shuffleKeys;
+
+  private Schema schema;
+
+  private StoreType storeType = StoreType.RAW;
+
+  public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId) {
+    this.srcId = srcId;
+    this.targetId = targetId;
+  }
+
+  public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, ShuffleType shuffleType) {
+    this(srcId, targetId);
+    this.shuffleType = shuffleType;
+  }
+
+  public DataChannel(ExecutionBlock src, ExecutionBlock target, ShuffleType shuffleType, int numOutput) {
+    this(src.getId(), target.getId(), shuffleType, numOutput);
+    setSchema(src.getPlan().getOutSchema());
+  }
+
+  public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, ShuffleType shuffleType, int numOutputs) {
+    this(srcId, targetId, shuffleType);
+    this.numOutputs = numOutputs;
+  }
+
+  public DataChannel(DataChannelProto proto) {
+    this.srcId = new ExecutionBlockId(proto.getSrcId());
+    this.targetId = new ExecutionBlockId(proto.getTargetId());
+    this.transmitType = proto.getTransmitType();
+    this.shuffleType = proto.getShuffleType();
+    if (proto.hasSchema()) {
+      this.setSchema(new Schema(proto.getSchema()));
+    }
+    if (proto.getShuffleKeysCount() > 0) {
+      shuffleKeys = new Column[proto.getShuffleKeysCount()];
+      for (int i = 0; i < proto.getShuffleKeysCount(); i++) {
+        shuffleKeys[i] = new Column(proto.getShuffleKeys(i));
+      }
+    } else {
+      shuffleKeys = new Column[] {};
+    }
+    if (proto.hasNumOutputs()) {
+      this.numOutputs = proto.getNumOutputs();
+    }
+
+    if (proto.hasStoreType()) {
+      this.storeType = proto.getStoreType();
+    }
+  }
+
+  public ExecutionBlockId getSrcId() {
+    return srcId;
+  }
+
+  public ExecutionBlockId getTargetId() {
+    return targetId;
+  }
+
+  public ShuffleType getShuffleType() {
+    return shuffleType;
+  }
+
+  public TransmitType getTransmitType() {
+    return this.transmitType;
+  }
+
+  public void setTransmitType(TransmitType transmitType) {
+    this.transmitType = transmitType;
+  }
+
+  public void setShuffle(ShuffleType shuffleType, Column[] keys, int numOutputs) {
+    Preconditions.checkArgument(keys.length >= 0, "At least one shuffle key must be specified.");
+    Preconditions.checkArgument(numOutputs > 0, "The number of outputs must be positive: %s", numOutputs);
+
+    this.shuffleType = shuffleType;
+    this.shuffleKeys = keys;
+    this.numOutputs = numOutputs;
+  }
+
+  public void setShuffleType(ShuffleType shuffleType) {
+    this.shuffleType = shuffleType;
+  }
+
+  public boolean hasShuffleKeys() {
+    return shuffleKeys != null;
+  }
+
+  public void setShuffleKeys(Column[] key) {
+    this.shuffleKeys = key;
+  }
+
+  public Column [] getShuffleKeys() {
+    return this.shuffleKeys;
+  }
+
+  public void setShuffleOutputNum(int partNum) {
+    this.numOutputs = partNum;
+  }
+
+  public int getShuffleOutputNum() {
+    return numOutputs;
+  }
+
+  public boolean hasStoreType() {
+    return this.storeType != null;
+  }
+
+  public void setStoreType(StoreType storeType) {
+    this.storeType = storeType;
+  }
+
+  public StoreType getStoreType() {
+    return storeType;
+  }
+
+  public DataChannelProto getProto() {
+    DataChannelProto.Builder builder = DataChannelProto.newBuilder();
+    builder.setSrcId(srcId.getProto());
+    builder.setTargetId(targetId.getProto());
+    if (transmitType != null) {
+      builder.setTransmitType(transmitType);
+    }
+    builder.setShuffleType(shuffleType);
+    if (schema != null) {
+      builder.setSchema(schema.getProto());
+    }
+    if (shuffleKeys != null) {
+      for (Column column : shuffleKeys) {
+        builder.addShuffleKeys(column.getProto());
+      }
+    }
+    if (numOutputs != null) {
+      builder.setNumOutputs(numOutputs);
+    }
+
+    if(storeType != null){
+      builder.setStoreType(storeType);
+    }
+    return builder.build();
+  }
+
+  public void setSchema(Schema schema) {
+    this.schema = SchemaUtil.clone(schema);
+  }
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("[").append(srcId.getQueryId()).append("] ");
+    sb.append(srcId.getId()).append(" => ").append(targetId.getId());
+    sb.append(" (type=").append(shuffleType);
+    if (hasShuffleKeys()) {
+      sb.append(", key=");
+      sb.append(TUtil.arrayToString(shuffleKeys));
+      sb.append(", num=").append(numOutputs);
+    }
+    sb.append(")");
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
new file mode 100644
index 0000000..7df6b43
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.*;
+
+import java.util.*;
+
+/**
+ * A distributed execution plan (DEP) is a direct acyclic graph (DAG) of ExecutionBlocks.
+ * An ExecutionBlock is a basic execution unit that could be distributed across a number of nodes.
+ * An ExecutionBlock class contains input information (e.g., child execution blocks or input
+ * tables), and output information (e.g., partition type, partition key, and partition number).
+ * In addition, it includes a logical plan to be executed in each node.
+ */
+public class ExecutionBlock {
+  private ExecutionBlockId executionBlockId;
+  private LogicalNode plan = null;
+  private StoreTableNode store = null;
+  private List<ScanNode> scanlist = new ArrayList<ScanNode>();
+  private Enforcer enforcer = new Enforcer();
+
+  private boolean hasJoinPlan;
+  private boolean hasUnionPlan;
+
+  private Set<String> broadcasted = new HashSet<String>();
+
+  public ExecutionBlock(ExecutionBlockId executionBlockId) {
+    this.executionBlockId = executionBlockId;
+  }
+
+  public ExecutionBlockId getId() {
+    return executionBlockId;
+  }
+
+  public void setPlan(LogicalNode plan) {
+    hasJoinPlan = false;
+    hasUnionPlan = false;
+    this.scanlist.clear();
+    this.plan = plan;
+
+    if (plan == null) {
+      return;
+    }
+
+    LogicalNode node = plan;
+    ArrayList<LogicalNode> s = new ArrayList<LogicalNode>();
+    s.add(node);
+    while (!s.isEmpty()) {
+      node = s.remove(s.size()-1);
+      if (node instanceof UnaryNode) {
+        UnaryNode unary = (UnaryNode) node;
+        s.add(s.size(), unary.getChild());
+      } else if (node instanceof BinaryNode) {
+        BinaryNode binary = (BinaryNode) node;
+        if (binary.getType() == NodeType.JOIN) {
+          hasJoinPlan = true;
+        } else if (binary.getType() == NodeType.UNION) {
+          hasUnionPlan = true;
+        }
+        s.add(s.size(), binary.getLeftChild());
+        s.add(s.size(), binary.getRightChild());
+      } else if (node instanceof ScanNode) {
+        scanlist.add((ScanNode)node);
+      } else if (node instanceof TableSubQueryNode) {
+        TableSubQueryNode subQuery = (TableSubQueryNode) node;
+        s.add(s.size(), subQuery.getSubQuery());
+      }
+    }
+  }
+
+
+  public LogicalNode getPlan() {
+    return plan;
+  }
+
+  public Enforcer getEnforcer() {
+    return enforcer;
+  }
+
+  public StoreTableNode getStoreTableNode() {
+    return store;
+  }
+
+  public ScanNode [] getScanNodes() {
+    return this.scanlist.toArray(new ScanNode[scanlist.size()]);
+  }
+
+  public boolean hasJoin() {
+    return hasJoinPlan;
+  }
+
+  public boolean hasUnion() {
+    return hasUnionPlan;
+  }
+
+  public void addBroadcastTable(String tableName) {
+    broadcasted.add(tableName);
+    enforcer.addBroadcast(tableName);
+  }
+
+  public boolean isBroadcastTable(String tableName) {
+    return broadcasted.contains(tableName);
+  }
+
+  public Collection<String> getBroadcastTables() {
+    return broadcasted;
+  }
+
+  public String toString() {
+    return executionBlockId.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java
new file mode 100644
index 0000000..d4ab068
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import java.util.ArrayList;
+import java.util.Stack;
+
+/**
+ * A distributed execution plan (DEP) is a direct acyclic graph (DAG) of ExecutionBlocks.
+ * This class is a pointer to an ExecutionBlock that the query engine should execute.
+ * For each call of nextBlock(), it retrieves a next ExecutionBlock in a postfix order.
+ */
+public class ExecutionBlockCursor {
+  private MasterPlan masterPlan;
+  private ArrayList<ExecutionBlock> orderedBlocks = new ArrayList<ExecutionBlock>();
+  private int cursor = 0;
+
+  public ExecutionBlockCursor(MasterPlan plan) {
+    this.masterPlan = plan;
+    buildOrder(plan.getRoot());
+  }
+
+  public int size() {
+    return orderedBlocks.size();
+  }
+
+  // Add all execution blocks in a depth first and postfix order
+  private void buildOrder(ExecutionBlock current) {
+    Stack<ExecutionBlock> stack = new Stack<ExecutionBlock>();
+    if (!masterPlan.isLeaf(current.getId())) {
+      for (ExecutionBlock execBlock : masterPlan.getChilds(current)) {
+        if (!masterPlan.isLeaf(execBlock)) {
+          buildOrder(execBlock);
+        } else {
+          stack.push(execBlock);
+        }
+      }
+      for (ExecutionBlock execBlock : stack) {
+        buildOrder(execBlock);
+      }
+    }
+    orderedBlocks.add(current);
+  }
+
+  public boolean hasNext() {
+    return cursor < orderedBlocks.size();
+  }
+
+  public ExecutionBlock nextBlock() {
+    return orderedBlocks.get(cursor++);
+  }
+
+  public ExecutionBlock peek() {
+    return orderedBlocks.get(cursor);
+  }
+
+  public ExecutionBlock peek(int skip) {
+    return  orderedBlocks.get(cursor + skip);
+  }
+
+  public void reset() {
+    cursor = 0;
+  }
+
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < orderedBlocks.size(); i++) {
+      if (i == (cursor == 0 ? 0 : cursor - 1)) {
+        sb.append("(").append(orderedBlocks.get(i).getId().getId()).append(")");
+      } else {
+        sb.append(orderedBlocks.get(i).getId().getId());
+      }
+
+      if (i < orderedBlocks.size() - 1) {
+        sb.append(",");
+      }
+    }
+
+    return sb.toString();
+  }
+}