You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/10/25 20:17:53 UTC
[04/28] TAJO-1125: Separate logical plan and optimizer into a maven
module.
http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/NameResolver.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/NameResolver.java b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/NameResolver.java
new file mode 100644
index 0000000..51a016f
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/NameResolver.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.nameresolver;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.tajo.algebra.ColumnReferenceExpr;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.exception.NoSuchColumnException;
+import org.apache.tajo.plan.algebra.AmbiguousFieldException;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.verifier.VerifyException;
+import org.apache.tajo.plan.logical.RelationNode;
+import org.apache.tajo.util.Pair;
+import org.apache.tajo.util.TUtil;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * NameResolver utility
+ */
+public abstract class NameResolver {
+
+ public static Map<NameResolvingMode, NameResolver> resolverMap = Maps.newHashMap();
+
+ static {
+ resolverMap.put(NameResolvingMode.RELS_ONLY, new ResolverByRels());
+ resolverMap.put(NameResolvingMode.RELS_AND_SUBEXPRS, new ResolverByRelsAndSubExprs());
+ resolverMap.put(NameResolvingMode.SUBEXPRS_AND_RELS, new ResolverBySubExprsAndRels());
+ resolverMap.put(NameResolvingMode.LEGACY, new ResolverByLegacy());
+ }
+
+ abstract Column resolve(LogicalPlan plan, LogicalPlan.QueryBlock block, ColumnReferenceExpr columnRef)
+ throws PlanningException;
+
+ /**
+ * Try to find the database name
+ *
+ * @param block the current block
+ * @param tableName The table name
+ * @return The found database name
+ * @throws PlanningException
+ */
+ public static String resolveDatabase(LogicalPlan.QueryBlock block, String tableName) throws PlanningException {
+ List<String> found = new ArrayList<String>();
+ for (RelationNode relation : block.getRelations()) {
+ // check alias name or table name
+ if (CatalogUtil.extractSimpleName(relation.getCanonicalName()).equals(tableName) ||
+ CatalogUtil.extractSimpleName(relation.getTableName()).equals(tableName)) {
+ // obtain the database name
+ found.add(CatalogUtil.extractQualifier(relation.getTableName()));
+ }
+ }
+
+ if (found.size() == 0) {
+ return null;
+ } else if (found.size() > 1) {
+ throw new PlanningException("Ambiguous table name \"" + tableName + "\"");
+ }
+
+ return found.get(0);
+ }
+
+ public static Column resolve(LogicalPlan plan, LogicalPlan.QueryBlock block, ColumnReferenceExpr column,
+ NameResolvingMode mode) throws PlanningException {
+ if (!resolverMap.containsKey(mode)) {
+ throw new PlanningException("Unsupported name resolving level: " + mode.name());
+ }
+ return resolverMap.get(mode).resolve(plan, block, column);
+ }
+
+ /**
+ * Try to find a column from all relations within a given query block.
+ * If a given column reference is qualified, it tries to resolve the name
+ * from only the relation corresponding to the qualifier.
+ *
+ * @param plan The logical plan
+ * @param block The current query block
+ * @param columnRef The column reference to be found
+ * @return The found column
+ * @throws PlanningException
+ */
+ static Column resolveFromRelsWithinBlock(LogicalPlan plan, LogicalPlan.QueryBlock block,
+ ColumnReferenceExpr columnRef) throws PlanningException {
+ String qualifier;
+ String canonicalName;
+
+ if (columnRef.hasQualifier()) {
+ Pair<String, String> normalized = normalizeQualifierAndCanonicalName(block, columnRef);
+ qualifier = normalized.getFirst();
+ canonicalName = normalized.getSecond();
+
+ RelationNode relationOp = block.getRelation(qualifier);
+
+ // If we cannot find any relation against a qualified column name
+ if (relationOp == null) {
+ throw null;
+ }
+
+ // Please consider a query case:
+ // select lineitem.l_orderkey from lineitem a order by lineitem.l_orderkey;
+ //
+ // The relation lineitem is already renamed to "a", but lineitem.l_orderkey still can be used.
+ // The below code makes it available. Otherwise, it cannot find any match in the relation schema.
+ if (block.isAlreadyRenamedTableName(CatalogUtil.extractQualifier(canonicalName))) {
+ canonicalName =
+ CatalogUtil.buildFQName(relationOp.getCanonicalName(), CatalogUtil.extractSimpleName(canonicalName));
+ }
+
+ Schema schema = relationOp.getTableSchema();
+ Column column = schema.getColumn(canonicalName);
+
+ return column;
+ } else {
+ return resolveFromAllRelsInBlock(block, columnRef);
+ }
+ }
+
+ /**
+ * Try to find the column from the current node and child node. It can find subexprs generated from the optimizer.
+ *
+ * @param block The current query block
+ * @param columnRef The column reference to be found
+ * @return The found column
+ */
+ static Column resolveFromCurrentAndChildNode(LogicalPlan.QueryBlock block, ColumnReferenceExpr columnRef)
+ throws NoSuchColumnException {
+
+ if (block.getCurrentNode() != null && block.getCurrentNode().getInSchema() != null) {
+ Column found = block.getCurrentNode().getInSchema().getColumn(columnRef.getCanonicalName());
+ if (found != null) {
+ return found;
+ } else if (block.getLatestNode() != null) {
+ found = block.getLatestNode().getOutSchema().getColumn(columnRef.getName());
+ if (found != null) {
+ return found;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * It tries to find a full qualified column name from all relations in the current block.
+ *
+ * @param block The current query block
+ * @param columnRef The column reference to be found
+ * @return The found column
+ */
+ static Column resolveFromAllRelsInBlock(LogicalPlan.QueryBlock block,
+ ColumnReferenceExpr columnRef) throws VerifyException {
+ List<Column> candidates = TUtil.newList();
+
+ for (RelationNode rel : block.getRelations()) {
+ Column found = rel.getTableSchema().getColumn(columnRef.getName());
+ if (found != null) {
+ candidates.add(found);
+ }
+ }
+
+ if (!candidates.isEmpty()) {
+ return ensureUniqueColumn(candidates);
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Trying to find a column from all relations in other blocks
+ *
+ * @param plan The logical plan
+ * @param columnRef The column reference to be found
+ * @return The found column
+ */
+ static Column resolveFromAllRelsInAllBlocks(LogicalPlan plan, ColumnReferenceExpr columnRef) throws VerifyException {
+
+ List<Column> candidates = Lists.newArrayList();
+
+ // from all relations of all query blocks
+ for (LogicalPlan.QueryBlock eachBlock : plan.getQueryBlocks()) {
+
+ for (RelationNode rel : eachBlock.getRelations()) {
+ Column found = rel.getTableSchema().getColumn(columnRef.getName());
+ if (found != null) {
+ candidates.add(found);
+ }
+ }
+ }
+
+ if (!candidates.isEmpty()) {
+ return NameResolver.ensureUniqueColumn(candidates);
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Try to find a column from the final schema of the current block.
+ *
+ * @param block The current query block
+ * @param columnRef The column reference to be found
+ * @return The found column
+ */
+ static Column resolveAliasedName(LogicalPlan.QueryBlock block, ColumnReferenceExpr columnRef) throws VerifyException {
+ List<Column> candidates = Lists.newArrayList();
+
+ if (block.getSchema() != null) {
+ Column found = block.getSchema().getColumn(columnRef.getName());
+ if (found != null) {
+ candidates.add(found);
+ }
+ }
+
+ if (!candidates.isEmpty()) {
+ return NameResolver.ensureUniqueColumn(candidates);
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * It returns a pair of names, which the first value is ${database}.${table} and the second value
+ * is a simple column name.
+ *
+ * @param block The current block
+ * @param columnRef The column name
+ * @return A pair of normalized qualifier and column name
+ * @throws PlanningException
+ */
+ static Pair<String, String> normalizeQualifierAndCanonicalName(LogicalPlan.QueryBlock block,
+ ColumnReferenceExpr columnRef)
+ throws PlanningException {
+ String qualifier;
+ String canonicalName;
+
+ if (CatalogUtil.isFQTableName(columnRef.getQualifier())) {
+ qualifier = columnRef.getQualifier();
+ canonicalName = columnRef.getCanonicalName();
+ } else {
+ String resolvedDatabaseName = resolveDatabase(block, columnRef.getQualifier());
+ if (resolvedDatabaseName == null) {
+ throw new NoSuchColumnException(columnRef.getQualifier());
+ }
+ qualifier = CatalogUtil.buildFQName(resolvedDatabaseName, columnRef.getQualifier());
+ canonicalName = CatalogUtil.buildFQName(qualifier, columnRef.getName());
+ }
+
+ return new Pair<String, String>(qualifier, canonicalName);
+ }
+
+ static Column ensureUniqueColumn(List<Column> candidates) throws VerifyException {
+ if (candidates.size() == 1) {
+ return candidates.get(0);
+ } else if (candidates.size() > 2) {
+ StringBuilder sb = new StringBuilder();
+ boolean first = true;
+ for (Column column : candidates) {
+ if (first) {
+ first = false;
+ } else {
+ sb.append(", ");
+ }
+ sb.append(column);
+ }
+ throw new AmbiguousFieldException("Ambiguous Column Name: " + sb.toString());
+ } else {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/NameResolvingMode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/NameResolvingMode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/NameResolvingMode.java
new file mode 100644
index 0000000..4d9f9a5
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/NameResolvingMode.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.nameresolver;
+
/**
 * Strategies used to bind a column reference to a field during planning.
 *
 * <h2>Motivation</h2>
 *
 * Please take a look at the following example query:
 *
 * <pre>
 * select (l_orderkey + l_orderkey) l_orderkey from lineitem where l_orderkey &gt; 2 order by l_orderkey;
 * </pre>
 *
 * Although <code>l_orderkey</code> seems to be ambiguous, the above usages are accepted by commercial DBMSs.
 * In order to eliminate the ambiguity, Tajo follows the behaviors of PostgreSQL.
 *
 * <h2>Resolving Modes</h2>
 *
 * From the behaviors of PostgreSQL, we found that there are three kinds of name resolving modes.
 * Each definition is as follows:
 *
 * <ul>
 *   <li><b>RELS_ONLY</b> finds a column only from the relations in the current block.</li>
 *   <li><b>RELS_AND_SUBEXPRS</b> finds a column from all relations in the current block and
 *   from aliased temporary fields; a temporary field means an explicitly aliased expression. If there are
 *   duplicated columns in the relations and temporary fields, this mode chooses the field in a relation
 *   first.</li>
 *   <li><b>SUBEXPRS_AND_RELS</b> is very similar to <code>RELS_AND_SUBEXPRS</code>. The main difference is that
 *   it chooses an aliased temporary field first instead of the fields in a relation.</li>
 * </ul>
 *
 * In addition, <b>LEGACY</b> resolves a name in the legacy manner, i.e., globally.
 *
 * <h2>The relationship between resolving modes and operators</h2>
 *
 * <ul>
 *   <li>fields in the select list and LIMIT are resolved in the RELS_ONLY mode.</li>
 *   <li>fields in the WHERE clause are resolved in the RELS_AND_SUBEXPRS mode.</li>
 *   <li>fields in GROUP BY, HAVING, and ORDER BY are resolved in the SUBEXPRS_AND_RELS mode.</li>
 * </ul>
 *
 * <h2>Example</h2>
 *
 * Please revisit the aforementioned example:
 *
 * <pre>
 * select (l_orderkey + l_orderkey) l_orderkey from lineitem where l_orderkey &gt; 2 order by l_orderkey;
 * </pre>
 *
 * With the above rules and the relationship between modes and operators, we can easily identify which reference
 * points to which field.
 * <ol>
 *   <li><code>l_orderkey</code> included in <code>(l_orderkey + l_orderkey)</code> points to the field
 *   in the relation <code>lineitem</code>.</li>
 *   <li><code>l_orderkey</code> included in the WHERE clause also points to the field in the relation
 *   <code>lineitem</code>.</li>
 *   <li><code>l_orderkey</code> included in the ORDER BY clause points to the temporary field
 *   <code>(l_orderkey + l_orderkey)</code>, which is aliased as <code>l_orderkey</code>.</li>
 * </ol>
 */
public enum NameResolvingMode {
  RELS_ONLY,          // finding from only relations
  RELS_AND_SUBEXPRS,  // finding from relations and subexprs in a place
  SUBEXPRS_AND_RELS,  // finding from subexprs and relations in a place
  LEGACY              // Finding in a legacy manner (globally)
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverByLegacy.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverByLegacy.java b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverByLegacy.java
new file mode 100644
index 0000000..a1d9dbd
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverByLegacy.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.nameresolver;
+
+import org.apache.tajo.algebra.ColumnReferenceExpr;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.NoSuchColumnException;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.plan.logical.RelationNode;
+import org.apache.tajo.util.Pair;
+import org.apache.tajo.util.TUtil;
+
+import java.util.List;
+
+public class ResolverByLegacy extends NameResolver {
+ @Override
+ public Column resolve(LogicalPlan plan, LogicalPlan.QueryBlock block, ColumnReferenceExpr columnRef)
+ throws PlanningException {
+
+ if (columnRef.hasQualifier()) {
+ return resolveColumnWithQualifier(plan, block, columnRef);
+ } else {
+ return resolveColumnWithoutQualifier(plan, block, columnRef);
+ }
+ }
+
+ private static Column resolveColumnWithQualifier(LogicalPlan plan, LogicalPlan.QueryBlock block,
+ ColumnReferenceExpr columnRef) throws PlanningException {
+ final String qualifier;
+ String canonicalName;
+ final String qualifiedName;
+
+ Pair<String, String> normalized = normalizeQualifierAndCanonicalName(block, columnRef);
+ qualifier = normalized.getFirst();
+ canonicalName = normalized.getSecond();
+ qualifiedName = CatalogUtil.buildFQName(qualifier, columnRef.getName());
+
+ Column found = resolveFromRelsWithinBlock(plan, block, columnRef);
+ if (found == null) {
+ throw new NoSuchColumnException(columnRef.getCanonicalName());
+ }
+
+ // If code reach here, a column is found.
+ // But, it may be aliased from bottom logical node.
+ // If the column is aliased, the found name may not be used in upper node.
+
+ // Here, we try to check if column reference is already aliased.
+ // If so, it replaces the name with aliased name.
+ LogicalNode currentNode = block.getCurrentNode();
+
+ // The condition (currentNode.getInSchema().contains(column)) means
+ // the column can be used at the current node. So, we don't need to find aliase name.
+ Schema currentNodeSchema = null;
+ if (currentNode != null) {
+ if (currentNode instanceof RelationNode) {
+ currentNodeSchema = ((RelationNode) currentNode).getTableSchema();
+ } else {
+ currentNodeSchema = currentNode.getInSchema();
+ }
+ }
+
+ if (currentNode != null && !currentNodeSchema.contains(found)
+ && currentNode.getType() != NodeType.TABLE_SUBQUERY) {
+ List<Column> candidates = TUtil.newList();
+ if (block.getNamedExprsManager().isAliased(qualifiedName)) {
+ String alias = block.getNamedExprsManager().getAlias(canonicalName);
+ found = resolve(plan, block, new ColumnReferenceExpr(alias), NameResolvingMode.LEGACY);
+ if (found != null) {
+ candidates.add(found);
+ }
+ }
+ if (!candidates.isEmpty()) {
+ return ensureUniqueColumn(candidates);
+ }
+ }
+
+ return found;
+ }
+
+ static Column resolveColumnWithoutQualifier(LogicalPlan plan, LogicalPlan.QueryBlock block,
+ ColumnReferenceExpr columnRef)throws PlanningException {
+
+ Column found = resolveFromAllRelsInBlock(block, columnRef);
+ if (found != null) {
+ return found;
+ }
+
+ found = resolveAliasedName(block, columnRef);
+ if (found != null) {
+ return found;
+ }
+
+ found = resolveFromCurrentAndChildNode(block, columnRef);
+ if (found != null) {
+ return found;
+ }
+
+ found = resolveFromAllRelsInAllBlocks(plan, columnRef);
+ if (found != null) {
+ return found;
+ }
+
+ throw new NoSuchColumnException("ERROR: no such a column name "+ columnRef.getCanonicalName());
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverByRels.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverByRels.java b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverByRels.java
new file mode 100644
index 0000000..a67a1ca
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverByRels.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.nameresolver;
+
+import org.apache.tajo.algebra.ColumnReferenceExpr;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.logical.NoSuchColumnException;
+
+public class ResolverByRels extends NameResolver {
+ @Override
+ public Column resolve(LogicalPlan plan, LogicalPlan.QueryBlock block, ColumnReferenceExpr columnRef)
+ throws PlanningException {
+
+ Column column = resolveFromRelsWithinBlock(plan, block, columnRef);
+ if (column == null) {
+ throw new NoSuchColumnException(columnRef.getCanonicalName());
+ }
+ return column;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverByRelsAndSubExprs.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverByRelsAndSubExprs.java b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverByRelsAndSubExprs.java
new file mode 100644
index 0000000..f486117
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverByRelsAndSubExprs.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.nameresolver;
+
+import org.apache.tajo.algebra.ColumnReferenceExpr;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.exception.NoSuchColumnException;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+
+public class ResolverByRelsAndSubExprs extends NameResolver {
+ @Override
+ public Column resolve(LogicalPlan plan, LogicalPlan.QueryBlock block, ColumnReferenceExpr columnRef)
+ throws PlanningException {
+
+ Column column = resolveFromRelsWithinBlock(plan, block, columnRef);
+ if (column == null) {
+ column = resolveFromCurrentAndChildNode(block, columnRef);
+ }
+
+ if (column == null) {
+ throw new NoSuchColumnException(columnRef.getCanonicalName());
+ }
+ return column;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverBySubExprsAndRels.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverBySubExprsAndRels.java b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverBySubExprsAndRels.java
new file mode 100644
index 0000000..3064fde
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverBySubExprsAndRels.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.nameresolver;
+
+import org.apache.tajo.algebra.ColumnReferenceExpr;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.logical.NoSuchColumnException;
+
+public class ResolverBySubExprsAndRels extends NameResolver {
+ @Override
+ public Column resolve(LogicalPlan plan, LogicalPlan.QueryBlock block, ColumnReferenceExpr columnRef)
+ throws PlanningException {
+
+ Column column = resolveFromCurrentAndChildNode(block, columnRef);
+ if (column == null) {
+ column = resolveFromRelsWithinBlock(plan, block, columnRef);
+ }
+
+ if (column == null) {
+ throw new NoSuchColumnException(columnRef.getCanonicalName());
+ }
+ return column;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BasicQueryRewriteEngine.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BasicQueryRewriteEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BasicQueryRewriteEngine.java
new file mode 100644
index 0000000..491dda1
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BasicQueryRewriteEngine.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.rewrite;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * This is a basic query rewrite rule engine. This rewrite rule engine
+ * rewrites a logical plan with various query rewrite rules.
+ */
+public class BasicQueryRewriteEngine implements QueryRewriteEngine {
+ /** class logger */
+ private Log LOG = LogFactory.getLog(BasicQueryRewriteEngine.class);
+
+ /** a map for query rewrite rules */
+ private Map<String, RewriteRule> rewriteRules = new LinkedHashMap<String, RewriteRule>();
+
+ /**
+ * Add a query rewrite rule to this engine.
+ *
+ * @param rule The rule to be added to this engine.
+ */
+ public void addRewriteRule(RewriteRule rule) {
+ if (!rewriteRules.containsKey(rule.getName())) {
+ rewriteRules.put(rule.getName(), rule);
+ }
+ }
+
+ /**
+ * Rewrite a logical plan with all query rewrite rules added to this engine.
+ *
+ * @param plan The plan to be rewritten with all query rewrite rule.
+ * @return The rewritten plan.
+ */
+ public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
+ RewriteRule rule;
+ for (Entry<String, RewriteRule> rewriteRule : rewriteRules.entrySet()) {
+ rule = rewriteRule.getValue();
+ if (rule.isEligible(plan)) {
+ plan = rule.rewrite(plan);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The rule \"" + rule.getName() + " \" rewrites the query.");
+ }
+ }
+ }
+
+ return plan;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/QueryRewriteEngine.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/QueryRewriteEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/QueryRewriteEngine.java
new file mode 100644
index 0000000..b7f5637
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/QueryRewriteEngine.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.rewrite;
+
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+
+public interface QueryRewriteEngine {
+ /**
+ * Rewrite a logical plan with all query rewrite rules added to this engine.
+ *
+ * @param plan The plan to be rewritten with all query rewrite rule.
+ * @return The rewritten plan.
+ */
+ LogicalPlan rewrite(LogicalPlan plan) throws PlanningException;
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/RewriteRule.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/RewriteRule.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/RewriteRule.java
new file mode 100644
index 0000000..0ba7460
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/RewriteRule.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.rewrite;
+
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+
+/**
+ * A single rewrite rule that can be applied to a logical plan.
+ */
+public interface RewriteRule {
+
+  /**
+   * Returns the name of this rewrite rule. The name is used for debugging and for building an
+   * optimization history.
+   *
+   * @return the rewrite rule name
+   */
+  String getName();
+
+  /**
+   * Checks whether this rewrite rule can be applied to the given query plan. For example,
+   * selection push down cannot be applied to a plan that has no filter; in that case this
+   * method returns false.
+   *
+   * @param plan the plan to be checked
+   * @return true if this rule can be applied to the plan; false otherwise
+   */
+  boolean isEligible(LogicalPlan plan);
+
+  /**
+   * Rewrites a logical plan with this rule and returns the rewritten plan.
+   * The input plan is expected to stay unmodified: the rule should rewrite a copy of it.
+   *
+   * NOTE(review): FilterPushDownRule in this module mutates and returns the input plan,
+   * which does not follow this contract — confirm which behavior is intended.
+   *
+   * @param plan the input logical plan; it is not supposed to be modified
+   * @return the rewritten logical plan
+   * @throws PlanningException if the rule fails to rewrite the plan
+   */
+  LogicalPlan rewrite(LogicalPlan plan) throws PlanningException;
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
new file mode 100644
index 0000000..31063bf
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
@@ -0,0 +1,912 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.rewrite.rules;
+
+import com.google.common.collect.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.algebra.JoinType;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.plan.*;
+import org.apache.tajo.plan.expr.*;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.rewrite.rules.FilterPushDownRule.FilterPushDownContext;
+import org.apache.tajo.plan.rewrite.RewriteRule;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
+import org.apache.tajo.util.TUtil;
+
+import java.util.*;
+
+/**
+ * This rule tries to push down all filter conditions into logical nodes as low as possible.
+ * This is likely to significantly reduce the amount of intermediate data.
+ */
+public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownContext, LogicalNode>
+ implements RewriteRule {
+ private final static Log LOG = LogFactory.getLog(FilterPushDownRule.class);
+ private static final String NAME = "FilterPushDown";
+
+  /**
+   * Mutable traversal state: the filter predicates that are still waiting to be placed at
+   * some node of the plan being visited.
+   */
+  static class FilterPushDownContext {
+    Set<EvalNode> pushingDownFilters = new HashSet<EvalNode>();
+
+    /** Removes all pending filters. */
+    public void clear() {
+      pushingDownFilters.clear();
+    }
+
+    /** Replaces the pending filters with {@code workingEvals}. */
+    public void setFiltersTobePushed(Collection<EvalNode> workingEvals) {
+      pushingDownFilters.clear();
+      pushingDownFilters.addAll(workingEvals);
+    }
+
+    /** Adds {@code workingEvals} to the pending filters. */
+    public void addFiltersTobePushed(Collection<EvalNode> workingEvals) {
+      pushingDownFilters.addAll(workingEvals);
+    }
+
+    /**
+     * Replaces every pending filter with the original filter it was derived from.
+     *
+     * @param evalMap maps a transformed copy to its original filter; pending filters that
+     *                have no entry in it are discarded
+     */
+    public void setToOrigin(Map<EvalNode, EvalNode> evalMap) {
+      List<EvalNode> restored = new ArrayList<EvalNode>();
+      for (EvalNode pending : pushingDownFilters) {
+        EvalNode original = evalMap.get(pending);
+        if (original == null) {
+          continue;
+        }
+        restored.add(original);
+      }
+      setFiltersTobePushed(restored);
+    }
+  }
+
+  /** Returns the fixed name of this rule, "FilterPushDown". */
+  @Override
+  public String getName() {
+    return FilterPushDownRule.NAME;
+  }
+
+ @Override
+ public boolean isEligible(LogicalPlan plan) {
+ for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
+ if (block.hasNode(NodeType.SELECTION) || block.hasNode(NodeType.JOIN)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+  @Override
+  public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
+    /*
+     * How each visited node is handled:
+     * - A filter is not pushed through a target that is not a plain FieldEval.
+     * - Columns of a filter are renamed to the child node's output columns; if the child
+     *   does not output such a column, the filter is not pushed down.
+     * - At a ScanNode, the filters reaching it are added to the scan's qual.
+     * - At a GroupbyNode, filters on aggregation columns become a HAVING condition:
+     *   . if the parent is a HavingNode, they are added to that HavingNode;
+     *   . if not, a new HavingNode is created and set as the parent's child.
+     */
+    FilterPushDownContext context = new FilterPushDownContext();
+    for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
+      // every query block starts without pending filters
+      context.clear();
+      visit(context, plan, block, block.getRoot(), new Stack<LogicalNode>());
+    }
+
+    if (LOG.isDebugEnabled()) {
+      String separator = "=============================================";
+      LOG.debug(separator);
+      LOG.debug("FilterPushDown Optimized Query: \n" + plan.toString());
+      LOG.debug(separator);
+    }
+    // the plan is rewritten in place and returned
+    return plan;
+  }
+
+  /**
+   * Splits the selection condition into conjuncts and tries to push each of them below this
+   * node. If every conjunct was placed lower, the selection is spliced out of the plan;
+   * otherwise the remaining conjuncts that can be evaluated here become its new qualifier.
+   */
+  @Override
+  public LogicalNode visitFilter(FilterPushDownContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                 SelectionNode selNode, Stack<LogicalNode> stack) throws PlanningException {
+    EvalNode[] conjuncts = AlgebraicUtil.toConjunctiveNormalFormArray(selNode.getQual());
+    context.pushingDownFilters.addAll(Arrays.asList(conjuncts));
+
+    stack.push(selNode);
+    visit(context, plan, block, selNode.getChild(), stack);
+    stack.pop();
+
+    if (context.pushingDownFilters.isEmpty()) {
+      // No search condition is left: link the parent directly to this selection's child.
+      LogicalNode parent = stack.peek();
+      if (!(parent instanceof UnaryNode)) {
+        throw new InvalidQueryException("Unexpected Logical Query Plan");
+      }
+      ((UnaryNode) parent).setChild(selNode.getChild());
+      return selNode;
+    }
+
+    // Keep here the remaining conjuncts that checkIfBeEvaluatedAtThis accepts for this node,
+    // and stop pushing them further.
+    Set<EvalNode> evaluableHere = TUtil.newHashSet();
+    for (EvalNode remaining : context.pushingDownFilters) {
+      if (LogicalPlanner.checkIfBeEvaluatedAtThis(remaining, selNode)) {
+        evaluableHere.add(remaining);
+      }
+    }
+    if (!evaluableHere.isEmpty()) {
+      selNode.setQual(AlgebraicUtil.createSingletonExprFromCNF(
+          evaluableHere.toArray(new EvalNode[evaluableHere.size()])));
+      context.pushingDownFilters.removeAll(evaluableHere);
+    }
+    return selNode;
+  }
+
+ /**
+ * Pushes pending filters and this join's ON conditions into the join's children, and keeps
+ * as the join qualifier the conditions that have to be evaluated at the join itself.
+ *
+ * Non-outer joins: the ON conditions are added to the pending filters. After both children
+ * are visited, the pending filters accepted by LogicalPlanner.checkIfBeEvaluatedAtJoin become
+ * the join qualifier. A CROSS join that receives a qualifier is turned into an INNER join.
+ *
+ * Outer joins (TAJO-853):
+ * - Pending filters that are join conditions are kept for this join.
+ * - Pending filters that reference a null-supplying table are held back and handed back to
+ *   the parent after the children are visited.
+ * - ON conditions that reference a preserved-row table stay in the join qualifier instead of
+ *   being pushed down.
+ *
+ * NOTE(review): this method does not push joinNode onto the stack before visiting its
+ * children. Unless visit() does so itself, isTopMostJoin (computed from stack.peek()) is also
+ * true for nested joins — confirm this is intended.
+ * NOTE(review): when no qualifier is computed (qual == null), the join keeps its previous
+ * qualifier, even if some of its ON conditions were pushed into the children.
+ */
+ @Override
+ public LogicalNode visitJoin(FilterPushDownContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, JoinNode joinNode,
+ Stack<LogicalNode> stack) throws PlanningException {
+ // here we should stop selection pushdown on the null supplying side(s) of an outer join
+ // get the two operands of the join operation as well as the join type
+ JoinType joinType = joinNode.getJoinType();
+ EvalNode joinQual = joinNode.getJoinQual();
+ if (joinQual != null && LogicalPlanner.isOuterJoin(joinType)) {
+ // NOTE(review): assumes the outer join qual is a BinaryEval; this cast throws
+ // ClassCastException otherwise (e.g. for an AND of several conditions) — confirm.
+ BinaryEval binaryEval = (BinaryEval) joinQual;
+ // if both are fields
+ if (binaryEval.getLeftExpr().getType() == EvalType.FIELD &&
+ binaryEval.getRightExpr().getType() == EvalType.FIELD) {
+
+ String leftTableName = ((FieldEval) binaryEval.getLeftExpr()).getQualifier();
+ String rightTableName = ((FieldEval) binaryEval.getRightExpr()).getQualifier();
+ // nullSuppliers is only used by the sanity checks below.
+ List<String> nullSuppliers = Lists.newArrayList();
+ Set<String> leftTableSet = Sets.newHashSet(PlannerUtil.getRelationLineageWithinQueryBlock(plan,
+ joinNode.getLeftChild()));
+ Set<String> rightTableSet = Sets.newHashSet(PlannerUtil.getRelationLineageWithinQueryBlock(plan,
+ joinNode.getRightChild()));
+
+ // some verification
+ if (joinType == JoinType.FULL_OUTER) {
+ nullSuppliers.add(leftTableName);
+ nullSuppliers.add(rightTableName);
+
+ // verify that these null suppliers are indeed in the left and right sets
+ if (!rightTableSet.contains(nullSuppliers.get(0)) && !leftTableSet.contains(nullSuppliers.get(0))) {
+ throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
+ }
+ if (!rightTableSet.contains(nullSuppliers.get(1)) && !leftTableSet.contains(nullSuppliers.get(1))) {
+ throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
+ }
+
+ } else if (joinType == JoinType.LEFT_OUTER) {
+ // NOTE(review): assumes the right child is a RelationNode (not a nested join) — confirm.
+ nullSuppliers.add(((RelationNode)joinNode.getRightChild()).getCanonicalName());
+ //verify that this null supplier is indeed in the right sub-tree
+ if (!rightTableSet.contains(nullSuppliers.get(0))) {
+ throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
+ }
+ } else if (joinType == JoinType.RIGHT_OUTER) {
+ if (((RelationNode)joinNode.getRightChild()).getCanonicalName().equals(rightTableName)) {
+ nullSuppliers.add(leftTableName);
+ } else {
+ nullSuppliers.add(rightTableName);
+ }
+
+ // verify that this null supplier is indeed in the left sub-tree
+ if (!leftTableSet.contains(nullSuppliers.get(0))) {
+ throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
+ }
+ }
+ }
+ }
+
+ // get evals from ON clause
+ List<EvalNode> onConditions = new ArrayList<EvalNode>();
+ if (joinNode.hasJoinQual()) {
+ onConditions.addAll(Sets.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual())));
+ }
+
+ // True when the node on top of the stack is not itself a join.
+ boolean isTopMostJoin = stack.peek().getType() != NodeType.JOIN;
+
+ // Conditions that must be evaluated by this outer join itself.
+ List<EvalNode> outerJoinPredicationEvals = new ArrayList<EvalNode>();
+ // Pending filters on null-supplying tables; handed back to the parent at the end.
+ List<EvalNode> outerJoinFilterEvalsExcludePredication = new ArrayList<EvalNode>();
+ if (LogicalPlanner.isOuterJoin(joinNode.getJoinType())) {
+ // TAJO-853
+ // In the case of top most JOIN, all filters except JOIN condition aren't pushed down.
+ // That filters are processed by SELECTION NODE.
+ Set<String> nullSupplyingTableNameSet;
+ if (joinNode.getJoinType() == JoinType.RIGHT_OUTER) {
+ nullSupplyingTableNameSet = TUtil.newHashSet(PlannerUtil.getRelationLineage(joinNode.getLeftChild()));
+ } else {
+ nullSupplyingTableNameSet = TUtil.newHashSet(PlannerUtil.getRelationLineage(joinNode.getRightChild()));
+ }
+
+ Set<String> preservedTableNameSet;
+ if (joinNode.getJoinType() == JoinType.RIGHT_OUTER) {
+ preservedTableNameSet = TUtil.newHashSet(PlannerUtil.getRelationLineage(joinNode.getRightChild()));
+ } else {
+ preservedTableNameSet = TUtil.newHashSet(PlannerUtil.getRelationLineage(joinNode.getLeftChild()));
+ }
+
+ List<EvalNode> removedFromFilter = new ArrayList<EvalNode>();
+ for (EvalNode eachEval: context.pushingDownFilters) {
+ if (EvalTreeUtil.isJoinQual(block, eachEval, true)) {
+ outerJoinPredicationEvals.add(eachEval);
+ removedFromFilter.add(eachEval);
+ } else {
+ Set<Column> columns = EvalTreeUtil.findUniqueColumns(eachEval);
+ boolean canPushDown = true;
+ for (Column eachColumn: columns) {
+ if (nullSupplyingTableNameSet.contains(eachColumn.getQualifier())) {
+ canPushDown = false;
+ break;
+ }
+ }
+ if (!canPushDown) {
+ outerJoinFilterEvalsExcludePredication.add(eachEval);
+ removedFromFilter.add(eachEval);
+ }
+ }
+ }
+
+ context.pushingDownFilters.removeAll(removedFromFilter);
+
+ for (EvalNode eachOnEval: onConditions) {
+ // NOTE(review): this uses the isJoinQual(eval, boolean) overload, whereas the loop above
+ // uses isJoinQual(block, eval, boolean) — confirm the difference is intended.
+ if (EvalTreeUtil.isJoinQual(eachOnEval, true)) {
+ // If join condition, processing in the JoinNode.
+ outerJoinPredicationEvals.add(eachOnEval);
+ } else {
+ // If Eval has a column which belong to Preserved Row table, not using to push down but using JoinCondition
+ Set<Column> columns = EvalTreeUtil.findUniqueColumns(eachOnEval);
+ boolean canPushDown = true;
+ for (Column eachColumn: columns) {
+ if (preservedTableNameSet.contains(eachColumn.getQualifier())) {
+ canPushDown = false;
+ break;
+ }
+ }
+ if (canPushDown) {
+ context.pushingDownFilters.add(eachOnEval);
+ } else {
+ outerJoinPredicationEvals.add(eachOnEval);
+ }
+ }
+ }
+ } else {
+ context.pushingDownFilters.addAll(onConditions);
+ }
+
+ LogicalNode left = joinNode.getLeftChild();
+ LogicalNode right = joinNode.getRightChild();
+
+ List<EvalNode> notMatched = new ArrayList<EvalNode>();
+ // Join's input schema = left child output columns followed by right child output columns,
+ // so the right side is transformed with an offset of left.getOutSchema().size().
+ // Join conditions are excluded (ignoreJoin = true) and land in notMatched.
+ Map<EvalNode, EvalNode> transformedMap = findCanPushdownAndTransform(context, block, joinNode, left, notMatched,
+ null, true, 0);
+ context.setFiltersTobePushed(transformedMap.keySet());
+ visit(context, plan, block, left, stack);
+
+ // Leftovers from the left side go back to their original form, plus those never pushed.
+ context.setToOrigin(transformedMap);
+ context.addFiltersTobePushed(notMatched);
+
+ notMatched.clear();
+ transformedMap = findCanPushdownAndTransform(context, block, joinNode, right, notMatched, null, true,
+ left.getOutSchema().size());
+ context.setFiltersTobePushed(new HashSet<EvalNode>(transformedMap.keySet()));
+
+ visit(context, plan, block, right, stack);
+
+ context.setToOrigin(transformedMap);
+ context.addFiltersTobePushed(notMatched);
+
+ notMatched.clear();
+ // Decide which conditions become this join's qualifier.
+ List<EvalNode> matched = Lists.newArrayList();
+ if(LogicalPlanner.isOuterJoin(joinNode.getJoinType())) {
+ matched.addAll(outerJoinPredicationEvals);
+ } else {
+ for (EvalNode eval : context.pushingDownFilters) {
+ if (LogicalPlanner.checkIfBeEvaluatedAtJoin(block, eval, joinNode, isTopMostJoin)) {
+ matched.add(eval);
+ }
+ }
+ }
+
+ EvalNode qual = null;
+ if (matched.size() > 1) {
+ // merged into one eval tree
+ qual = AlgebraicUtil.createSingletonExprFromCNF(
+ matched.toArray(new EvalNode[matched.size()]));
+ } else if (matched.size() == 1) {
+ // if the number of matched expr is one
+ qual = matched.get(0);
+ }
+
+ if (qual != null) {
+ joinNode.setJoinQual(qual);
+
+ // A cross join that now has a join condition is an inner join.
+ if (joinNode.getJoinType() == JoinType.CROSS) {
+ joinNode.setJoinType(JoinType.INNER);
+ }
+ context.pushingDownFilters.removeAll(matched);
+ }
+
+ // Hand the held-back outer-join filters back to the parent.
+ context.pushingDownFilters.addAll(outerJoinFilterEvalsExcludePredication);
+ return joinNode;
+ }
+
+  /**
+   * Clones filters and renames their column references so that the clones can be pushed
+   * through {@code node} into {@code childNode}. Here {@code node} only passes rows through
+   * from another query block: a table subquery or a union. The original filters are not
+   * modified.
+   *
+   * <ul>
+   *   <li>If {@code node} is a UNION, every column is resolved by simple name against the
+   *       child's output schema.</li>
+   *   <li>If {@code childNode} is a UNION, qualifiers are stripped from every column.</li>
+   *   <li>Otherwise, columns are mapped by position from {@code node}'s input schema to the
+   *       child's output schema. When the child is a GROUP BY, a filter that references an
+   *       unmapped aggregation column is left out of the result.</li>
+   * </ul>
+   *
+   * @param originEvals filters to transform
+   * @param plan not used by this method
+   * @param block not used by this method
+   * @param node the pass-through node the filters currently sit above
+   * @param childNode the node the transformed filters are meant for
+   * @return map from each transformed copy to the original filter it was derived from
+   * @throws PlanningException if a filter cannot be cloned, or a referenced column has no
+   *         counterpart in the child
+   */
+  private Map<EvalNode, EvalNode> transformEvalsWidthByPassNode(
+      Collection<EvalNode> originEvals, LogicalPlan plan,
+      LogicalPlan.QueryBlock block,
+      LogicalNode node, LogicalNode childNode) throws PlanningException {
+    // transformed copy -> original filter
+    Map<EvalNode, EvalNode> transformedMap = new HashMap<EvalNode, EvalNode>();
+
+    if (originEvals.isEmpty()) {
+      return transformedMap;
+    }
+
+    if (node.getType() == NodeType.UNION) {
+      // If node is union, all eval's columns are simple names matched with the child's output schema.
+      Schema childOutSchema = childNode.getOutSchema();
+      for (EvalNode eval : originEvals) {
+        EvalNode copy;
+        try {
+          copy = (EvalNode) eval.clone();
+        } catch (CloneNotSupportedException e) {
+          throw new PlanningException(e);
+        }
+
+        Set<Column> columns = EvalTreeUtil.findUniqueColumns(copy);
+        for (Column c : columns) {
+          Column column = childOutSchema.getColumn(c.getSimpleName());
+          if (column == null) {
+            throw new PlanningException(
+                "Invalid Filter PushDown on SubQuery: No such a corresponding column '"
+                    + c.getQualifiedName() + "' for FilterPushDown(" + eval + "), " +
+                    "(PID=" + node.getPID() + ", Child=" + childNode.getPID() + ")");
+          }
+          EvalTreeUtil.changeColumnRef(copy, c.getSimpleName(), column.getQualifiedName());
+        }
+
+        transformedMap.put(copy, eval);
+      }
+      return transformedMap;
+    }
+
+    if (childNode.getType() == NodeType.UNION) {
+      // If child is union, remove qualifier from eval's column
+      for (EvalNode eval : originEvals) {
+        EvalNode copy;
+        try {
+          copy = (EvalNode) eval.clone();
+        } catch (CloneNotSupportedException e) {
+          throw new PlanningException(e);
+        }
+
+        Set<Column> columns = EvalTreeUtil.findUniqueColumns(copy);
+        for (Column c : columns) {
+          if (c.hasQualifier()) {
+            EvalTreeUtil.changeColumnRef(copy, c.getQualifiedName(), c.getSimpleName());
+          }
+        }
+
+        transformedMap.put(copy, eval);
+      }
+
+      return transformedMap;
+    }
+
+    // node in column -> child out column, matched by position
+    Map<String, String> columnMap = new HashMap<String, String>();
+
+    for (int i = 0; i < node.getInSchema().size(); i++) {
+      String inColumnName = node.getInSchema().getColumn(i).getQualifiedName();
+      Column childOutColumn = childNode.getOutSchema().getColumn(i);
+      columnMap.put(inColumnName, childOutColumn.getQualifiedName());
+    }
+
+    // Rename from upper block's one to lower block's one
+    for (EvalNode matchedEval : originEvals) {
+      EvalNode copy;
+      try {
+        copy = (EvalNode) matchedEval.clone();
+      } catch (CloneNotSupportedException e) {
+        throw new PlanningException(e);
+      }
+
+      Set<Column> columns = EvalTreeUtil.findUniqueColumns(copy);
+      boolean allMatched = true;
+      for (Column c : columns) {
+        if (columnMap.containsKey(c.getQualifiedName())) {
+          EvalTreeUtil.changeColumnRef(copy, c.getQualifiedName(), columnMap.get(c.getQualifiedName()));
+        } else {
+          if (childNode.getType() == NodeType.GROUP_BY) {
+            // Filters on aggregation results cannot be handed to the GROUP BY child.
+            if (((GroupbyNode) childNode).isAggregationColumn(c.getSimpleName())) {
+              allMatched = false;
+              break;
+            }
+          } else {
+            throw new PlanningException(
+                "Invalid Filter PushDown on SubQuery: No such a corresponding column '"
+                    + c.getQualifiedName() + "' for FilterPushDown(" + matchedEval + "), " +
+                    "(PID=" + node.getPID() + ", Child=" + childNode.getPID() + ")"
+            );
+          }
+        }
+      }
+      if (allMatched) {
+        transformedMap.put(copy, matchedEval);
+      }
+    }
+
+    return transformedMap;
+  }
+
+  /**
+   * Pushes the pending filters that can be evaluated at this table subquery into the
+   * subquery's own query block, after renaming their columns for the subquery's output.
+   *
+   * Some pending filters are not handed to the subquery: those that
+   * LogicalPlanner.checkIfBeEvaluatedAtRelation rejects, and those that
+   * transformEvalsWidthByPassNode could not transform. These stay in the pending set
+   * unchanged, so an upper node can still evaluate them instead of the predicate being
+   * silently lost. Filters that the subquery could not absorb come back in their original
+   * form.
+   */
+  @Override
+  public LogicalNode visitTableSubQuery(FilterPushDownContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                        TableSubQueryNode node, Stack<LogicalNode> stack) throws PlanningException {
+    List<EvalNode> matched = Lists.newArrayList();
+    for (EvalNode eval : context.pushingDownFilters) {
+      if (LogicalPlanner.checkIfBeEvaluatedAtRelation(block, eval, node)) {
+        matched.add(eval);
+      }
+    }
+
+    // transformed copy -> original filter
+    Map<EvalNode, EvalNode> transformedMap =
+        transformEvalsWidthByPassNode(matched, plan, block, node, node.getSubQuery());
+
+    // Remember the filters that will not be handed to the subquery, because
+    // setFiltersTobePushed below replaces the whole pending set.
+    List<EvalNode> notPushed = new ArrayList<EvalNode>(context.pushingDownFilters);
+    notPushed.removeAll(transformedMap.values());
+
+    context.setFiltersTobePushed(new HashSet<EvalNode>(transformedMap.keySet()));
+    visit(context, plan, plan.getBlock(node.getSubQuery()));
+    context.setToOrigin(transformedMap);
+    context.addFiltersTobePushed(notPushed);
+
+    return node;
+  }
+
+ /**
+ * Pushes every pending filter into both inputs of a UNION.
+ *
+ * The pending filters are renamed separately for each input (transformEvalsWidthByPassNode)
+ * and pushed into that input's query block. If either input leaves a filter unplaced,
+ * errorFilterPushDown throws a PlanningException. On success the pending set is cleared, so
+ * nothing is reported back to the parent.
+ *
+ * NOTE(review): the left input is visited with visit(context, plan, block), while the right
+ * input is visited with visit(context, plan, block, rightNode, stack) using the caller's
+ * stack. The asymmetry looks accidental — confirm which form is intended.
+ */
+ @Override
+ public LogicalNode visitUnion(FilterPushDownContext context, LogicalPlan plan,
+ LogicalPlan.QueryBlock block, UnionNode unionNode,
+ Stack<LogicalNode> stack) throws PlanningException {
+ LogicalNode leftNode = unionNode.getLeftChild();
+
+ // Snapshot of the filters to push; the same originals are transformed for both inputs.
+ List<EvalNode> origins = new ArrayList<EvalNode>(context.pushingDownFilters);
+
+ // transformed -> pushingDownFilters
+ Map<EvalNode, EvalNode> transformedMap = transformEvalsWidthByPassNode(origins, plan, block, unionNode, leftNode);
+ context.setFiltersTobePushed(new HashSet<EvalNode>(transformedMap.keySet()));
+ visit(context, plan, plan.getBlock(leftNode));
+
+ if (!context.pushingDownFilters.isEmpty()) {
+ errorFilterPushDown(plan, leftNode, context);
+ }
+
+ LogicalNode rightNode = unionNode.getRightChild();
+ transformedMap = transformEvalsWidthByPassNode(origins, plan, block, unionNode, rightNode);
+ context.setFiltersTobePushed(new HashSet<EvalNode>(transformedMap.keySet()));
+ visit(context, plan, plan.getBlock(rightNode), rightNode, stack);
+
+ if (!context.pushingDownFilters.isEmpty()) {
+ errorFilterPushDown(plan, rightNode, context);
+ }
+
+ // notify all filter matched to upper
+ context.pushingDownFilters.clear();
+ return unionNode;
+ }
+
+  /**
+   * Pushes the pending filters through a projection into its child.
+   *
+   * A filter passes through only when every column it references is produced by a plain
+   * field target of this projection; such filters are renamed to the child's output columns
+   * and pushed down. Two kinds of filters are then collected in their original form: those
+   * that could not pass, and those the child could not absorb. If
+   * LogicalPlanner.checkIfBeEvaluatedAtThis accepts the collected filters for this node, they
+   * are evaluated by a new SelectionNode inserted between the projection and its child.
+   * Otherwise all of them are handed back to the parent, so that no predicate is dropped.
+   */
+  @Override
+  public LogicalNode visitProjection(FilterPushDownContext context,
+                                     LogicalPlan plan,
+                                     LogicalPlan.QueryBlock block,
+                                     ProjectionNode projectionNode,
+                                     Stack<LogicalNode> stack) throws PlanningException {
+    LogicalNode childNode = projectionNode.getChild();
+
+    // Filters that stay at (or above) this projection, in their original form.
+    List<EvalNode> notMatched = new ArrayList<EvalNode>();
+
+    // transformed copy -> original filter
+    BiMap<EvalNode, EvalNode> transformedMap = findCanPushdownAndTransform(
+        context, block, projectionNode, childNode, notMatched, null, false, 0);
+
+    context.setFiltersTobePushed(transformedMap.keySet());
+
+    stack.push(projectionNode);
+    childNode = visit(context, plan, plan.getBlock(childNode), childNode, stack);
+    stack.pop();
+
+    // Collect what the child left over, mapped back to the original filters. A leftover that
+    // was not produced by this node's transformation is kept as-is rather than becoming null.
+    for (EvalNode eval : context.pushingDownFilters) {
+      EvalNode origin = transformedMap.get(eval);
+      notMatched.add(origin != null ? origin : eval);
+    }
+    context.pushingDownFilters.clear();
+
+    if (notMatched.isEmpty()) {
+      return projectionNode;
+    }
+
+    EvalNode qual;
+    if (notMatched.size() > 1) {
+      // merged into one eval tree
+      qual = AlgebraicUtil.createSingletonExprFromCNF(notMatched.toArray(new EvalNode[notMatched.size()]));
+    } else {
+      qual = notMatched.get(0);
+    }
+
+    if (LogicalPlanner.checkIfBeEvaluatedAtThis(qual, projectionNode)) {
+      // Evaluate all remaining filters in a new selection directly below this projection.
+      SelectionNode selectionNode = plan.createNode(SelectionNode.class);
+      selectionNode.setInSchema(childNode.getOutSchema());
+      selectionNode.setOutSchema(childNode.getOutSchema());
+      selectionNode.setQual(qual);
+      block.registerNode(selectionNode);
+
+      projectionNode.setChild(selectionNode);
+      selectionNode.setChild(childNode);
+    } else {
+      // Give every unplaced filter, including those that were never pushed down, back to the
+      // upper query block in its original form instead of dropping it.
+      context.pushingDownFilters.addAll(notMatched);
+    }
+
+    return projectionNode;
+  }
+
+  /**
+   * Maps each remaining transformed filter back to the original filter it was derived from.
+   * A filter without an entry in {@code map} contributes {@code null} to the result.
+   */
+  private Collection<EvalNode> reverseTransform(BiMap<EvalNode, EvalNode> map, Set<EvalNode> remainFilters) {
+    Set<EvalNode> origins = new HashSet<EvalNode>();
+    Iterator<EvalNode> it = remainFilters.iterator();
+    while (it.hasNext()) {
+      origins.add(map.get(it.next()));
+    }
+    return origins;
+  }
+
+  /**
+   * Splits the pending filters into the ones that can be pushed below {@code node} and the
+   * ones that cannot.
+   *
+   * A filter can be pushed when every column it references is produced by a FieldEval target
+   * of {@code node}, and transformEval can rename it for {@code childNode}. When
+   * {@code ignoreJoin} is true, join conditions are never pushed.
+   *
+   * @param notMatched receives, in their original form, the filters that cannot be pushed
+   * @return map from each pushable transformed copy to its original filter
+   */
+  private BiMap<EvalNode, EvalNode> findCanPushdownAndTransform(
+      FilterPushDownContext context, LogicalPlan.QueryBlock block, Projectable node,
+      LogicalNode childNode, List<EvalNode> notMatched,
+      Set<String> partitionColumns,
+      boolean ignoreJoin, int columnOffset) throws PlanningException {
+    // canonical target name -> target
+    Map<String, Target> targetsByName = new HashMap<String, Target>();
+    for (Target t : node.getTargets()) {
+      targetsByName.put(t.getCanonicalName(), t);
+    }
+
+    // transformed copy -> original filter
+    BiMap<EvalNode, EvalNode> pushable = HashBiMap.create();
+
+    for (EvalNode filter : context.pushingDownFilters) {
+      boolean skipAsJoinQual = ignoreJoin && EvalTreeUtil.isJoinQual(block, filter, true);
+      EvalNode transformed = null;
+      if (!skipAsJoinQual && referencesOnlyFieldTargets(filter, targetsByName)) {
+        transformed = transformEval(node, childNode, filter, targetsByName, partitionColumns, columnOffset);
+      }
+
+      if (transformed == null) {
+        notMatched.add(filter);
+      } else {
+        pushable.put(transformed, filter);
+      }
+    }
+
+    return pushable;
+  }
+
+  /** Returns true if every column of {@code filter} is produced by a plain FieldEval target. */
+  private static boolean referencesOnlyFieldTargets(EvalNode filter, Map<String, Target> targetsByName) {
+    for (Column column : EvalTreeUtil.findUniqueColumns(filter)) {
+      Target target = targetsByName.get(column.getQualifiedName());
+      if (target == null || target.getEvalTree().getType() != EvalType.FIELD) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Clones {@code origin} and renames each referenced column to the matching column of the
+   * child's output schema.
+   *
+   * For each column:
+   * <ul>
+   *   <li>The target of the same name must be a FieldEval.</li>
+   *   <li>The column that target reads is located in {@code node}'s input schema and shifted
+   *       left by {@code columnOffset}.</li>
+   *   <li>It is replaced with the column at that position of {@code childNode}'s output schema,
+   *       or of {@code node}'s input schema when {@code childNode} is null.</li>
+   *   <li>A position outside that schema is accepted only for a ScanNode when the column is a
+   *       partition column; the column is then qualified with the scan's canonical name.</li>
+   * </ul>
+   *
+   * @return the transformed copy, or {@code null} if some column cannot be mapped
+   * @throws PlanningException if cloning fails, or if a column has no target or a
+   *         non-field target
+   */
+  private EvalNode transformEval(Projectable node, LogicalNode childNode, EvalNode origin,
+                                 Map<String, Target> targetMap, Set<String> partitionColumns,
+                                 int columnOffset) throws PlanningException {
+    Schema outputSchema = childNode != null ? childNode.getOutSchema() : node.getInSchema();
+    EvalNode copy;
+    try {
+      copy = (EvalNode) origin.clone();
+    } catch (CloneNotSupportedException e) {
+      throw new PlanningException(e);
+    }
+    Set<Column> columns = EvalTreeUtil.findUniqueColumns(copy);
+    for (Column c: columns) {
+      Target target = targetMap.get(c.getQualifiedName());
+      if (target == null) {
+        throw new PlanningException(
+            "Invalid Filter PushDown: No such a corresponding target '"
+                + c.getQualifiedName() + "' for FilterPushDown(" + origin + "), " +
+                "(PID=" + node.getPID() + ")"
+        );
+      }
+      EvalNode targetEvalNode = target.getEvalTree();
+      if (targetEvalNode.getType() != EvalType.FIELD) {
+        throw new PlanningException(
+            "Invalid Filter PushDown: '" + c.getQualifiedName() + "' target is not FieldEval " +
+                "(PID=" + node.getPID() + ")"
+        );
+      }
+
+      FieldEval fieldEval = (FieldEval)targetEvalNode;
+      Column targetInputColumn = fieldEval.getColumnRef();
+
+      // Position of the column read by the target within this node's input schema.
+      int index;
+      if (targetInputColumn.hasQualifier()) {
+        index = node.getInSchema().getColumnId(targetInputColumn.getQualifiedName());
+      } else {
+        index = node.getInSchema().getColumnIdByName(targetInputColumn.getQualifiedName());
+      }
+      // Shift into the child's coordinates (e.g. the right input of a join).
+      if (columnOffset > 0) {
+        index = index - columnOffset;
+      }
+      if (index < 0 || index >= outputSchema.size()) {
+        // Not produced by the child: only a partition column of a scan can still be resolved.
+        if (partitionColumns != null && !partitionColumns.isEmpty() && node instanceof ScanNode) {
+          ScanNode scanNode = (ScanNode)node;
+          boolean isPartitionColumn;
+          // The first partition column name decides whether names are compared fully
+          // qualified or simple.
+          if (CatalogUtil.isFQColumnName(partitionColumns.iterator().next())) {
+            isPartitionColumn = partitionColumns.contains(
+                CatalogUtil.buildFQName(scanNode.getTableName(), c.getSimpleName()));
+          } else {
+            isPartitionColumn = partitionColumns.contains(c.getSimpleName());
+          }
+          if (isPartitionColumn) {
+            EvalTreeUtil.changeColumnRef(copy, c.getQualifiedName(),
+                scanNode.getCanonicalName() + "." + c.getSimpleName());
+          } else {
+            return null;
+          }
+        } else {
+          return null;
+        }
+      } else {
+        Column outputColumn = outputSchema.getColumn(index);
+        EvalTreeUtil.changeColumnRef(copy, c.getQualifiedName(), outputColumn.getQualifiedName());
+      }
+    }
+
+    return copy;
+  }
+
+  /**
+   * Moves pending filters that reference an aggregation output of {@code groupByNode} into a
+   * HAVING condition placed directly above it.
+   *
+   * @param context the push-down state; its pending filters are inspected but not modified here
+   * @param plan the logical plan, used to create a HavingNode when needed
+   * @param block the query block that owns {@code groupByNode}
+   * @param parentNode parent of {@code groupByNode}; used only when {@code havingNode} is null,
+   *                   in which case a new HavingNode is inserted between the two
+   * @param havingNode existing HAVING parent of {@code groupByNode}, or null if there is none;
+   *                   when given, its current qual is merged with the moved filters
+   * @param groupByNode the grouping node whose aggregation outputs are looked for
+   * @return the original filters that were moved into the HAVING condition (possibly empty)
+   * @throws PlanningException if a filter cannot be cloned
+   */
+  private List<EvalNode> addHavingNode(FilterPushDownContext context, LogicalPlan plan,
+                                       LogicalPlan.QueryBlock block,
+                                       UnaryNode parentNode,
+                                       HavingNode havingNode,
+                                       GroupbyNode groupByNode) throws PlanningException {
+    // Output columns of the GROUP BY that are not grouping keys are aggregation results.
+    Set<Column> groupingColumns = new HashSet<Column>(Arrays.asList(groupByNode.getGroupingColumns()));
+    Set<String> aggrFunctionOutColumns = new HashSet<String>();
+    for (Column column : groupByNode.getOutSchema().getColumns()) {
+      if (!groupingColumns.contains(column)) {
+        aggrFunctionOutColumns.add(column.getQualifiedName());
+      }
+    }
+
+    List<EvalNode> aggrEvalOrigins = new ArrayList<EvalNode>();
+    List<EvalNode> aggrEvals = new ArrayList<EvalNode>();
+
+    for (EvalNode eval : context.pushingDownFilters) {
+      EvalNode copy;
+      try {
+        copy = (EvalNode) eval.clone();
+      } catch (CloneNotSupportedException e) {
+        // Fail like the other transformations in this rule instead of continuing with a null
+        // copy, which would only surface later as a NullPointerException.
+        throw new PlanningException(e);
+      }
+      boolean isEvalAggrFunction = false;
+      for (Column evalColumn : EvalTreeUtil.findUniqueColumns(copy)) {
+        // NOTE(review): a simple name is compared against names collected via
+        // getQualifiedName(); this assumes aggregation output columns are unqualified — confirm.
+        if (aggrFunctionOutColumns.contains(evalColumn.getSimpleName())) {
+          EvalTreeUtil.changeColumnRef(copy, evalColumn.getQualifiedName(), evalColumn.getSimpleName());
+          isEvalAggrFunction = true;
+          break;
+        }
+      }
+      if (isEvalAggrFunction) {
+        aggrEvals.add(copy);
+        aggrEvalOrigins.add(eval);
+      }
+    }
+
+    if (aggrEvals.isEmpty()) {
+      return aggrEvalOrigins;
+    }
+
+    HavingNode workingHavingNode;
+    if (havingNode != null) {
+      // Merge with the condition the existing HAVING node already has.
+      workingHavingNode = havingNode;
+      aggrEvals.add(havingNode.getQual());
+    } else {
+      // Insert a new HAVING node between the parent and the GROUP BY. It filters the GROUP BY
+      // output, so its input and output schemas are the GROUP BY's output schema.
+      workingHavingNode = plan.createNode(HavingNode.class);
+      workingHavingNode.setInSchema(groupByNode.getOutSchema());
+      workingHavingNode.setOutSchema(groupByNode.getOutSchema());
+      block.registerNode(workingHavingNode);
+      parentNode.setChild(workingHavingNode);
+      workingHavingNode.setChild(groupByNode);
+    }
+
+    EvalNode qual = null;
+    if (aggrEvals.size() > 1) {
+      // merged into one eval tree
+      qual = AlgebraicUtil.createSingletonExprFromCNF(aggrEvals.toArray(new EvalNode[aggrEvals.size()]));
+    } else if (aggrEvals.size() == 1) {
+      // if the number of matched expr is one
+      qual = aggrEvals.get(0);
+    }
+
+    if (qual != null) {
+      workingHavingNode.setQual(qual);
+    }
+
+    return aggrEvalOrigins;
+  }
+
+  /**
+   * Visits the subtree below a window aggregation while the node is on the stack, so that
+   * its descendants see it as their parent.
+   */
+  @Override
+  public LogicalNode visitWindowAgg(FilterPushDownContext context, LogicalPlan plan,
+                                    LogicalPlan.QueryBlock block, WindowAggNode winAggNode,
+                                    Stack<LogicalNode> stack) throws PlanningException {
+    final WindowAggNode current = winAggNode;
+    stack.push(current);
+    super.visitWindowAgg(context, plan, block, current, stack);
+    stack.pop();
+    // The node itself is returned; the value produced by the default traversal is ignored.
+    return current;
+  }
+
+
+  /**
+   * Handles pending filters at a GROUP BY. Filters on aggregation results become a HAVING
+   * condition on (or just above) this node. The rest are pushed below it when their columns
+   * map to plain field targets.
+   */
+  @Override
+  public LogicalNode visitGroupBy(FilterPushDownContext context, LogicalPlan plan,
+                                  LogicalPlan.QueryBlock block, GroupbyNode groupbyNode,
+                                  Stack<LogicalNode> stack) throws PlanningException {
+    LogicalNode parentNode = stack.peek();
+    boolean parentIsHaving = parentNode.getType() == NodeType.HAVING;
+    List<EvalNode> havingEvals = parentIsHaving
+        ? addHavingNode(context, plan, block, null, (HavingNode) parentNode, groupbyNode)
+        : addHavingNode(context, plan, block, (UnaryNode) parentNode, null, groupbyNode);
+    if (havingEvals != null) {
+      // filters moved into HAVING are no longer pending
+      context.pushingDownFilters.removeAll(havingEvals);
+    }
+
+    List<EvalNode> notMatched = new ArrayList<EvalNode>();
+    // transformed copy -> original filter
+    Map<EvalNode, EvalNode> transformed = findCanPushdownAndTransform(
+        context, block, groupbyNode, groupbyNode.getChild(), notMatched, null, false, 0);
+
+    context.setFiltersTobePushed(transformed.keySet());
+    LogicalNode result = super.visitGroupBy(context, plan, block, groupbyNode, stack);
+
+    // Leftovers from the child return to their original form, plus the ones never pushed.
+    context.setToOrigin(transformed);
+    context.addFiltersTobePushed(notMatched);
+
+    return result;
+  }
+
+ /**
+ * Attaches to the scan every pending filter that can be evaluated at this relation.
+ *
+ * Partition filters are always taken. These are filters with exactly one column, where that
+ * column is a partition column of the scanned table. Each one is cloned, its column is
+ * qualified with the scan's canonical name, and it is removed from the pending set. The
+ * remaining filters are transformed with findCanPushdownAndTransform (join conditions
+ * excluded). Those accepted by LogicalPlanner.checkIfBeEvaluatedAtRelation are added to the
+ * scan qualifier as well.
+ *
+ * NOTE(review): setToOrigin() looks up the pending (original) filters in a copy -> origin
+ * map. A pending filter is therefore kept only when it equals one of the unmatched copies;
+ * filters whose copy differs from the original appear to be dropped here — confirm.
+ * NOTE(review): the computed qual is passed to scanNode.setQual() without merging any
+ * qualifier the scan may already have — confirm overwriting is intended.
+ */
+ @Override
+ public LogicalNode visitScan(FilterPushDownContext context, LogicalPlan plan,
+ LogicalPlan.QueryBlock block, ScanNode scanNode,
+ Stack<LogicalNode> stack) throws PlanningException {
+ List<EvalNode> matched = Lists.newArrayList();
+
+ // find partition column and check matching
+ Set<String> partitionColumns = new HashSet<String>();
+ TableDesc table = scanNode.getTableDesc();
+ boolean hasQualifiedName = false;
+ if (table.hasPartition()) {
+ for (Column c: table.getPartitionMethod().getExpressionSchema().getColumns()) {
+ partitionColumns.add(c.getQualifiedName());
+ // ends up reflecting the last partition column only
+ hasQualifiedName = c.hasQualifier();
+ }
+ }
+ // original filters consumed as partition filters
+ Set<EvalNode> partitionEvals = new HashSet<EvalNode>();
+ for (EvalNode eval : context.pushingDownFilters) {
+ if (table.hasPartition()) {
+ Set<Column> columns = EvalTreeUtil.findUniqueColumns(eval);
+ if (columns.size() != 1) {
+ continue;
+ }
+ Column column = columns.iterator().next();
+
+ // If catalog runs with HCatalog, partition column is a qualified name
+ // Else partition column is a simple name
+ boolean isPartitionColumn = false;
+ if (hasQualifiedName) {
+ isPartitionColumn = partitionColumns.contains(CatalogUtil.buildFQName(table.getName(), column.getSimpleName()));
+ } else {
+ isPartitionColumn = partitionColumns.contains(column.getSimpleName());
+ }
+ if (isPartitionColumn) {
+ EvalNode copy;
+ try {
+ copy = (EvalNode) eval.clone();
+ } catch (CloneNotSupportedException e) {
+ throw new PlanningException(e);
+ }
+ EvalTreeUtil.changeColumnRef(copy, column.getQualifiedName(),
+ scanNode.getCanonicalName() + "." + column.getSimpleName());
+ matched.add(copy);
+ partitionEvals.add(eval);
+ }
+ }
+ }
+
+ context.pushingDownFilters.removeAll(partitionEvals);
+
+ List<EvalNode> notMatched = new ArrayList<EvalNode>();
+
+ // transform
+ Map<EvalNode, EvalNode> transformed =
+ findCanPushdownAndTransform(context, block, scanNode, null, notMatched, partitionColumns, true, 0);
+
+ for (EvalNode eval : transformed.keySet()) {
+ if (LogicalPlanner.checkIfBeEvaluatedAtRelation(block, eval, scanNode)) {
+ matched.add(eval);
+ }
+ }
+
+ EvalNode qual = null;
+ if (matched.size() > 1) {
+ // merged into one eval tree
+ qual = AlgebraicUtil.createSingletonExprFromCNF(
+ matched.toArray(new EvalNode[matched.size()]));
+ } else if (matched.size() == 1) {
+ // if the number of matched expr is one
+ qual = matched.iterator().next();
+ }
+
+ if (qual != null) { // if a matched qual exists
+ scanNode.setQual(qual);
+ }
+
+ // filters now evaluated by the scan must not be restored as pending
+ for (EvalNode matchedEval: matched) {
+ transformed.remove(matchedEval);
+ }
+
+ context.setToOrigin(transformed);
+ context.addFiltersTobePushed(notMatched);
+
+ return scanNode;
+ }
+
+  /**
+   * Always throws: reports the filters that are still pending after visiting {@code node},
+   * together with the node's plan string and the whole plan.
+   */
+  private void errorFilterPushDown(LogicalPlan plan, LogicalNode node,
+                                   FilterPushDownContext context) throws PlanningException {
+    StringBuilder pending = new StringBuilder();
+    boolean first = true;
+    for (EvalNode eval : context.pushingDownFilters) {
+      if (!first) {
+        pending.append(", ");
+      }
+      pending.append(eval);
+      first = false;
+    }
+    throw new PlanningException("FilterPushDown failed cause some filters not matched: " + pending + "\n" +
+        "Error node: " + node.getPlanString() + "\n" +
+        plan.toString());
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
new file mode 100644
index 0000000..14b24a7
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
@@ -0,0 +1,428 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.rewrite.rules;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.*;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.rewrite.RewriteRule;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.expr.*;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.StringUtils;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+
+public class PartitionedTableRewriter implements RewriteRule {
+ private static final Log LOG = LogFactory.getLog(PartitionedTableRewriter.class);
+
+ private static final String NAME = "Partitioned Table Rewriter";
+ private final Rewriter rewriter = new Rewriter();
+
+ private final TajoConf systemConf;
+
+ public PartitionedTableRewriter(TajoConf conf) {
+ systemConf = conf;
+ }
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+
+ @Override
+ public boolean isEligible(LogicalPlan plan) {
+ for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
+ for (RelationNode relation : block.getRelations()) {
+ if (relation.getType() == NodeType.SCAN) {
+ TableDesc table = ((ScanNode)relation).getTableDesc();
+ if (table.hasPartition()) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
+ LogicalPlan.QueryBlock rootBlock = plan.getRootBlock();
+ rewriter.visit(rootBlock, plan, rootBlock, rootBlock.getRoot(), new Stack<LogicalNode>());
+ return plan;
+ }
+
+ private static class PartitionPathFilter implements PathFilter {
+
+ private Schema schema;
+ private EvalNode partitionFilter;
+ public PartitionPathFilter(Schema schema, EvalNode partitionFilter) {
+ this.schema = schema;
+ this.partitionFilter = partitionFilter;
+ }
+
+ @Override
+ public boolean accept(Path path) {
+ Tuple tuple = buildTupleFromPartitionPath(schema, path, true);
+ if (tuple == null) { // if it is a file or not acceptable file
+ return false;
+ }
+
+ return partitionFilter.eval(schema, tuple).asBool();
+ }
+
+ @Override
+ public String toString() {
+ return partitionFilter.toString();
+ }
+ }
+
+ /**
+ * It assumes that each conjunctive form corresponds to one column.
+ *
+ * @param partitionColumns
+ * @param conjunctiveForms search condition corresponding to partition columns.
+ * If it is NULL, it means that there is no search condition for this table.
+ * @param tablePath
+ * @return
+ * @throws IOException
+ */
+ private Path [] findFilteredPaths(Schema partitionColumns, EvalNode [] conjunctiveForms, Path tablePath)
+ throws IOException {
+
+ FileSystem fs = tablePath.getFileSystem(systemConf);
+
+ PathFilter [] filters;
+ if (conjunctiveForms == null) {
+ filters = buildAllAcceptingPathFilters(partitionColumns);
+ } else {
+ filters = buildPathFiltersForAllLevels(partitionColumns, conjunctiveForms);
+ }
+
+ // loop from one to the number of partition columns
+ Path [] filteredPaths = toPathArray(fs.listStatus(tablePath, filters[0]));
+
+ for (int i = 1; i < partitionColumns.size(); i++) {
+ // Get all file status matched to a ith level path filter.
+ filteredPaths = toPathArray(fs.listStatus(filteredPaths, filters[i]));
+ }
+
+ LOG.info("Filtered directory or files: " + filteredPaths.length);
+ return filteredPaths;
+ }
+
+ /**
+ * Build path filters for all levels with a list of filter conditions.
+ *
+ * For example, consider you have a partitioned table for three columns (i.e., col1, col2, col3).
+ * Then, this methods will create three path filters for (col1), (col1, col2), (col1, col2, col3).
+ *
+ * Corresponding filter conditions will be placed on each path filter,
+ * If there is no corresponding expression for certain column,
+ * The condition will be filled with a true value.
+ *
+ * Assume that an user gives a condition WHERE col1 ='A' and col3 = 'C'.
+ * There is no filter condition corresponding to col2.
+ * Then, the path filter conditions are corresponding to the followings:
+ *
+ * The first path filter: col1 = 'A'
+ * The second path filter: col1 = 'A' AND col2 IS NOT NULL
+ * The third path filter: col1 = 'A' AND col2 IS NOT NULL AND col3 = 'C'
+ *
+ * 'IS NOT NULL' predicate is always true against the partition path.
+ *
+ * @param partitionColumns
+ * @param conjunctiveForms
+ * @return
+ */
+ private static PathFilter [] buildPathFiltersForAllLevels(Schema partitionColumns,
+ EvalNode [] conjunctiveForms) {
+ // Building partition path filters for all levels
+ Column target;
+ PathFilter [] filters = new PathFilter[partitionColumns.size()];
+ List<EvalNode> accumulatedFilters = Lists.newArrayList();
+ for (int i = 0; i < partitionColumns.size(); i++) { // loop from one to level
+ target = partitionColumns.getColumn(i);
+
+ for (EvalNode expr : conjunctiveForms) {
+ if (EvalTreeUtil.findUniqueColumns(expr).contains(target)) {
+ // Accumulate one qual per level
+ accumulatedFilters.add(expr);
+ }
+ }
+
+ if (accumulatedFilters.size() < (i + 1)) {
+ accumulatedFilters.add(new IsNullEval(true, new FieldEval(target)));
+ }
+
+ EvalNode filterPerLevel = AlgebraicUtil.createSingletonExprFromCNF(
+ accumulatedFilters.toArray(new EvalNode[accumulatedFilters.size()]));
+ filters[i] = new PartitionPathFilter(partitionColumns, filterPerLevel);
+ }
+ return filters;
+ }
+
+ /**
+ * Build an array of path filters for all levels with all accepting filter condition.
+ * @param partitionColumns The partition columns schema
+ * @return The array of path filter, accpeting all partition paths.
+ */
+ private static PathFilter [] buildAllAcceptingPathFilters(Schema partitionColumns) {
+ Column target;
+ PathFilter [] filters = new PathFilter[partitionColumns.size()];
+ List<EvalNode> accumulatedFilters = Lists.newArrayList();
+ for (int i = 0; i < partitionColumns.size(); i++) { // loop from one to level
+ target = partitionColumns.getColumn(i);
+ accumulatedFilters.add(new IsNullEval(true, new FieldEval(target)));
+
+ EvalNode filterPerLevel = AlgebraicUtil.createSingletonExprFromCNF(
+ accumulatedFilters.toArray(new EvalNode[accumulatedFilters.size()]));
+ filters[i] = new PartitionPathFilter(partitionColumns, filterPerLevel);
+ }
+ return filters;
+ }
+
+ private static Path [] toPathArray(FileStatus[] fileStatuses) {
+ Path [] paths = new Path[fileStatuses.length];
+ for (int j = 0; j < fileStatuses.length; j++) {
+ paths[j] = fileStatuses[j].getPath();
+ }
+ return paths;
+ }
+
+ private Path [] findFilteredPartitionPaths(ScanNode scanNode) throws IOException {
+ TableDesc table = scanNode.getTableDesc();
+ PartitionMethodDesc partitionDesc = scanNode.getTableDesc().getPartitionMethod();
+
+ Schema paritionValuesSchema = new Schema();
+ for (Column column : partitionDesc.getExpressionSchema().getColumns()) {
+ paritionValuesSchema.addColumn(column);
+ }
+
+ Set<EvalNode> indexablePredicateSet = Sets.newHashSet();
+
+ // if a query statement has a search condition, try to find indexable predicates
+ if (scanNode.hasQual()) {
+ EvalNode [] conjunctiveForms = AlgebraicUtil.toConjunctiveNormalFormArray(scanNode.getQual());
+ Set<EvalNode> remainExprs = Sets.newHashSet(conjunctiveForms);
+
+ // add qualifier to schema for qual
+ paritionValuesSchema.setQualifier(scanNode.getCanonicalName());
+ for (Column column : paritionValuesSchema.getColumns()) {
+ for (EvalNode simpleExpr : conjunctiveForms) {
+ if (checkIfIndexablePredicateOnTargetColumn(simpleExpr, column)) {
+ indexablePredicateSet.add(simpleExpr);
+ }
+ }
+ }
+
+ // Partitions which are not matched to the partition filter conditions are pruned immediately.
+ // So, the partition filter conditions are not necessary later, and they are removed from
+ // original search condition for simplicity and efficiency.
+ remainExprs.removeAll(indexablePredicateSet);
+ if (remainExprs.isEmpty()) {
+ scanNode.setQual(null);
+ } else {
+ scanNode.setQual(
+ AlgebraicUtil.createSingletonExprFromCNF(remainExprs.toArray(new EvalNode[remainExprs.size()])));
+ }
+ }
+
+ if (indexablePredicateSet.size() > 0) { // There are at least one indexable predicates
+ return findFilteredPaths(paritionValuesSchema,
+ indexablePredicateSet.toArray(new EvalNode[indexablePredicateSet.size()]), table.getPath());
+ } else { // otherwise, we will get all partition paths.
+ return findFilteredPaths(paritionValuesSchema, null, table.getPath());
+ }
+ }
+
+ private boolean checkIfIndexablePredicateOnTargetColumn(EvalNode evalNode, Column targetColumn) {
+ if (checkIfIndexablePredicate(evalNode) || checkIfDisjunctiveButOneVariable(evalNode)) {
+ Set<Column> variables = EvalTreeUtil.findUniqueColumns(evalNode);
+ // if it contains only single variable matched to a target column
+ return variables.size() == 1 && variables.contains(targetColumn);
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Check if an expression consists of one variable and one constant and
+ * the expression is a comparison operator.
+ *
+ * @param evalNode The expression to be checked
+ * @return true if an expression consists of one variable and one constant
+ * and the expression is a comparison operator. Other, false.
+ */
+ private boolean checkIfIndexablePredicate(EvalNode evalNode) {
+ // TODO - LIKE with a trailing wild-card character and IN with an array can be indexable
+ return AlgebraicUtil.containSingleVar(evalNode) && AlgebraicUtil.isIndexableOperator(evalNode);
+ }
+
+ /**
+ *
+ * @param evalNode The expression to be checked
+ * @return true if an disjunctive expression, consisting of indexable expressions
+ */
+ private boolean checkIfDisjunctiveButOneVariable(EvalNode evalNode) {
+ if (evalNode.getType() == EvalType.OR) {
+ BinaryEval orEval = (BinaryEval) evalNode;
+ boolean indexable =
+ checkIfIndexablePredicate(orEval.getLeftExpr()) &&
+ checkIfIndexablePredicate(orEval.getRightExpr());
+
+ boolean sameVariable =
+ EvalTreeUtil.findUniqueColumns(orEval.getLeftExpr())
+ .equals(EvalTreeUtil.findUniqueColumns(orEval.getRightExpr()));
+
+ return indexable && sameVariable;
+ } else {
+ return false;
+ }
+ }
+
+ private void updateTableStat(PartitionedTableScanNode scanNode) throws PlanningException {
+ if (scanNode.getInputPaths().length > 0) {
+ try {
+ FileSystem fs = scanNode.getInputPaths()[0].getFileSystem(systemConf);
+ long totalVolume = 0;
+
+ for (Path input : scanNode.getInputPaths()) {
+ ContentSummary summary = fs.getContentSummary(input);
+ totalVolume += summary.getLength();
+ totalVolume += summary.getFileCount();
+ }
+ scanNode.getTableDesc().getStats().setNumBytes(totalVolume);
+ } catch (IOException e) {
+ throw new PlanningException(e);
+ }
+ }
+ }
+
+ /**
+ * Take a look at a column partition path. A partition path consists
+ * of a table path part and column values part. This method transforms
+ * a partition path into a tuple with a given partition column schema.
+ *
+ * hdfs://192.168.0.1/tajo/warehouse/table1/col1=abc/col2=def/col3=ghi
+ * ^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^
+ * table path part column values part
+ *
+ * When a file path is given, it can perform two ways depending on beNullIfFile flag.
+ * If it is true, it returns NULL when a given path is a file.
+ * Otherwise, it returns a built tuple regardless of file or directory.
+ *
+ * @param partitionColumnSchema The partition column schema
+ * @param partitionPath The partition path
+ * @param beNullIfFile If true, this method returns NULL when a given path is a file.
+ * @return The tuple transformed from a column values part.
+ */
+ public static Tuple buildTupleFromPartitionPath(Schema partitionColumnSchema, Path partitionPath,
+ boolean beNullIfFile) {
+ int startIdx = partitionPath.toString().indexOf(getColumnPartitionPathPrefix(partitionColumnSchema));
+
+ if (startIdx == -1) { // if there is no partition column in the patch
+ return null;
+ }
+ String columnValuesPart = partitionPath.toString().substring(startIdx);
+
+ String [] columnValues = columnValuesPart.split("/");
+
+ // true means this is a file.
+ if (beNullIfFile && partitionColumnSchema.size() < columnValues.length) {
+ return null;
+ }
+
+ Tuple tuple = new VTuple(partitionColumnSchema.size());
+ int i = 0;
+ for (; i < columnValues.length && i < partitionColumnSchema.size(); i++) {
+ String [] parts = columnValues[i].split("=");
+ if (parts.length != 2) {
+ return null;
+ }
+ int columnId = partitionColumnSchema.getColumnIdByName(parts[0]);
+ Column keyColumn = partitionColumnSchema.getColumn(columnId);
+ tuple.put(columnId, DatumFactory.createFromString(keyColumn.getDataType(), StringUtils.unescapePathName(parts[1])));
+ }
+ for (; i < partitionColumnSchema.size(); i++) {
+ tuple.put(i, NullDatum.get());
+ }
+ return tuple;
+ }
+
+ /**
+ * Get a prefix of column partition path. For example, consider a column partition (col1, col2).
+ * Then, you will get a string 'col1='.
+ *
+ * @param partitionColumn the schema of column partition
+ * @return The first part string of column partition path.
+ */
+ private static String getColumnPartitionPathPrefix(Schema partitionColumn) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(partitionColumn.getColumn(0).getSimpleName()).append("=");
+ return sb.toString();
+ }
+
+ private final class Rewriter extends BasicLogicalPlanVisitor<Object, Object> {
+ @Override
+ public Object visitScan(Object object, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode scanNode,
+ Stack<LogicalNode> stack) throws PlanningException {
+
+ TableDesc table = scanNode.getTableDesc();
+ if (!table.hasPartition()) {
+ return null;
+ }
+
+ try {
+ Path [] filteredPaths = findFilteredPartitionPaths(scanNode);
+ plan.addHistory("PartitionTableRewriter chooses " + filteredPaths.length + " of partitions");
+ PartitionedTableScanNode rewrittenScanNode = plan.createNode(PartitionedTableScanNode.class);
+ rewrittenScanNode.init(scanNode, filteredPaths);
+ updateTableStat(rewrittenScanNode);
+
+ // if it is topmost node, set it as the rootnode of this block.
+ if (stack.empty() || block.getRoot().equals(scanNode)) {
+ block.setRoot(rewrittenScanNode);
+ } else {
+ PlannerUtil.replaceNode(plan, stack.peek(), scanNode, rewrittenScanNode);
+ }
+ } catch (IOException e) {
+ throw new PlanningException("Partitioned Table Rewrite Failed: \n" + e.getMessage());
+ }
+ return null;
+ }
+ }
+}