You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hj...@apache.org on 2014/10/26 20:27:15 UTC

[09/35] TAJO-1125: Separate logical plan and optimizer into a maven module.

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/NameResolver.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/NameResolver.java b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/NameResolver.java
new file mode 100644
index 0000000..51a016f
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/NameResolver.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.nameresolver;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.tajo.algebra.ColumnReferenceExpr;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.exception.NoSuchColumnException;
+import org.apache.tajo.plan.algebra.AmbiguousFieldException;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.verifier.VerifyException;
+import org.apache.tajo.plan.logical.RelationNode;
+import org.apache.tajo.util.Pair;
+import org.apache.tajo.util.TUtil;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * NameResolver utility
+ */
+public abstract class NameResolver {
+
+  public static Map<NameResolvingMode, NameResolver> resolverMap = Maps.newHashMap();
+
+  static {
+    resolverMap.put(NameResolvingMode.RELS_ONLY, new ResolverByRels());
+    resolverMap.put(NameResolvingMode.RELS_AND_SUBEXPRS, new ResolverByRelsAndSubExprs());
+    resolverMap.put(NameResolvingMode.SUBEXPRS_AND_RELS, new ResolverBySubExprsAndRels());
+    resolverMap.put(NameResolvingMode.LEGACY, new ResolverByLegacy());
+  }
+
+  abstract Column resolve(LogicalPlan plan, LogicalPlan.QueryBlock block, ColumnReferenceExpr columnRef)
+  throws PlanningException;
+
+  /**
+   * Try to find the database name
+   *
+   * @param block the current block
+   * @param tableName The table name
+   * @return The found database name
+   * @throws PlanningException
+   */
+  public static String resolveDatabase(LogicalPlan.QueryBlock block, String tableName) throws PlanningException {
+    List<String> found = new ArrayList<String>();
+    for (RelationNode relation : block.getRelations()) {
+      // check alias name or table name
+      if (CatalogUtil.extractSimpleName(relation.getCanonicalName()).equals(tableName) ||
+          CatalogUtil.extractSimpleName(relation.getTableName()).equals(tableName)) {
+        // obtain the database name
+        found.add(CatalogUtil.extractQualifier(relation.getTableName()));
+      }
+    }
+
+    if (found.size() == 0) {
+      return null;
+    } else if (found.size() > 1) {
+      throw new PlanningException("Ambiguous table name \"" + tableName + "\"");
+    }
+
+    return found.get(0);
+  }
+
+  public static Column resolve(LogicalPlan plan, LogicalPlan.QueryBlock block, ColumnReferenceExpr column,
+                        NameResolvingMode mode) throws PlanningException {
+    if (!resolverMap.containsKey(mode)) {
+      throw new PlanningException("Unsupported name resolving level: " + mode.name());
+    }
+    return resolverMap.get(mode).resolve(plan, block, column);
+  }
+
+  /**
+   * Try to find a column from all relations within a given query block.
+   * If a given column reference is qualified, it tries to resolve the name
+   * from only the relation corresponding to the qualifier.
+   *
+   * @param plan The logical plan
+   * @param block The current query block
+   * @param columnRef The column reference to be found
+   * @return The found column
+   * @throws PlanningException
+   */
+  static Column resolveFromRelsWithinBlock(LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                                  ColumnReferenceExpr columnRef) throws PlanningException {
+    String qualifier;
+    String canonicalName;
+
+    if (columnRef.hasQualifier()) {
+      Pair<String, String> normalized = normalizeQualifierAndCanonicalName(block, columnRef);
+      qualifier = normalized.getFirst();
+      canonicalName = normalized.getSecond();
+
+      RelationNode relationOp = block.getRelation(qualifier);
+
+      // If we cannot find any relation against a qualified column name
+      if (relationOp == null) {
+        throw null;
+      }
+
+      // Please consider a query case:
+      // select lineitem.l_orderkey from lineitem a order by lineitem.l_orderkey;
+      //
+      // The relation lineitem is already renamed to "a", but lineitem.l_orderkey still can be used.
+      // The below code makes it available. Otherwise, it cannot find any match in the relation schema.
+      if (block.isAlreadyRenamedTableName(CatalogUtil.extractQualifier(canonicalName))) {
+        canonicalName =
+            CatalogUtil.buildFQName(relationOp.getCanonicalName(), CatalogUtil.extractSimpleName(canonicalName));
+      }
+
+      Schema schema = relationOp.getTableSchema();
+      Column column = schema.getColumn(canonicalName);
+
+      return column;
+    } else {
+      return resolveFromAllRelsInBlock(block, columnRef);
+    }
+  }
+
+  /**
+   * Try to find the column from the current node and child node. It can find subexprs generated from the optimizer.
+   *
+   * @param block The current query block
+   * @param columnRef The column reference to be found
+   * @return The found column
+   */
+  static Column resolveFromCurrentAndChildNode(LogicalPlan.QueryBlock block, ColumnReferenceExpr columnRef)
+      throws NoSuchColumnException {
+
+    if (block.getCurrentNode() != null && block.getCurrentNode().getInSchema() != null) {
+      Column found = block.getCurrentNode().getInSchema().getColumn(columnRef.getCanonicalName());
+      if (found != null) {
+        return found;
+      } else if (block.getLatestNode() != null) {
+        found = block.getLatestNode().getOutSchema().getColumn(columnRef.getName());
+        if (found != null) {
+          return found;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * It tries to find a full qualified column name from all relations in the current block.
+   *
+   * @param block The current query block
+   * @param columnRef The column reference to be found
+   * @return The found column
+   */
+  static Column resolveFromAllRelsInBlock(LogicalPlan.QueryBlock block,
+                                          ColumnReferenceExpr columnRef) throws VerifyException {
+    List<Column> candidates = TUtil.newList();
+
+    for (RelationNode rel : block.getRelations()) {
+      Column found = rel.getTableSchema().getColumn(columnRef.getName());
+      if (found != null) {
+        candidates.add(found);
+      }
+    }
+
+    if (!candidates.isEmpty()) {
+      return ensureUniqueColumn(candidates);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Trying to find a column from all relations in other blocks
+   *
+   * @param plan The logical plan
+   * @param columnRef The column reference to be found
+   * @return The found column
+   */
+  static Column resolveFromAllRelsInAllBlocks(LogicalPlan plan, ColumnReferenceExpr columnRef) throws VerifyException {
+
+    List<Column> candidates = Lists.newArrayList();
+
+    // from all relations of all query blocks
+    for (LogicalPlan.QueryBlock eachBlock : plan.getQueryBlocks()) {
+
+      for (RelationNode rel : eachBlock.getRelations()) {
+        Column found = rel.getTableSchema().getColumn(columnRef.getName());
+        if (found != null) {
+          candidates.add(found);
+        }
+      }
+    }
+
+    if (!candidates.isEmpty()) {
+      return NameResolver.ensureUniqueColumn(candidates);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Try to find a column from the final schema of the current block.
+   *
+   * @param block The current query block
+   * @param columnRef The column reference to be found
+   * @return The found column
+   */
+  static Column resolveAliasedName(LogicalPlan.QueryBlock block, ColumnReferenceExpr columnRef) throws VerifyException {
+    List<Column> candidates = Lists.newArrayList();
+
+    if (block.getSchema() != null) {
+      Column found = block.getSchema().getColumn(columnRef.getName());
+      if (found != null) {
+        candidates.add(found);
+      }
+    }
+
+    if (!candidates.isEmpty()) {
+      return NameResolver.ensureUniqueColumn(candidates);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * It returns a pair of names, which the first value is ${database}.${table} and the second value
+   * is a simple column name.
+   *
+   * @param block The current block
+   * @param columnRef The column name
+   * @return A pair of normalized qualifier and column name
+   * @throws PlanningException
+   */
+  static Pair<String, String> normalizeQualifierAndCanonicalName(LogicalPlan.QueryBlock block,
+                                                                 ColumnReferenceExpr columnRef)
+      throws PlanningException {
+    String qualifier;
+    String canonicalName;
+
+    if (CatalogUtil.isFQTableName(columnRef.getQualifier())) {
+      qualifier = columnRef.getQualifier();
+      canonicalName = columnRef.getCanonicalName();
+    } else {
+      String resolvedDatabaseName = resolveDatabase(block, columnRef.getQualifier());
+      if (resolvedDatabaseName == null) {
+        throw new NoSuchColumnException(columnRef.getQualifier());
+      }
+      qualifier = CatalogUtil.buildFQName(resolvedDatabaseName, columnRef.getQualifier());
+      canonicalName = CatalogUtil.buildFQName(qualifier, columnRef.getName());
+    }
+
+    return new Pair<String, String>(qualifier, canonicalName);
+  }
+
+  static Column ensureUniqueColumn(List<Column> candidates) throws VerifyException {
+    if (candidates.size() == 1) {
+      return candidates.get(0);
+    } else if (candidates.size() > 2) {
+      StringBuilder sb = new StringBuilder();
+      boolean first = true;
+      for (Column column : candidates) {
+        if (first) {
+          first = false;
+        } else {
+          sb.append(", ");
+        }
+        sb.append(column);
+      }
+      throw new AmbiguousFieldException("Ambiguous Column Name: " + sb.toString());
+    } else {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/NameResolvingMode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/NameResolvingMode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/NameResolvingMode.java
new file mode 100644
index 0000000..4d9f9a5
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/NameResolvingMode.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.nameresolver;
+
+/**
+ *
+ * <h2>Motivation</h2>
+ *
+ * Please take a look at the following example query:
+ *
+ *  <pre>
+ *   select (l_orderkey + l_orderkey) l_orderkey from lineitem where l_orderkey > 2 order by l_orderkey;
+ * </pre>
+ *
+ * Although <code>l_orderkey</code> seems to be ambiguous, the above usages are available in commercial DBMSs.
+ * In order to eliminate the ambiguity, Tajo follows the behaviors of PostgreSQL.
+ *
+ * <h2>Resolving Modes</h2>
+ *
+ * From the behaviors of PostgreSQL, we found that there are three kinds of name resolving modes.
+ * Each definition is as follows:
+ *
+ * <ul>
+ *   <li><b>RELS_ONLY</b> finds a column from the relations in the current block.
+ *  <li><b>RELS_AND_SUBEXPRS</b> finds a column from the all relations in the current block and
+ *  from aliased temporal fields; a temporal field means an explicitly aliased expression. If there are duplicated
+ *  columns in the relation and temporal fields, this level firstly chooses the field in a relation.</li>
+ *  <li><b>SUBEXPRS_AND_RELS</b> is very similar to <code>RELS_AND_SUBEXPRS</code>. The main difference is that it
+ *  firstly chooses an aliased temporal field instead of the fields in a relation.</li>
+ * </ul>
+ *
+ * <h2>The relationship between resolving modes and operators</h3>
+ *
+ * <ul>
+ *   <li>fields in select list and LIMIT are resolved in the REL_ONLY mode.</li>
+ *   <li>fields in WHERE clause are resolved in the RELS_AND_SUBEXPRS mode.</li>
+ *   <li>fields in GROUP BY, HAVING, and ORDER BY are resolved in the SUBEXPRS_AND_RELS mode.</li>
+ * </ul>
+ *
+ * <h2>Example</h2>
+ *
+ * Please revisit the aforementioned example:
+ *
+ * <pre>
+ *   select (l_orderkey + l_orderkey) l_orderkey from lineitem where l_orderkey > 2 order by l_orderkey;
+ * </pre>
+ *
+ * With the above rules and the relationship between modes and operators, we can easily identify which reference
+ * points to which field.
+ * <ol>
+ *   <li><code>l_orderkey</code> included in <code>(l_orderkey + l_orderkey)</code> points to the field
+ *   in the relation <code>lineitem</code>.</li>
+ *   <li><code>l_orderkey</code> included in WHERE clause also points to the field in the relation
+ *   <code>lineitem</code>.</li>
+ *   <li><code>l_orderkey</code> included in ORDER BY clause points to the temporal field
+ *   <code>(l_orderkey + l_orderkey)</code>.</li>
+ * </ol>
+ */
+public enum NameResolvingMode {
+  RELS_ONLY,          // finding from only relations
+  RELS_AND_SUBEXPRS,  // finding from relations and subexprs in a place
+  SUBEXPRS_AND_RELS,  // finding from subexprs and relations in a place
+  LEGACY              // Finding in a legacy manner (globally)
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverByLegacy.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverByLegacy.java b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverByLegacy.java
new file mode 100644
index 0000000..a1d9dbd
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverByLegacy.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.nameresolver;
+
+import org.apache.tajo.algebra.ColumnReferenceExpr;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.NoSuchColumnException;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.plan.logical.RelationNode;
+import org.apache.tajo.util.Pair;
+import org.apache.tajo.util.TUtil;
+
+import java.util.List;
+
+public class ResolverByLegacy extends NameResolver {
+  @Override
+  public Column resolve(LogicalPlan plan, LogicalPlan.QueryBlock block, ColumnReferenceExpr columnRef)
+      throws PlanningException {
+
+    if (columnRef.hasQualifier()) {
+      return resolveColumnWithQualifier(plan, block, columnRef);
+    } else {
+      return resolveColumnWithoutQualifier(plan, block, columnRef);
+    }
+  }
+
+  private static Column resolveColumnWithQualifier(LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                                   ColumnReferenceExpr columnRef) throws PlanningException {
+    final String qualifier;
+    String canonicalName;
+    final String qualifiedName;
+
+    Pair<String, String> normalized = normalizeQualifierAndCanonicalName(block, columnRef);
+    qualifier = normalized.getFirst();
+    canonicalName = normalized.getSecond();
+    qualifiedName = CatalogUtil.buildFQName(qualifier, columnRef.getName());
+
+    Column found = resolveFromRelsWithinBlock(plan, block, columnRef);
+    if (found == null) {
+      throw new NoSuchColumnException(columnRef.getCanonicalName());
+    }
+
+    // If code reach here, a column is found.
+    // But, it may be aliased from bottom logical node.
+    // If the column is aliased, the found name may not be used in upper node.
+
+    // Here, we try to check if column reference is already aliased.
+    // If so, it replaces the name with aliased name.
+    LogicalNode currentNode = block.getCurrentNode();
+
+    // The condition (currentNode.getInSchema().contains(column)) means
+    // the column can be used at the current node. So, we don't need to find aliase name.
+    Schema currentNodeSchema = null;
+    if (currentNode != null) {
+      if (currentNode instanceof RelationNode) {
+        currentNodeSchema = ((RelationNode) currentNode).getTableSchema();
+      } else {
+        currentNodeSchema = currentNode.getInSchema();
+      }
+    }
+
+    if (currentNode != null && !currentNodeSchema.contains(found)
+        && currentNode.getType() != NodeType.TABLE_SUBQUERY) {
+      List<Column> candidates = TUtil.newList();
+      if (block.getNamedExprsManager().isAliased(qualifiedName)) {
+        String alias = block.getNamedExprsManager().getAlias(canonicalName);
+        found = resolve(plan, block, new ColumnReferenceExpr(alias), NameResolvingMode.LEGACY);
+        if (found != null) {
+          candidates.add(found);
+        }
+      }
+      if (!candidates.isEmpty()) {
+        return ensureUniqueColumn(candidates);
+      }
+    }
+
+    return found;
+  }
+
+  static Column resolveColumnWithoutQualifier(LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                                     ColumnReferenceExpr columnRef)throws PlanningException {
+
+    Column found = resolveFromAllRelsInBlock(block, columnRef);
+    if (found != null) {
+      return found;
+    }
+
+    found = resolveAliasedName(block, columnRef);
+    if (found != null) {
+      return found;
+    }
+
+    found = resolveFromCurrentAndChildNode(block, columnRef);
+    if (found != null) {
+      return found;
+    }
+
+    found = resolveFromAllRelsInAllBlocks(plan, columnRef);
+    if (found != null) {
+      return found;
+    }
+
+    throw new NoSuchColumnException("ERROR: no such a column name "+ columnRef.getCanonicalName());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverByRels.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverByRels.java b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverByRels.java
new file mode 100644
index 0000000..a67a1ca
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverByRels.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.nameresolver;
+
+import org.apache.tajo.algebra.ColumnReferenceExpr;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.logical.NoSuchColumnException;
+
+public class ResolverByRels extends NameResolver {
+  @Override
+  public Column resolve(LogicalPlan plan, LogicalPlan.QueryBlock block, ColumnReferenceExpr columnRef)
+      throws PlanningException {
+
+    Column column = resolveFromRelsWithinBlock(plan, block, columnRef);
+    if (column == null) {
+      throw new NoSuchColumnException(columnRef.getCanonicalName());
+    }
+    return column;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverByRelsAndSubExprs.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverByRelsAndSubExprs.java b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverByRelsAndSubExprs.java
new file mode 100644
index 0000000..f486117
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverByRelsAndSubExprs.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.nameresolver;
+
+import org.apache.tajo.algebra.ColumnReferenceExpr;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.exception.NoSuchColumnException;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+
+public class ResolverByRelsAndSubExprs extends NameResolver {
+  @Override
+  public Column resolve(LogicalPlan plan, LogicalPlan.QueryBlock block, ColumnReferenceExpr columnRef)
+      throws PlanningException {
+
+    Column column = resolveFromRelsWithinBlock(plan, block, columnRef);
+    if (column == null) {
+      column =  resolveFromCurrentAndChildNode(block, columnRef);
+    }
+
+    if (column == null) {
+      throw new NoSuchColumnException(columnRef.getCanonicalName());
+    }
+    return column;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverBySubExprsAndRels.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverBySubExprsAndRels.java b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverBySubExprsAndRels.java
new file mode 100644
index 0000000..3064fde
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverBySubExprsAndRels.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.nameresolver;
+
+import org.apache.tajo.algebra.ColumnReferenceExpr;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.logical.NoSuchColumnException;
+
+public class ResolverBySubExprsAndRels extends NameResolver {
+  @Override
+  public Column resolve(LogicalPlan plan, LogicalPlan.QueryBlock block, ColumnReferenceExpr columnRef)
+      throws PlanningException {
+
+    Column column = resolveFromCurrentAndChildNode(block, columnRef);
+    if (column == null) {
+      column = resolveFromRelsWithinBlock(plan, block, columnRef);
+    }
+
+    if (column == null) {
+      throw new NoSuchColumnException(columnRef.getCanonicalName());
+    }
+    return column;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BasicQueryRewriteEngine.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BasicQueryRewriteEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BasicQueryRewriteEngine.java
new file mode 100644
index 0000000..491dda1
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BasicQueryRewriteEngine.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.rewrite;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * This is a basic query rewrite rule engine. This rewrite rule engine
+ * rewrites a logical plan with various query rewrite rules.
+ */
+public class BasicQueryRewriteEngine implements QueryRewriteEngine {
+  /** class logger */
+  private Log LOG = LogFactory.getLog(BasicQueryRewriteEngine.class);
+
+  /** a map for query rewrite rules  */
+  private Map<String, RewriteRule> rewriteRules = new LinkedHashMap<String, RewriteRule>();
+
+  /**
+   * Add a query rewrite rule to this engine.
+   *
+   * @param rule The rule to be added to this engine.
+   */
+  public void addRewriteRule(RewriteRule rule) {
+    if (!rewriteRules.containsKey(rule.getName())) {
+      rewriteRules.put(rule.getName(), rule);
+    }
+  }
+
+  /**
+   * Rewrite a logical plan with all query rewrite rules added to this engine.
+   *
+   * @param plan The plan to be rewritten with all query rewrite rule.
+   * @return The rewritten plan.
+   */
+  public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
+    RewriteRule rule;
+    for (Entry<String, RewriteRule> rewriteRule : rewriteRules.entrySet()) {
+      rule = rewriteRule.getValue();
+      if (rule.isEligible(plan)) {
+        plan = rule.rewrite(plan);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("The rule \"" + rule.getName() + " \" rewrites the query.");
+        }
+      }
+    }
+
+    return plan;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/QueryRewriteEngine.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/QueryRewriteEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/QueryRewriteEngine.java
new file mode 100644
index 0000000..b7f5637
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/QueryRewriteEngine.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.rewrite;
+
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+
+public interface QueryRewriteEngine {
+  /**
+   * Rewrite a logical plan with all query rewrite rules added to this engine.
+   *
+   * @param plan The plan to be rewritten with all query rewrite rule.
+   * @return The rewritten plan.
+   */
+  LogicalPlan rewrite(LogicalPlan plan) throws PlanningException;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/RewriteRule.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/RewriteRule.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/RewriteRule.java
new file mode 100644
index 0000000..0ba7460
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/RewriteRule.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.rewrite;
+
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+
+/**
+ * An interface for a rewrite rule.
+ */
+public interface RewriteRule {
+
+  /**
+   * It returns the rewrite rule name. It will be used for debugging and
+   * building a optimization history.
+   *
+   * @return The rewrite rule name
+   */
+  String getName();
+
+  /**
+   * This method checks if this rewrite rule can be applied to a given query plan.
+   * For example, the selection push down can not be applied to the query plan without any filter.
+   * In such case, it will return false.
+   *
+   * @param plan The plan to be checked
+   * @return True if this rule can be applied to a given plan. Otherwise, false.
+   */
+  boolean isEligible(LogicalPlan plan);
+
+  /**
+   * Updates a logical plan and returns an updated logical plan rewritten by this rule.
+   * It must be guaranteed that the input logical plan is not modified even after rewrite.
+   * In other words, the rewrite has to modify an plan copied from the input plan.
+   *
+   * @param plan Input logical plan. It will not be modified.
+   * @return The rewritten logical plan.
+   */
+  LogicalPlan rewrite(LogicalPlan plan) throws PlanningException;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
new file mode 100644
index 0000000..31063bf
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
@@ -0,0 +1,912 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.rewrite.rules;
+
+import com.google.common.collect.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.algebra.JoinType;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.plan.*;
+import org.apache.tajo.plan.expr.*;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.rewrite.rules.FilterPushDownRule.FilterPushDownContext;
+import org.apache.tajo.plan.rewrite.RewriteRule;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
+import org.apache.tajo.util.TUtil;
+
+import java.util.*;
+
+/**
+ * This rule tries to push down all filter conditions into logical nodes as lower as possible.
+ * It is likely to significantly reduces the intermediate data.
+ */
+public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownContext, LogicalNode>
+    implements RewriteRule {
+  private final static Log LOG = LogFactory.getLog(FilterPushDownRule.class);
+  private static final String NAME = "FilterPushDown";
+
+  static class FilterPushDownContext {
+    Set<EvalNode> pushingDownFilters = new HashSet<EvalNode>();
+
+    public void clear() {
+      pushingDownFilters.clear();
+    }
+    public void setFiltersTobePushed(Collection<EvalNode> workingEvals) {
+      this.pushingDownFilters.clear();
+      this.pushingDownFilters.addAll(workingEvals);
+    }
+    public void addFiltersTobePushed(Collection<EvalNode> workingEvals) {
+      this.pushingDownFilters.addAll(workingEvals);
+    }
+
+    public void setToOrigin(Map<EvalNode, EvalNode> evalMap) {
+      //evalMap: copy -> origin
+      List<EvalNode> origins = new ArrayList<EvalNode>();
+      for (EvalNode eval : pushingDownFilters) {
+        EvalNode origin = evalMap.get(eval);
+        if (origin != null) {
+          origins.add(origin);
+        }
+      }
+      setFiltersTobePushed(origins);
+    }
+  }
+
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
+  @Override
+  public boolean isEligible(LogicalPlan plan) {
+    for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
+      if (block.hasNode(NodeType.SELECTION) || block.hasNode(NodeType.JOIN)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
+    /*
+    FilterPushDown rule: processing when visits each node
+      - If a target which is corresponding on a filter EvalNode's column is not FieldEval, do not PushDown.
+      - Replace filter EvalNode's column with child node's output column.
+        If there is no child node's output column, do not PushDown.
+      - When visit ScanNode, add filter eval to ScanNode's qual
+      - When visit GroupByNode, Find aggregation column in a filter EvalNode and
+        . If a parent is HavingNode, add filter eval to parent HavingNode.
+        . It not, create new HavingNode and set parent's child.
+     */
+    FilterPushDownContext context = new FilterPushDownContext();
+    for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
+      context.clear();
+      this.visit(context, plan, block, block.getRoot(), new Stack<LogicalNode>());
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("=============================================");
+      LOG.debug("FilterPushDown Optimized Query: \n" + plan.toString());
+      LOG.debug("=============================================");
+    }
+    return plan;
+  }
+
+  @Override
+  public LogicalNode visitFilter(FilterPushDownContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                 SelectionNode selNode, Stack<LogicalNode> stack) throws PlanningException {
+    context.pushingDownFilters.addAll(Sets.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(selNode.getQual())));
+
+    stack.push(selNode);
+    visit(context, plan, block, selNode.getChild(), stack);
+    stack.pop();
+
+    if(context.pushingDownFilters.size() == 0) {
+      // remove the selection operator if there is no search condition after selection push.
+      LogicalNode node = stack.peek();
+      if (node instanceof UnaryNode) {
+        UnaryNode unary = (UnaryNode) node;
+        unary.setChild(selNode.getChild());
+      } else {
+        throw new InvalidQueryException("Unexpected Logical Query Plan");
+      }
+    } else { // if there remain search conditions
+
+      // check if it can be evaluated here
+      Set<EvalNode> matched = TUtil.newHashSet();
+      for (EvalNode eachEval : context.pushingDownFilters) {
+        if (LogicalPlanner.checkIfBeEvaluatedAtThis(eachEval, selNode)) {
+          matched.add(eachEval);
+        }
+      }
+
+      // if there are search conditions which can be evaluated here,
+      // push down them and remove them from context.pushingDownFilters.
+      if (matched.size() > 0) {
+        selNode.setQual(AlgebraicUtil.createSingletonExprFromCNF(matched.toArray(new EvalNode[matched.size()])));
+        context.pushingDownFilters.removeAll(matched);
+      }
+    }
+
+    return selNode;
+  }
+
+  @Override
+  public LogicalNode visitJoin(FilterPushDownContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, JoinNode joinNode,
+                               Stack<LogicalNode> stack) throws PlanningException {
+    // here we should stop selection pushdown on the null supplying side(s) of an outer join
+    // get the two operands of the join operation as well as the join type
+    JoinType joinType = joinNode.getJoinType();
+    EvalNode joinQual = joinNode.getJoinQual();
+    if (joinQual != null && LogicalPlanner.isOuterJoin(joinType)) {
+      BinaryEval binaryEval = (BinaryEval) joinQual;
+      // if both are fields
+      if (binaryEval.getLeftExpr().getType() == EvalType.FIELD &&
+          binaryEval.getRightExpr().getType() == EvalType.FIELD) {
+
+        String leftTableName = ((FieldEval) binaryEval.getLeftExpr()).getQualifier();
+        String rightTableName = ((FieldEval) binaryEval.getRightExpr()).getQualifier();
+        List<String> nullSuppliers = Lists.newArrayList();
+        Set<String> leftTableSet = Sets.newHashSet(PlannerUtil.getRelationLineageWithinQueryBlock(plan,
+            joinNode.getLeftChild()));
+        Set<String> rightTableSet = Sets.newHashSet(PlannerUtil.getRelationLineageWithinQueryBlock(plan,
+            joinNode.getRightChild()));
+
+        // some verification
+        if (joinType == JoinType.FULL_OUTER) {
+          nullSuppliers.add(leftTableName);
+          nullSuppliers.add(rightTableName);
+
+          // verify that these null suppliers are indeed in the left and right sets
+          if (!rightTableSet.contains(nullSuppliers.get(0)) && !leftTableSet.contains(nullSuppliers.get(0))) {
+            throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
+          }
+          if (!rightTableSet.contains(nullSuppliers.get(1)) && !leftTableSet.contains(nullSuppliers.get(1))) {
+            throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
+          }
+
+        } else if (joinType == JoinType.LEFT_OUTER) {
+          nullSuppliers.add(((RelationNode)joinNode.getRightChild()).getCanonicalName());
+          //verify that this null supplier is indeed in the right sub-tree
+          if (!rightTableSet.contains(nullSuppliers.get(0))) {
+            throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
+          }
+        } else if (joinType == JoinType.RIGHT_OUTER) {
+          if (((RelationNode)joinNode.getRightChild()).getCanonicalName().equals(rightTableName)) {
+            nullSuppliers.add(leftTableName);
+          } else {
+            nullSuppliers.add(rightTableName);
+          }
+
+          // verify that this null supplier is indeed in the left sub-tree
+          if (!leftTableSet.contains(nullSuppliers.get(0))) {
+            throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
+          }
+        }
+      }
+    }
+
+    // get evals from ON clause
+    List<EvalNode> onConditions = new ArrayList<EvalNode>();
+    if (joinNode.hasJoinQual()) {
+      onConditions.addAll(Sets.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual())));
+    }
+
+    boolean isTopMostJoin = stack.peek().getType() != NodeType.JOIN;
+
+    List<EvalNode> outerJoinPredicationEvals = new ArrayList<EvalNode>();
+    List<EvalNode> outerJoinFilterEvalsExcludePredication = new ArrayList<EvalNode>();
+    if (LogicalPlanner.isOuterJoin(joinNode.getJoinType())) {
+      // TAJO-853
+      // In the case of top most JOIN, all filters except JOIN condition aren't pushed down.
+      // That filters are processed by SELECTION NODE.
+      Set<String> nullSupplyingTableNameSet;
+      if (joinNode.getJoinType() == JoinType.RIGHT_OUTER) {
+        nullSupplyingTableNameSet = TUtil.newHashSet(PlannerUtil.getRelationLineage(joinNode.getLeftChild()));
+      } else {
+        nullSupplyingTableNameSet = TUtil.newHashSet(PlannerUtil.getRelationLineage(joinNode.getRightChild()));
+      }
+
+      Set<String> preservedTableNameSet;
+      if (joinNode.getJoinType() == JoinType.RIGHT_OUTER) {
+        preservedTableNameSet = TUtil.newHashSet(PlannerUtil.getRelationLineage(joinNode.getRightChild()));
+      } else {
+        preservedTableNameSet = TUtil.newHashSet(PlannerUtil.getRelationLineage(joinNode.getLeftChild()));
+      }
+
+      List<EvalNode> removedFromFilter = new ArrayList<EvalNode>();
+      for (EvalNode eachEval: context.pushingDownFilters) {
+        if (EvalTreeUtil.isJoinQual(block, eachEval, true)) {
+          outerJoinPredicationEvals.add(eachEval);
+          removedFromFilter.add(eachEval);
+        } else {
+          Set<Column> columns = EvalTreeUtil.findUniqueColumns(eachEval);
+          boolean canPushDown = true;
+          for (Column eachColumn: columns) {
+            if (nullSupplyingTableNameSet.contains(eachColumn.getQualifier())) {
+              canPushDown = false;
+              break;
+            }
+          }
+          if (!canPushDown) {
+            outerJoinFilterEvalsExcludePredication.add(eachEval);
+            removedFromFilter.add(eachEval);
+          }
+        }
+      }
+
+      context.pushingDownFilters.removeAll(removedFromFilter);
+
+      for (EvalNode eachOnEval: onConditions) {
+        if (EvalTreeUtil.isJoinQual(eachOnEval, true)) {
+          // If join condition, processing in the JoinNode.
+          outerJoinPredicationEvals.add(eachOnEval);
+        } else {
+          // If Eval has a column which belong to Preserved Row table, not using to push down but using JoinCondition
+          Set<Column> columns = EvalTreeUtil.findUniqueColumns(eachOnEval);
+          boolean canPushDown = true;
+          for (Column eachColumn: columns) {
+            if (preservedTableNameSet.contains(eachColumn.getQualifier())) {
+              canPushDown = false;
+              break;
+            }
+          }
+          if (canPushDown) {
+            context.pushingDownFilters.add(eachOnEval);
+          } else {
+            outerJoinPredicationEvals.add(eachOnEval);
+          }
+        }
+      }
+    } else {
+      context.pushingDownFilters.addAll(onConditions);
+    }
+
+    LogicalNode left = joinNode.getLeftChild();
+    LogicalNode right = joinNode.getRightChild();
+
+    List<EvalNode> notMatched = new ArrayList<EvalNode>();
+    // Join's input schema = right child output columns + left child output columns
+    Map<EvalNode, EvalNode> transformedMap = findCanPushdownAndTransform(context, block, joinNode, left, notMatched,
+        null, true, 0);
+    context.setFiltersTobePushed(transformedMap.keySet());
+    visit(context, plan, block, left, stack);
+
+    context.setToOrigin(transformedMap);
+    context.addFiltersTobePushed(notMatched);
+
+    notMatched.clear();
+    transformedMap = findCanPushdownAndTransform(context, block, joinNode, right, notMatched, null, true,
+        left.getOutSchema().size());
+    context.setFiltersTobePushed(new HashSet<EvalNode>(transformedMap.keySet()));
+
+    visit(context, plan, block, right, stack);
+
+    context.setToOrigin(transformedMap);
+    context.addFiltersTobePushed(notMatched);
+
+    notMatched.clear();
+    List<EvalNode> matched = Lists.newArrayList();
+    if(LogicalPlanner.isOuterJoin(joinNode.getJoinType())) {
+      matched.addAll(outerJoinPredicationEvals);
+    } else {
+      for (EvalNode eval : context.pushingDownFilters) {
+        if (LogicalPlanner.checkIfBeEvaluatedAtJoin(block, eval, joinNode, isTopMostJoin)) {
+          matched.add(eval);
+        }
+      }
+    }
+
+    EvalNode qual = null;
+    if (matched.size() > 1) {
+      // merged into one eval tree
+      qual = AlgebraicUtil.createSingletonExprFromCNF(
+          matched.toArray(new EvalNode[matched.size()]));
+    } else if (matched.size() == 1) {
+      // if the number of matched expr is one
+      qual = matched.get(0);
+    }
+
+    if (qual != null) {
+      joinNode.setJoinQual(qual);
+
+      if (joinNode.getJoinType() == JoinType.CROSS) {
+        joinNode.setJoinType(JoinType.INNER);
+      }
+      context.pushingDownFilters.removeAll(matched);
+    }
+
+    context.pushingDownFilters.addAll(outerJoinFilterEvalsExcludePredication);
+    return joinNode;
+  }
+
+  private Map<EvalNode, EvalNode> transformEvalsWidthByPassNode(
+      Collection<EvalNode> originEvals, LogicalPlan plan,
+      LogicalPlan.QueryBlock block,
+      LogicalNode node, LogicalNode childNode) throws PlanningException {
+    // transformed -> pushingDownFilters
+    Map<EvalNode, EvalNode> transformedMap = new HashMap<EvalNode, EvalNode>();
+
+    if (originEvals.isEmpty()) {
+      return transformedMap;
+    }
+
+    if (node.getType() == NodeType.UNION) {
+      // If node is union, All eval's columns are simple name and matched with child's output schema.
+      Schema childOutSchema = childNode.getOutSchema();
+      for (EvalNode eval : originEvals) {
+        EvalNode copy;
+        try {
+          copy = (EvalNode) eval.clone();
+        } catch (CloneNotSupportedException e) {
+          throw new PlanningException(e);
+        }
+
+        Set<Column> columns = EvalTreeUtil.findUniqueColumns(copy);
+        for (Column c : columns) {
+          Column column = childOutSchema.getColumn(c.getSimpleName());
+          if (column == null) {
+            throw new PlanningException(
+                "Invalid Filter PushDown on SubQuery: No such a corresponding column '"
+                    + c.getQualifiedName() + " for FilterPushDown(" + eval + "), " +
+                    "(PID=" + node.getPID() + ", Child=" + childNode.getPID() + ")");
+          }
+          EvalTreeUtil.changeColumnRef(copy, c.getSimpleName(), column.getQualifiedName());
+        }
+
+        transformedMap.put(copy, eval);
+      }
+      return transformedMap;
+    }
+
+    if (childNode.getType() == NodeType.UNION) {
+      // If child is union, remove qualifier from eval's column
+      for (EvalNode eval : originEvals) {
+        EvalNode copy;
+        try {
+          copy = (EvalNode) eval.clone();
+        } catch (CloneNotSupportedException e) {
+          throw new PlanningException(e);
+        }
+
+        Set<Column> columns = EvalTreeUtil.findUniqueColumns(copy);
+        for (Column c : columns) {
+          if (c.hasQualifier()) {
+            EvalTreeUtil.changeColumnRef(copy, c.getQualifiedName(), c.getSimpleName());
+          }
+        }
+
+        transformedMap.put(copy, eval);
+      }
+
+      return transformedMap;
+    }
+
+    // node in column -> child out column
+    Map<String, String> columnMap = new HashMap<String, String>();
+
+    for (int i = 0; i < node.getInSchema().size(); i++) {
+      String inColumnName = node.getInSchema().getColumn(i).getQualifiedName();
+      Column childOutColumn = childNode.getOutSchema().getColumn(i);
+      columnMap.put(inColumnName, childOutColumn.getQualifiedName());
+    }
+
+    // Rename from upper block's one to lower block's one
+    for (EvalNode matchedEval : originEvals) {
+      EvalNode copy;
+      try {
+        copy = (EvalNode) matchedEval.clone();
+      } catch (CloneNotSupportedException e) {
+        throw new PlanningException(e);
+      }
+
+      Set<Column> columns = EvalTreeUtil.findUniqueColumns(copy);
+      boolean allMatched = true;
+      for (Column c : columns) {
+        if (columnMap.containsKey(c.getQualifiedName())) {
+          EvalTreeUtil.changeColumnRef(copy, c.getQualifiedName(), columnMap.get(c.getQualifiedName()));
+        } else {
+          if (childNode.getType() == NodeType.GROUP_BY) {
+            if (((GroupbyNode) childNode).isAggregationColumn(c.getSimpleName())) {
+              allMatched = false;
+              break;
+            }
+          } else {
+            throw new PlanningException(
+                "Invalid Filter PushDown on SubQuery: No such a corresponding column '"
+                    + c.getQualifiedName() + " for FilterPushDown(" + matchedEval + "), " +
+                    "(PID=" + node.getPID() + ", Child=" + childNode.getPID() + ")"
+            );
+          }
+        }
+      }
+      if (allMatched) {
+        transformedMap.put(copy, matchedEval);
+      }
+    }
+
+    return transformedMap;
+  }
+
+  @Override
+  public LogicalNode visitTableSubQuery(FilterPushDownContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                        TableSubQueryNode node, Stack<LogicalNode> stack) throws PlanningException {
+    List<EvalNode> matched = Lists.newArrayList();
+    for (EvalNode eval : context.pushingDownFilters) {
+      if (LogicalPlanner.checkIfBeEvaluatedAtRelation(block, eval, node)) {
+        matched.add(eval);
+      }
+    }
+
+    // transformed -> pushingDownFilters
+    Map<EvalNode, EvalNode> transformedMap =
+        transformEvalsWidthByPassNode(matched, plan, block, node, node.getSubQuery());
+
+    context.setFiltersTobePushed(new HashSet<EvalNode>(transformedMap.keySet()));
+    visit(context, plan, plan.getBlock(node.getSubQuery()));
+    context.setToOrigin(transformedMap);
+
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitUnion(FilterPushDownContext context, LogicalPlan plan,
+                                LogicalPlan.QueryBlock block, UnionNode unionNode,
+                                Stack<LogicalNode> stack) throws PlanningException {
+    LogicalNode leftNode = unionNode.getLeftChild();
+
+    List<EvalNode> origins = new ArrayList<EvalNode>(context.pushingDownFilters);
+
+    // transformed -> pushingDownFilters
+    Map<EvalNode, EvalNode> transformedMap = transformEvalsWidthByPassNode(origins, plan, block, unionNode, leftNode);
+    context.setFiltersTobePushed(new HashSet<EvalNode>(transformedMap.keySet()));
+    visit(context, plan, plan.getBlock(leftNode));
+
+    if (!context.pushingDownFilters.isEmpty()) {
+      errorFilterPushDown(plan, leftNode, context);
+    }
+
+    LogicalNode rightNode = unionNode.getRightChild();
+    transformedMap = transformEvalsWidthByPassNode(origins, plan, block, unionNode, rightNode);
+    context.setFiltersTobePushed(new HashSet<EvalNode>(transformedMap.keySet()));
+    visit(context, plan, plan.getBlock(rightNode), rightNode, stack);
+
+    if (!context.pushingDownFilters.isEmpty()) {
+      errorFilterPushDown(plan, rightNode, context);
+    }
+
+    // notify all filter matched to upper
+    context.pushingDownFilters.clear();
+    return unionNode;
+  }
+
+  @Override
+  public LogicalNode visitProjection(FilterPushDownContext context,
+                                     LogicalPlan plan,
+                                     LogicalPlan.QueryBlock block,
+                                     ProjectionNode projectionNode,
+                                     Stack<LogicalNode> stack) throws PlanningException {
+    LogicalNode childNode = projectionNode.getChild();
+
+    List<EvalNode> notMatched = new ArrayList<EvalNode>();
+
+    //copy -> origin
+    BiMap<EvalNode, EvalNode> transformedMap = findCanPushdownAndTransform(
+        context, block,projectionNode, childNode, notMatched, null, false, 0);
+
+    context.setFiltersTobePushed(transformedMap.keySet());
+
+    stack.push(projectionNode);
+    childNode = visit(context, plan, plan.getBlock(childNode), childNode, stack);
+    stack.pop();
+
+    // find not matched after visiting child
+    for (EvalNode eval: context.pushingDownFilters) {
+      notMatched.add(transformedMap.get(eval));
+    }
+
+    EvalNode qual = null;
+    if (notMatched.size() > 1) {
+      // merged into one eval tree
+      qual = AlgebraicUtil.createSingletonExprFromCNF(notMatched.toArray(new EvalNode[notMatched.size()]));
+    } else if (notMatched.size() == 1) {
+      // if the number of matched expr is one
+      qual = notMatched.get(0);
+    }
+
+    // If there is not matched node add SelectionNode and clear context.pushingDownFilters
+    if (qual != null && LogicalPlanner.checkIfBeEvaluatedAtThis(qual, projectionNode)) {
+      SelectionNode selectionNode = plan.createNode(SelectionNode.class);
+      selectionNode.setInSchema(childNode.getOutSchema());
+      selectionNode.setOutSchema(childNode.getOutSchema());
+      selectionNode.setQual(qual);
+      block.registerNode(selectionNode);
+
+      projectionNode.setChild(selectionNode);
+      selectionNode.setChild(childNode);
+
+      // clean all remain filters because all conditions are merged into a qual
+      context.pushingDownFilters.clear();
+    }
+
+    // if there are remain filters, recover the original names and give back to the upper query block.
+    if (context.pushingDownFilters.size() > 0) {
+      ImmutableSet<EvalNode> copy = ImmutableSet.copyOf(context.pushingDownFilters);
+      context.pushingDownFilters.clear();
+      context.pushingDownFilters.addAll(reverseTransform(transformedMap, copy));
+    }
+
+    return projectionNode;
+  }
+
+  private Collection<EvalNode> reverseTransform(BiMap<EvalNode, EvalNode> map, Set<EvalNode> remainFilters) {
+    Set<EvalNode> reversed = Sets.newHashSet();
+    for (EvalNode evalNode : remainFilters) {
+      reversed.add(map.get(evalNode));
+    }
+    return reversed;
+  }
+
+  private BiMap<EvalNode, EvalNode> findCanPushdownAndTransform(
+      FilterPushDownContext context, LogicalPlan.QueryBlock block, Projectable node,
+      LogicalNode childNode, List<EvalNode> notMatched,
+      Set<String> partitionColumns,
+      boolean ignoreJoin, int columnOffset) throws PlanningException {
+    // canonical name -> target
+    Map<String, Target> nodeTargetMap = new HashMap<String, Target>();
+    for (Target target : node.getTargets()) {
+      nodeTargetMap.put(target.getCanonicalName(), target);
+    }
+
+    // copy -> origin
+    BiMap<EvalNode, EvalNode> matched = HashBiMap.create();
+
+    for (EvalNode eval : context.pushingDownFilters) {
+      if (ignoreJoin && EvalTreeUtil.isJoinQual(block, eval, true)) {
+        notMatched.add(eval);
+        continue;
+      }
+      // If all column is field eval, can push down.
+      Set<Column> evalColumns = EvalTreeUtil.findUniqueColumns(eval);
+      boolean columnMatched = true;
+      for (Column c : evalColumns) {
+        Target target = nodeTargetMap.get(c.getQualifiedName());
+        if (target == null) {
+          columnMatched = false;
+          break;
+        }
+        if (target.getEvalTree().getType() != EvalType.FIELD) {
+          columnMatched = false;
+          break;
+        }
+      }
+
+      if (columnMatched) {
+        // transform eval column to child's output column
+        EvalNode copyEvalNode = transformEval(node, childNode, eval, nodeTargetMap, partitionColumns, columnOffset);
+        if (copyEvalNode != null) {
+          matched.put(copyEvalNode, eval);
+        } else {
+          notMatched.add(eval);
+        }
+      } else {
+        notMatched.add(eval);
+      }
+    }
+
+    return matched;
+  }
+
+  private EvalNode transformEval(Projectable node, LogicalNode childNode, EvalNode origin,
+                                 Map<String, Target> targetMap, Set<String> partitionColumns,
+                                 int columnOffset) throws PlanningException {
+    Schema outputSchema = childNode != null ? childNode.getOutSchema() : node.getInSchema();
+    EvalNode copy;
+    try {
+      copy = (EvalNode) origin.clone();
+    } catch (CloneNotSupportedException e) {
+      throw new PlanningException(e);
+    }
+    Set<Column> columns = EvalTreeUtil.findUniqueColumns(copy);
+    for (Column c: columns) {
+      Target target = targetMap.get(c.getQualifiedName());
+      if (target == null) {
+        throw new PlanningException(
+            "Invalid Filter PushDown: No such a corresponding target '"
+                + c.getQualifiedName() + " for FilterPushDown(" + origin + "), " +
+                "(PID=" + node.getPID() + ")"
+        );
+      }
+      EvalNode targetEvalNode = target.getEvalTree();
+      if (targetEvalNode.getType() != EvalType.FIELD) {
+        throw new PlanningException(
+            "Invalid Filter PushDown: '" + c.getQualifiedName() + "' target is not FieldEval " +
+                "(PID=" + node.getPID() + ")"
+        );
+      }
+
+      FieldEval fieldEval = (FieldEval)targetEvalNode;
+      Column targetInputColumn = fieldEval.getColumnRef();
+
+      int index;
+      if (targetInputColumn.hasQualifier()) {
+        index = node.getInSchema().getColumnId(targetInputColumn.getQualifiedName());
+      } else {
+        index = node.getInSchema().getColumnIdByName(targetInputColumn.getQualifiedName());
+      }
+      if (columnOffset > 0) {
+        index = index - columnOffset;
+      }
+      if (index < 0 || index >= outputSchema.size()) {
+        if (partitionColumns != null && !partitionColumns.isEmpty() && node instanceof ScanNode) {
+          ScanNode scanNode = (ScanNode)node;
+          boolean isPartitionColumn = false;
+          if (CatalogUtil.isFQColumnName(partitionColumns.iterator().next())) {
+            isPartitionColumn = partitionColumns.contains(
+                CatalogUtil.buildFQName(scanNode.getTableName(), c.getSimpleName()));
+          } else {
+            isPartitionColumn = partitionColumns.contains(c.getSimpleName());
+          }
+          if (isPartitionColumn) {
+            EvalTreeUtil.changeColumnRef(copy, c.getQualifiedName(),
+                scanNode.getCanonicalName() + "." + c.getSimpleName());
+          } else {
+            return null;
+          }
+        } else {
+          return null;
+        }
+      } else {
+        Column outputColumn = outputSchema.getColumn(index);
+        EvalTreeUtil.changeColumnRef(copy, c.getQualifiedName(), outputColumn.getQualifiedName());
+      }
+    }
+
+    return copy;
+  }
+
+  /**
+   * Find aggregation columns in filter eval and add having clause or add HavingNode.
+   * @param context
+   * @param plan
+   * @param block
+   * @param parentNode  If null, having is parent
+   * @param havingNode      If null, projection is parent
+   * @param groupByNode
+   * @return matched origin eval
+   * @throws PlanningException
+   */
+  private List<EvalNode> addHavingNode(FilterPushDownContext context, LogicalPlan plan,
+                                       LogicalPlan.QueryBlock block,
+                                       UnaryNode parentNode,
+                                       HavingNode havingNode,
+                                       GroupbyNode groupByNode) throws PlanningException {
+    // find aggregation column
+    Set<Column> groupingColumns = new HashSet<Column>(Arrays.asList(groupByNode.getGroupingColumns()));
+    Set<String> aggrFunctionOutColumns = new HashSet<String>();
+    for (Column column : groupByNode.getOutSchema().getColumns()) {
+      if (!groupingColumns.contains(column)) {
+        aggrFunctionOutColumns.add(column.getQualifiedName());
+      }
+    }
+
+    List<EvalNode> aggrEvalOrigins = new ArrayList<EvalNode>();
+    List<EvalNode> aggrEvals = new ArrayList<EvalNode>();
+
+    for (EvalNode eval : context.pushingDownFilters) {
+      EvalNode copy = null;
+      try {
+        copy = (EvalNode)eval.clone();
+      } catch (CloneNotSupportedException e) {
+      }
+      boolean isEvalAggrFunction = false;
+      for (Column evalColumn : EvalTreeUtil.findUniqueColumns(copy)) {
+        if (aggrFunctionOutColumns.contains(evalColumn.getSimpleName())) {
+          EvalTreeUtil.changeColumnRef(copy, evalColumn.getQualifiedName(), evalColumn.getSimpleName());
+          isEvalAggrFunction = true;
+          break;
+        }
+      }
+      if (isEvalAggrFunction) {
+        aggrEvals.add(copy);
+        aggrEvalOrigins.add(eval);
+      }
+    }
+
+    if (aggrEvals.isEmpty()) {
+      return aggrEvalOrigins;
+    }
+
+    // transform
+
+    HavingNode workingHavingNode;
+    if (havingNode != null) {
+      workingHavingNode = havingNode;
+      aggrEvals.add(havingNode.getQual());
+    } else {
+      workingHavingNode = plan.createNode(HavingNode.class);
+      block.registerNode(workingHavingNode);
+      parentNode.setChild(workingHavingNode);
+      workingHavingNode.setChild(groupByNode);
+    }
+
+    EvalNode qual = null;
+    if (aggrEvals.size() > 1) {
+      // merged into one eval tree
+      qual = AlgebraicUtil.createSingletonExprFromCNF(aggrEvals.toArray(new EvalNode[aggrEvals.size()]));
+    } else if (aggrEvals.size() == 1) {
+      // if the number of matched expr is one
+      qual = aggrEvals.get(0);
+    }
+
+    // If there is not matched node add SelectionNode and clear context.pushingDownFilters
+    if (qual != null) {
+      workingHavingNode.setQual(qual);
+    }
+
+    return aggrEvalOrigins;
+  }
+
+  @Override
+  public LogicalNode visitWindowAgg(FilterPushDownContext context, LogicalPlan plan,
+                                  LogicalPlan.QueryBlock block, WindowAggNode winAggNode,
+                                  Stack<LogicalNode> stack) throws PlanningException {
+    stack.push(winAggNode);
+    super.visitWindowAgg(context, plan, block, winAggNode, stack);
+    stack.pop();
+    return winAggNode;
+  }
+
+
+  @Override
+  public LogicalNode visitGroupBy(FilterPushDownContext context, LogicalPlan plan,
+                                  LogicalPlan.QueryBlock block, GroupbyNode groupbyNode,
+                                  Stack<LogicalNode> stack) throws PlanningException {
+    LogicalNode parentNode = stack.peek();
+    List<EvalNode> aggrEvals;
+    if (parentNode.getType() == NodeType.HAVING) {
+      aggrEvals = addHavingNode(context, plan, block, null, (HavingNode)parentNode, groupbyNode);
+    } else {
+      aggrEvals = addHavingNode(context, plan, block, (UnaryNode)parentNode, null, groupbyNode);
+    }
+
+    if (aggrEvals != null) {
+      // remove aggregation eval from conext
+      context.pushingDownFilters.removeAll(aggrEvals);
+    }
+
+    List<EvalNode> notMatched = new ArrayList<EvalNode>();
+    // transform
+    Map<EvalNode, EvalNode> tranformed =
+        findCanPushdownAndTransform(context, block, groupbyNode,groupbyNode.getChild(), notMatched, null, false, 0);
+
+    context.setFiltersTobePushed(tranformed.keySet());
+    LogicalNode current = super.visitGroupBy(context, plan, block, groupbyNode, stack);
+
+    context.setToOrigin(tranformed);
+    context.addFiltersTobePushed(notMatched);
+
+    return current;
+  }
+
+  @Override
+  public LogicalNode visitScan(FilterPushDownContext context, LogicalPlan plan,
+                               LogicalPlan.QueryBlock block, ScanNode scanNode,
+                               Stack<LogicalNode> stack) throws PlanningException {
+    List<EvalNode> matched = Lists.newArrayList();
+
+    // find partition column and check matching
+    Set<String> partitionColumns = new HashSet<String>();
+    TableDesc table = scanNode.getTableDesc();
+    boolean hasQualifiedName = false;
+    if (table.hasPartition()) {
+      for (Column c: table.getPartitionMethod().getExpressionSchema().getColumns()) {
+        partitionColumns.add(c.getQualifiedName());
+        hasQualifiedName = c.hasQualifier();
+      }
+    }
+    Set<EvalNode> partitionEvals = new HashSet<EvalNode>();
+    for (EvalNode eval : context.pushingDownFilters) {
+      if (table.hasPartition()) {
+        Set<Column> columns = EvalTreeUtil.findUniqueColumns(eval);
+        if (columns.size() != 1) {
+          continue;
+        }
+        Column column = columns.iterator().next();
+
+        // If catalog runs with HCatalog, partition column is a qualified name
+        // Else partition column is a simple name
+        boolean isPartitionColumn = false;
+        if (hasQualifiedName) {
+          isPartitionColumn = partitionColumns.contains(CatalogUtil.buildFQName(table.getName(), column.getSimpleName()));
+        } else {
+          isPartitionColumn = partitionColumns.contains(column.getSimpleName());
+        }
+        if (isPartitionColumn) {
+          EvalNode copy;
+          try {
+            copy = (EvalNode) eval.clone();
+          } catch (CloneNotSupportedException e) {
+            throw new PlanningException(e);
+          }
+          EvalTreeUtil.changeColumnRef(copy, column.getQualifiedName(),
+              scanNode.getCanonicalName() + "." + column.getSimpleName());
+          matched.add(copy);
+          partitionEvals.add(eval);
+        }
+      }
+    }
+
+    context.pushingDownFilters.removeAll(partitionEvals);
+
+    List<EvalNode> notMatched = new ArrayList<EvalNode>();
+
+    // transform
+    Map<EvalNode, EvalNode> transformed =
+        findCanPushdownAndTransform(context, block, scanNode, null, notMatched, partitionColumns, true, 0);
+
+    for (EvalNode eval : transformed.keySet()) {
+      if (LogicalPlanner.checkIfBeEvaluatedAtRelation(block, eval, scanNode)) {
+        matched.add(eval);
+      }
+    }
+
+    EvalNode qual = null;
+    if (matched.size() > 1) {
+      // merged into one eval tree
+      qual = AlgebraicUtil.createSingletonExprFromCNF(
+          matched.toArray(new EvalNode[matched.size()]));
+    } else if (matched.size() == 1) {
+      // if the number of matched expr is one
+      qual = matched.iterator().next();
+    }
+
+    if (qual != null) { // if a matched qual exists
+      scanNode.setQual(qual);
+    }
+
+    for (EvalNode matchedEval: matched) {
+      transformed.remove(matchedEval);
+    }
+
+    context.setToOrigin(transformed);
+    context.addFiltersTobePushed(notMatched);
+
+    return scanNode;
+  }
+
+  private void errorFilterPushDown(LogicalPlan plan, LogicalNode node,
+                                   FilterPushDownContext context) throws PlanningException {
+    String notMatchedNodeStr = "";
+    String prefix = "";
+    for (EvalNode notMatchedNode: context.pushingDownFilters) {
+      notMatchedNodeStr += prefix + notMatchedNode;
+      prefix = ", ";
+    }
+    throw new PlanningException("FilterPushDown failed cause some filters not matched: " + notMatchedNodeStr + "\n" +
+        "Error node: " + node.getPlanString() + "\n" +
+        plan.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
new file mode 100644
index 0000000..14b24a7
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
@@ -0,0 +1,428 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.rewrite.rules;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.*;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.rewrite.RewriteRule;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.expr.*;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.StringUtils;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+
+public class PartitionedTableRewriter implements RewriteRule {
+  private static final Log LOG = LogFactory.getLog(PartitionedTableRewriter.class);
+
+  private static final String NAME = "Partitioned Table Rewriter";
+  private final Rewriter rewriter = new Rewriter();
+
+  private final TajoConf systemConf;
+
+  public PartitionedTableRewriter(TajoConf conf) {
+    systemConf = conf;
+  }
+
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
+  @Override
+  public boolean isEligible(LogicalPlan plan) {
+    for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
+      for (RelationNode relation : block.getRelations()) {
+        if (relation.getType() == NodeType.SCAN) {
+          TableDesc table = ((ScanNode)relation).getTableDesc();
+          if (table.hasPartition()) {
+            return true;
+          }
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
+    LogicalPlan.QueryBlock rootBlock = plan.getRootBlock();
+    rewriter.visit(rootBlock, plan, rootBlock, rootBlock.getRoot(), new Stack<LogicalNode>());
+    return plan;
+  }
+
+  private static class PartitionPathFilter implements PathFilter {
+
+    private Schema schema;
+    private EvalNode partitionFilter;
+    public PartitionPathFilter(Schema schema, EvalNode partitionFilter) {
+      this.schema = schema;
+      this.partitionFilter = partitionFilter;
+    }
+
+    @Override
+    public boolean accept(Path path) {
+      Tuple tuple = buildTupleFromPartitionPath(schema, path, true);
+      if (tuple == null) { // if it is a file or not acceptable file
+        return false;
+      }
+
+      return partitionFilter.eval(schema, tuple).asBool();
+    }
+
+    @Override
+    public String toString() {
+      return partitionFilter.toString();
+    }
+  }
+
+  /**
+   * It assumes that each conjunctive form corresponds to one column.
+   *
+   * @param partitionColumns
+   * @param conjunctiveForms search condition corresponding to partition columns.
+   *                         If it is NULL, it means that there is no search condition for this table.
+   * @param tablePath
+   * @return
+   * @throws IOException
+   */
+  private Path [] findFilteredPaths(Schema partitionColumns, EvalNode [] conjunctiveForms, Path tablePath)
+      throws IOException {
+
+    FileSystem fs = tablePath.getFileSystem(systemConf);
+
+    PathFilter [] filters;
+    if (conjunctiveForms == null) {
+      filters = buildAllAcceptingPathFilters(partitionColumns);
+    } else {
+      filters = buildPathFiltersForAllLevels(partitionColumns, conjunctiveForms);
+    }
+
+    // loop from one to the number of partition columns
+    Path [] filteredPaths = toPathArray(fs.listStatus(tablePath, filters[0]));
+
+    for (int i = 1; i < partitionColumns.size(); i++) {
+      // Get all file status matched to a ith level path filter.
+      filteredPaths = toPathArray(fs.listStatus(filteredPaths, filters[i]));
+    }
+
+    LOG.info("Filtered directory or files: " + filteredPaths.length);
+    return filteredPaths;
+  }
+
+  /**
+   * Build path filters for all levels with a list of filter conditions.
+   *
+   * For example, consider you have a partitioned table for three columns (i.e., col1, col2, col3).
+   * Then, this methods will create three path filters for (col1), (col1, col2), (col1, col2, col3).
+   *
+   * Corresponding filter conditions will be placed on each path filter,
+   * If there is no corresponding expression for certain column,
+   * The condition will be filled with a true value.
+   *
+   * Assume that an user gives a condition WHERE col1 ='A' and col3 = 'C'.
+   * There is no filter condition corresponding to col2.
+   * Then, the path filter conditions are corresponding to the followings:
+   *
+   * The first path filter: col1 = 'A'
+   * The second path filter: col1 = 'A' AND col2 IS NOT NULL
+   * The third path filter: col1 = 'A' AND col2 IS NOT NULL AND col3 = 'C'
+   *
+   * 'IS NOT NULL' predicate is always true against the partition path.
+   *
+   * @param partitionColumns
+   * @param conjunctiveForms
+   * @return
+   */
+  private static PathFilter [] buildPathFiltersForAllLevels(Schema partitionColumns,
+                                                     EvalNode [] conjunctiveForms) {
+    // Building partition path filters for all levels
+    Column target;
+    PathFilter [] filters = new PathFilter[partitionColumns.size()];
+    List<EvalNode> accumulatedFilters = Lists.newArrayList();
+    for (int i = 0; i < partitionColumns.size(); i++) { // loop from one to level
+      target = partitionColumns.getColumn(i);
+
+      for (EvalNode expr : conjunctiveForms) {
+        if (EvalTreeUtil.findUniqueColumns(expr).contains(target)) {
+          // Accumulate one qual per level
+          accumulatedFilters.add(expr);
+        }
+      }
+
+      if (accumulatedFilters.size() < (i + 1)) {
+        accumulatedFilters.add(new IsNullEval(true, new FieldEval(target)));
+      }
+
+      EvalNode filterPerLevel = AlgebraicUtil.createSingletonExprFromCNF(
+          accumulatedFilters.toArray(new EvalNode[accumulatedFilters.size()]));
+      filters[i] = new PartitionPathFilter(partitionColumns, filterPerLevel);
+    }
+    return filters;
+  }
+
+  /**
+   * Build an array of path filters for all levels with all accepting filter condition.
+   * @param partitionColumns The partition columns schema
+   * @return The array of path filter, accpeting all partition paths.
+   */
+  private static PathFilter [] buildAllAcceptingPathFilters(Schema partitionColumns) {
+    Column target;
+    PathFilter [] filters = new PathFilter[partitionColumns.size()];
+    List<EvalNode> accumulatedFilters = Lists.newArrayList();
+    for (int i = 0; i < partitionColumns.size(); i++) { // loop from one to level
+      target = partitionColumns.getColumn(i);
+      accumulatedFilters.add(new IsNullEval(true, new FieldEval(target)));
+
+      EvalNode filterPerLevel = AlgebraicUtil.createSingletonExprFromCNF(
+          accumulatedFilters.toArray(new EvalNode[accumulatedFilters.size()]));
+      filters[i] = new PartitionPathFilter(partitionColumns, filterPerLevel);
+    }
+    return filters;
+  }
+
+  private static Path [] toPathArray(FileStatus[] fileStatuses) {
+    Path [] paths = new Path[fileStatuses.length];
+    for (int j = 0; j < fileStatuses.length; j++) {
+      paths[j] = fileStatuses[j].getPath();
+    }
+    return paths;
+  }
+
+  private Path [] findFilteredPartitionPaths(ScanNode scanNode) throws IOException {
+    TableDesc table = scanNode.getTableDesc();
+    PartitionMethodDesc partitionDesc = scanNode.getTableDesc().getPartitionMethod();
+
+    Schema paritionValuesSchema = new Schema();
+    for (Column column : partitionDesc.getExpressionSchema().getColumns()) {
+      paritionValuesSchema.addColumn(column);
+    }
+
+    Set<EvalNode> indexablePredicateSet = Sets.newHashSet();
+
+    // if a query statement has a search condition, try to find indexable predicates
+    if (scanNode.hasQual()) {
+      EvalNode [] conjunctiveForms = AlgebraicUtil.toConjunctiveNormalFormArray(scanNode.getQual());
+      Set<EvalNode> remainExprs = Sets.newHashSet(conjunctiveForms);
+
+      // add qualifier to schema for qual
+      paritionValuesSchema.setQualifier(scanNode.getCanonicalName());
+      for (Column column : paritionValuesSchema.getColumns()) {
+        for (EvalNode simpleExpr : conjunctiveForms) {
+          if (checkIfIndexablePredicateOnTargetColumn(simpleExpr, column)) {
+            indexablePredicateSet.add(simpleExpr);
+          }
+        }
+      }
+
+      // Partitions which are not matched to the partition filter conditions are pruned immediately.
+      // So, the partition filter conditions are not necessary later, and they are removed from
+      // original search condition for simplicity and efficiency.
+      remainExprs.removeAll(indexablePredicateSet);
+      if (remainExprs.isEmpty()) {
+        scanNode.setQual(null);
+      } else {
+        scanNode.setQual(
+            AlgebraicUtil.createSingletonExprFromCNF(remainExprs.toArray(new EvalNode[remainExprs.size()])));
+      }
+    }
+
+    if (indexablePredicateSet.size() > 0) { // There are at least one indexable predicates
+      return findFilteredPaths(paritionValuesSchema,
+          indexablePredicateSet.toArray(new EvalNode[indexablePredicateSet.size()]), table.getPath());
+    } else { // otherwise, we will get all partition paths.
+      return findFilteredPaths(paritionValuesSchema, null, table.getPath());
+    }
+  }
+
+  private boolean checkIfIndexablePredicateOnTargetColumn(EvalNode evalNode, Column targetColumn) {
+    if (checkIfIndexablePredicate(evalNode) || checkIfDisjunctiveButOneVariable(evalNode)) {
+      Set<Column> variables = EvalTreeUtil.findUniqueColumns(evalNode);
+      // if it contains only single variable matched to a target column
+      return variables.size() == 1 && variables.contains(targetColumn);
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Check if an expression consists of one variable and one constant and
+   * the expression is a comparison operator.
+   *
+   * @param evalNode The expression to be checked
+   * @return true if an expression consists of one variable and one constant
+   * and the expression is a comparison operator. Other, false.
+   */
+  private boolean checkIfIndexablePredicate(EvalNode evalNode) {
+    // TODO - LIKE with a trailing wild-card character and IN with an array can be indexable
+    return AlgebraicUtil.containSingleVar(evalNode) && AlgebraicUtil.isIndexableOperator(evalNode);
+  }
+
+  /**
+   *
+   * @param evalNode The expression to be checked
+   * @return true if an disjunctive expression, consisting of indexable expressions
+   */
+  private boolean checkIfDisjunctiveButOneVariable(EvalNode evalNode) {
+    if (evalNode.getType() == EvalType.OR) {
+      BinaryEval orEval = (BinaryEval) evalNode;
+      boolean indexable =
+          checkIfIndexablePredicate(orEval.getLeftExpr()) &&
+              checkIfIndexablePredicate(orEval.getRightExpr());
+
+      boolean sameVariable =
+          EvalTreeUtil.findUniqueColumns(orEval.getLeftExpr())
+          .equals(EvalTreeUtil.findUniqueColumns(orEval.getRightExpr()));
+
+      return indexable && sameVariable;
+    } else {
+      return false;
+    }
+  }
+
+  private void updateTableStat(PartitionedTableScanNode scanNode) throws PlanningException {
+    if (scanNode.getInputPaths().length > 0) {
+      try {
+        FileSystem fs = scanNode.getInputPaths()[0].getFileSystem(systemConf);
+        long totalVolume = 0;
+
+        for (Path input : scanNode.getInputPaths()) {
+          ContentSummary summary = fs.getContentSummary(input);
+          totalVolume += summary.getLength();
+          totalVolume += summary.getFileCount();
+        }
+        scanNode.getTableDesc().getStats().setNumBytes(totalVolume);
+      } catch (IOException e) {
+        throw new PlanningException(e);
+      }
+    }
+  }
+
+  /**
+   * Take a look at a column partition path. A partition path consists
+   * of a table path part and column values part. This method transforms
+   * a partition path into a tuple with a given partition column schema.
+   *
+   * hdfs://192.168.0.1/tajo/warehouse/table1/col1=abc/col2=def/col3=ghi
+   *                   ^^^^^^^^^^^^^^^^^^^^^  ^^^^^^^^^^^^^^^^^^^^^^^^^^
+   *                      table path part        column values part
+   *
+   * When a file path is given, it can perform two ways depending on beNullIfFile flag.
+   * If it is true, it returns NULL when a given path is a file.
+   * Otherwise, it returns a built tuple regardless of file or directory.
+   *
+   * @param partitionColumnSchema The partition column schema
+   * @param partitionPath The partition path
+   * @param beNullIfFile If true, this method returns NULL when a given path is a file.
+   * @return The tuple transformed from a column values part.
+   */
+  public static Tuple buildTupleFromPartitionPath(Schema partitionColumnSchema, Path partitionPath,
+                                                  boolean beNullIfFile) {
+    int startIdx = partitionPath.toString().indexOf(getColumnPartitionPathPrefix(partitionColumnSchema));
+
+    if (startIdx == -1) { // if there is no partition column in the patch
+      return null;
+    }
+    String columnValuesPart = partitionPath.toString().substring(startIdx);
+
+    String [] columnValues = columnValuesPart.split("/");
+
+    // true means this is a file.
+    if (beNullIfFile && partitionColumnSchema.size() < columnValues.length) {
+      return null;
+    }
+
+    Tuple tuple = new VTuple(partitionColumnSchema.size());
+    int i = 0;
+    for (; i < columnValues.length && i < partitionColumnSchema.size(); i++) {
+      String [] parts = columnValues[i].split("=");
+      if (parts.length != 2) {
+        return null;
+      }
+      int columnId = partitionColumnSchema.getColumnIdByName(parts[0]);
+      Column keyColumn = partitionColumnSchema.getColumn(columnId);
+      tuple.put(columnId, DatumFactory.createFromString(keyColumn.getDataType(), StringUtils.unescapePathName(parts[1])));
+    }
+    for (; i < partitionColumnSchema.size(); i++) {
+      tuple.put(i, NullDatum.get());
+    }
+    return tuple;
+  }
+
+  /**
+   * Get a prefix of column partition path. For example, consider a column partition (col1, col2).
+   * Then, you will get a string 'col1='.
+   *
+   * @param partitionColumn the schema of column partition
+   * @return The first part string of column partition path.
+   */
+  private static String getColumnPartitionPathPrefix(Schema partitionColumn) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(partitionColumn.getColumn(0).getSimpleName()).append("=");
+    return sb.toString();
+  }
+
+  private final class Rewriter extends BasicLogicalPlanVisitor<Object, Object> {
+    @Override
+    public Object visitScan(Object object, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode scanNode,
+                            Stack<LogicalNode> stack) throws PlanningException {
+
+      TableDesc table = scanNode.getTableDesc();
+      if (!table.hasPartition()) {
+        return null;
+      }
+
+      try {
+        Path [] filteredPaths = findFilteredPartitionPaths(scanNode);
+        plan.addHistory("PartitionTableRewriter chooses " + filteredPaths.length + " of partitions");
+        PartitionedTableScanNode rewrittenScanNode = plan.createNode(PartitionedTableScanNode.class);
+        rewrittenScanNode.init(scanNode, filteredPaths);
+        updateTableStat(rewrittenScanNode);
+
+        // if it is topmost node, set it as the rootnode of this block.
+        if (stack.empty() || block.getRoot().equals(scanNode)) {
+          block.setRoot(rewrittenScanNode);
+        } else {
+          PlannerUtil.replaceNode(plan, stack.peek(), scanNode, rewrittenScanNode);
+        }
+      } catch (IOException e) {
+        throw new PlanningException("Partitioned Table Rewrite Failed: \n" + e.getMessage());
+      }
+      return null;
+    }
+  }
+}