You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/12/19 04:10:34 UTC
[2/3] TAJO-338 - Add Query Optimization Part for Column-Partitioned
Tables. (hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
index 638c92a..d262bc3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
@@ -18,7 +18,6 @@
package org.apache.tajo.engine.eval;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -31,8 +30,8 @@ import org.apache.tajo.exception.InternalException;
import java.util.*;
public class EvalTreeUtil {
- public static void changeColumnRef(EvalNode node, String oldName,
- String newName) {
+
+ public static void changeColumnRef(EvalNode node, String oldName, String newName) {
node.postOrder(new ChangeColumnRefVisitor(oldName, newName));
}
@@ -80,50 +79,9 @@ public class EvalTreeUtil {
node.postOrder(finder);
return finder.getColumnRefs();
}
-
- /**
- * Convert a list of conjunctive normal forms into a singleton expression.
- *
- * @param cnfExprs
- * @return The EvalNode object that merges all CNF-formed expressions.
- */
- public static EvalNode transformCNF2Singleton(EvalNode...cnfExprs) {
- if (cnfExprs.length == 1) {
- return cnfExprs[0];
- }
-
- return transformCNF2Singleton_(cnfExprs, 0);
- }
-
- private static EvalNode transformCNF2Singleton_(EvalNode [] evalNode, int idx) {
- if (idx == evalNode.length - 2) {
- return new BinaryEval(EvalType.AND, evalNode[idx], evalNode[idx + 1]);
- } else {
- return new BinaryEval(EvalType.AND, evalNode[idx],
- transformCNF2Singleton_(evalNode, idx + 1));
- }
- }
-
- /**
- * Get a list of exprs similar to CNF
- *
- * @param expr The expression to be transformed to an array of CNF-formed expressions.
- * @return An array of CNF-formed expressions
- */
- public static EvalNode [] getConjNormalForm(EvalNode expr) {
- List<EvalNode> list = new ArrayList<EvalNode>();
- getConjNormalForm(expr, list);
- return list.toArray(new EvalNode[list.size()]);
- }
-
- private static void getConjNormalForm(EvalNode node, List<EvalNode> found) {
- if (node.getType() == EvalType.AND) {
- getConjNormalForm(node.getLeftExpr(), found);
- getConjNormalForm(node.getRightExpr(), found);
- } else {
- found.add(node);
- }
- }
+
+
+
public static Schema getSchemaByTargets(Schema inputSchema, Target [] targets)
throws InternalException {
@@ -234,16 +192,8 @@ public class EvalTreeUtil {
}
}
- public static boolean isComparisonOperator(EvalNode expr) {
- return expr.getType() == EvalType.EQUAL ||
- expr.getType() == EvalType.LEQ ||
- expr.getType() == EvalType.LTH ||
- expr.getType() == EvalType.GEQ ||
- expr.getType() == EvalType.GTH;
- }
-
public static boolean isJoinQual(EvalNode expr) {
- return isComparisonOperator(expr) &&
+ return AlgebraicUtil.isComparisonOperator(expr) &&
expr.getLeftExpr().getType() == EvalType.FIELD &&
expr.getRightExpr().getType() == EvalType.FIELD;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java
index e2411e3..59e8b31 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java
@@ -71,7 +71,7 @@ public class InEval extends BinaryEval {
Datum value = tuple.get(fieldId);
for (Datum datum : values) {
- if (value.equals(datum)) {
+ if (value.equalsTo(datum).asBool()) {
isIncluded = true;
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/LikePredicateEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/LikePredicateEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/LikePredicateEval.java
index ac7aeeb..e1c693e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/LikePredicateEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/LikePredicateEval.java
@@ -42,6 +42,10 @@ public class LikePredicateEval extends PatternMatchPredicateEval {
this.compiled = Pattern.compile(regex, flags);
}
+ public boolean isLeadingWildCard() {
+ return pattern.indexOf(".*") == 0;
+ }
+
@Override
public String toString() {
return leftExpr.toString() + (caseInsensitive ? "ILIKE" : "LIKE") + "'" + pattern +"'";
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
index 2cd91f9..ad8acf1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
@@ -906,9 +906,9 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
}
if (checkIfExist(ctx.table_partitioning_clauses())) {
- CreateTable.PartitionOption partitionOption =
+ PartitionDescExpr partitionDesc =
parseTablePartitioningClause(ctx.table_partitioning_clauses());
- createTable.setPartition(partitionOption);
+ createTable.setPartition(partitionDesc);
}
return createTable;
}
@@ -925,7 +925,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
return elements;
}
- public CreateTable.PartitionOption parseTablePartitioningClause(SQLParser.Table_partitioning_clausesContext ctx) {
+ public PartitionDescExpr parseTablePartitioningClause(SQLParser.Table_partitioning_clausesContext ctx) {
if (checkIfExist(ctx.range_partitions())) { // For Range Partition
Range_partitionsContext rangePartitionsContext = ctx.range_partitions();
@@ -978,9 +978,9 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
return new ListPartition(getColumnReferences(ctx.list_partitions().column_reference_list()), specifiers);
} else if (checkIfExist(ctx.column_partitions())) { // For Column Partition (Hive Style)
- return new CreateTable.ColumnPartition(getColumnReferences(ctx.column_partitions().column_reference_list()));
+ return new CreateTable.ColumnPartition(getDefinitions(ctx.column_partitions().table_elements()), true);
} else {
- throw new SQLSyntaxError("Wrong partition option");
+ throw new SQLSyntaxError("Invalid Partition Type: " + ctx.toStringTree());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
index 9478648..0cee8dd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
@@ -92,6 +92,9 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi
case SCAN:
current = visitScan(context, plan, (ScanNode) node, stack);
break;
+ case PARTITIONS_SCAN:
+ current = visitScan(context, plan, (ScanNode) node, stack);
+ break;
case STORE:
current = visitStoreTable(context, plan, (StoreTableNode) node, stack);
break;
@@ -221,6 +224,11 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi
}
@Override
+ public RESULT visitPartitionedTableScan(CONTEXT context, LogicalPlan plan, PartitionedTableScanNode node, Stack<LogicalNode> stack) throws PlanningException {
+ return null;
+ }
+
+ @Override
public RESULT visitStoreTable(CONTEXT context, LogicalPlan plan, StoreTableNode node, Stack<LogicalNode> stack)
throws PlanningException {
stack.push(node);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
index 640383e..5f11f1a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
@@ -22,8 +22,9 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.tajo.algebra.JoinType;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.eval.AlgebraicUtil;
import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.eval.EvalTreeUtil;
import org.apache.tajo.engine.planner.graph.DirectedGraphCursor;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.planner.logical.join.FoundJoinOrder;
@@ -32,6 +33,7 @@ import org.apache.tajo.engine.planner.logical.join.JoinGraph;
import org.apache.tajo.engine.planner.logical.join.JoinOrderAlgorithm;
import org.apache.tajo.engine.planner.rewrite.BasicQueryRewriteEngine;
import org.apache.tajo.engine.planner.rewrite.FilterPushDownRule;
+import org.apache.tajo.engine.planner.rewrite.PartitionedTableRewriter;
import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule;
import java.util.Set;
@@ -49,12 +51,13 @@ public class LogicalOptimizer {
private BasicQueryRewriteEngine rulesAfterToJoinOpt;
private JoinOrderAlgorithm joinOrderAlgorithm = new GreedyHeuristicJoinOrderAlgorithm();
- public LogicalOptimizer() {
+ public LogicalOptimizer(TajoConf systemConf) {
rulesBeforeJoinOpt = new BasicQueryRewriteEngine();
rulesBeforeJoinOpt.addRewriteRule(new FilterPushDownRule());
rulesAfterToJoinOpt = new BasicQueryRewriteEngine();
rulesAfterToJoinOpt.addRewriteRule(new ProjectionPushDownRule());
+ rulesAfterToJoinOpt.addRewriteRule(new PartitionedTableRewriter(systemConf));
}
public LogicalNode optimize(LogicalPlan plan) throws PlanningException {
@@ -130,7 +133,7 @@ public class LogicalOptimizer {
public LogicalNode visitFilter(JoinGraphContext context, LogicalPlan plan, SelectionNode node,
Stack<LogicalNode> stack) throws PlanningException {
super.visitFilter(context, plan, node, stack);
- context.quals.addAll(Lists.newArrayList(EvalTreeUtil.getConjNormalForm(node.getQual())));
+ context.quals.addAll(Lists.newArrayList(AlgebraicUtil.toConjunctiveNormalFormArray(node.getQual())));
return node;
}
@@ -239,7 +242,7 @@ public class LogicalOptimizer {
double filterFactor = 1;
if (joinNode.hasJoinQual()) {
- EvalNode [] quals = EvalTreeUtil.getConjNormalForm(joinNode.getJoinQual());
+ EvalNode [] quals = AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual());
filterFactor = Math.pow(GreedyHeuristicJoinOrderAlgorithm.DEFAULT_SELECTION_FACTOR, quals.length);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
index c06b7a7..c6fcefc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
@@ -47,6 +47,8 @@ public interface LogicalPlanVisitor <CONTEXT, RESULT> {
throws PlanningException;
RESULT visitScan(CONTEXT context, LogicalPlan plan, ScanNode node, Stack<LogicalNode> stack)
throws PlanningException;
+ RESULT visitPartitionedTableScan(CONTEXT context, LogicalPlan plan, PartitionedTableScanNode node,
+ Stack<LogicalNode> stack) throws PlanningException;
RESULT visitStoreTable(CONTEXT context, LogicalPlan plan, StoreTableNode node, Stack<LogicalNode> stack)
throws PlanningException;
RESULT visitInsert(CONTEXT context, LogicalPlan plan, InsertNode node, Stack<LogicalNode> stack)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index 63b7985..605b9df 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.tajo.algebra.*;
import org.apache.tajo.algebra.CreateTable.ColumnDefinition;
import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.partition.Specifier;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.common.TajoDataTypes;
@@ -50,10 +50,13 @@ import org.apache.tajo.engine.utils.SchemaUtil;
import org.apache.tajo.exception.InternalException;
import org.apache.tajo.util.TUtil;
+import java.util.Collection;
import java.util.List;
import java.util.Stack;
import static org.apache.tajo.algebra.Aggregation.GroupType;
+import static org.apache.tajo.algebra.CreateTable.ColumnPartition;
+import static org.apache.tajo.algebra.CreateTable.PartitionType;
import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
import static org.apache.tajo.engine.planner.LogicalPlan.BlockType;
@@ -407,7 +410,8 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
// 3. build this plan:
EvalNode searchCondition = createEvalTree(plan, block, selection.getQual());
- SelectionNode selectionNode = new SelectionNode(plan.newPID(), searchCondition);
+ EvalNode simplified = AlgebraicUtil.eliminateConstantExprs(searchCondition);
+ SelectionNode selectionNode = new SelectionNode(plan.newPID(), simplified);
// 4. set child plan, update input/output schemas:
selectionNode.setChild(child);
@@ -772,8 +776,21 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
return storeNode;
} else {
- CreateTableNode createTableNode = new CreateTableNode(context.plan.newPID(), expr.getTableName(),
- convertTableElementsSchema(expr.getTableElements()));
+
+ Schema tableSchema;
+ if (expr.hasPartition() && expr.getPartition().getPartitionType() == PartitionType.COLUMN &&
+ ((ColumnPartition)expr.getPartition()).isOmitValues()) {
+ ColumnDefinition [] merged = TUtil.concat(expr.getTableElements(),
+ ((ColumnPartition)expr.getPartition()).getColumns());
+ tableSchema = convertTableElementsSchema(merged);
+ } else {
+ tableSchema = convertTableElementsSchema(expr.getTableElements());
+ }
+
+ CreateTableNode createTableNode = new CreateTableNode(
+ context.plan.newPID(),
+ expr.getTableName(),
+ tableSchema);
if (expr.isExternal()) {
createTableNode.setExternal(true);
@@ -811,28 +828,28 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
* @return
* @throws PlanningException
*/
- private Partitions convertTableElementsPartition(PlanContext context,
+ private PartitionDesc convertTableElementsPartition(PlanContext context,
CreateTable expr) throws PlanningException {
Schema schema = convertTableElementsSchema(expr.getTableElements());
- Partitions partitions = null;
+ PartitionDesc partitionDesc = null;
List<Specifier> specifiers = null;
if (expr.hasPartition()) {
- partitions = new Partitions();
+ partitionDesc = new PartitionDesc();
specifiers = TUtil.newList();
- partitions.setPartitionsType(CatalogProtos.PartitionsType.valueOf(expr.getPartition()
+ partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.valueOf(expr.getPartition()
.getPartitionType().name()));
- if (expr.getPartition().getPartitionType().equals(CreateTable.PartitionType.HASH)) {
+ if (expr.getPartition().getPartitionType().equals(PartitionType.HASH)) {
CreateTable.HashPartition hashPartition = expr.getPartition();
- partitions.setColumns(convertTableElementsColumns(expr.getTableElements()
+ partitionDesc.setColumns(convertTableElementsColumns(expr.getTableElements()
, hashPartition.getColumns()));
if (hashPartition.getColumns() != null) {
if (hashPartition.getQuantifier() != null) {
String quantity = ((LiteralValue)hashPartition.getQuantifier()).getValue();
- partitions.setNumPartitions(Integer.parseInt(quantity));
+ partitionDesc.setNumPartitions(Integer.parseInt(quantity));
}
if (hashPartition.getSpecifiers() != null) {
@@ -841,21 +858,21 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
}
}
- if (specifiers.isEmpty() && partitions.getNumPartitions() > 0) {
- for (int i = 0; i < partitions.getNumPartitions(); i++) {
- String partitionName = partitions.getPartitionsType().name() + "_" + expr
+ if (specifiers.isEmpty() && partitionDesc.getNumPartitions() > 0) {
+ for (int i = 0; i < partitionDesc.getNumPartitions(); i++) {
+ String partitionName = partitionDesc.getPartitionsType().name() + "_" + expr
.getTableName() + "_" + i;
specifiers.add(new Specifier(partitionName));
}
}
if (!specifiers.isEmpty())
- partitions.setSpecifiers(specifiers);
+ partitionDesc.setSpecifiers(specifiers);
}
- } else if (expr.getPartition().getPartitionType().equals(CreateTable.PartitionType.LIST)) {
+ } else if (expr.getPartition().getPartitionType().equals(PartitionType.LIST)) {
CreateTable.ListPartition listPartition = expr.getPartition();
- partitions.setColumns(convertTableElementsColumns(expr.getTableElements()
+ partitionDesc.setColumns(convertTableElementsColumns(expr.getTableElements()
, listPartition.getColumns()));
if (listPartition.getSpecifiers() != null) {
@@ -876,12 +893,12 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
specifiers.add(specifier);
}
if (!specifiers.isEmpty())
- partitions.setSpecifiers(specifiers);
+ partitionDesc.setSpecifiers(specifiers);
}
- } else if (expr.getPartition().getPartitionType().equals(CreateTable.PartitionType.RANGE)) {
+ } else if (expr.getPartition().getPartitionType().equals(PartitionType.RANGE)) {
CreateTable.RangePartition rangePartition = expr.getPartition();
- partitions.setColumns(convertTableElementsColumns(expr.getTableElements()
+ partitionDesc.setColumns(convertTableElementsColumns(expr.getTableElements()
, rangePartition.getColumns()));
if (rangePartition.getSpecifiers() != null) {
@@ -903,17 +920,16 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
specifiers.add(specifier);
}
if (!specifiers.isEmpty())
- partitions.setSpecifiers(specifiers);
+ partitionDesc.setSpecifiers(specifiers);
}
- } else if (expr.getPartition().getPartitionType().equals(CreateTable.PartitionType.COLUMN)) {
- CreateTable.ColumnPartition columnPartition = expr.getPartition();
-
- partitions.setColumns(convertTableElementsColumns(expr.getTableElements()
- , columnPartition.getColumns()));
+ } else if (expr.getPartition().getPartitionType() == PartitionType.COLUMN) {
+ ColumnPartition columnPartition = expr.getPartition();
+ partitionDesc.setColumns(convertTableElementsSchema(columnPartition.getColumns()).getColumns());
+ partitionDesc.setOmitValues(columnPartition.isOmitValues());
}
}
- return partitions;
+ return partitionDesc;
}
@@ -933,7 +949,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
return schema;
}
- private List<Column> convertTableElementsColumns(CreateTable.ColumnDefinition [] elements,
+ private Collection<Column> convertTableElementsColumns(CreateTable.ColumnDefinition [] elements,
ColumnReferenceExpr[] references) {
List<Column> columnList = TUtil.newList();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index bbecd50..73395a6 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -138,7 +138,9 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
leftExec = createPlanRecursive(ctx, subQueryNode.getSubQuery());
return leftExec;
- } case SCAN:
+ }
+ case PARTITIONS_SCAN:
+ case SCAN:
leftExec = createScanPlan(ctx, (ScanNode) logicalNode);
return leftExec;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 6b23ed8..d541680 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -545,7 +545,7 @@ public class PlannerUtil {
* @return true if two operands refers to columns and the operator is comparison,
*/
public static boolean isJoinQual(EvalNode qual) {
- if (EvalTreeUtil.isComparisonOperator(qual)) {
+ if (AlgebraicUtil.isComparisonOperator(qual)) {
List<Column> left = EvalTreeUtil.findAllColumnRefs(qual.getLeftExpr());
List<Column> right = EvalTreeUtil.findAllColumnRefs(qual.getRightExpr());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Target.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Target.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Target.java
index a49451c..ac50c46 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Target.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Target.java
@@ -48,6 +48,10 @@ public class Target implements Cloneable, GsonObject {
return !hasAlias() ? column.getQualifiedName() : alias;
}
+ public final void setExpr(EvalNode expr) {
+ this.expr = expr;
+ }
+
public final void setAlias(String alias) {
this.alias = alias;
this.column = new Column(alias, expr.getValueType());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
index 4f936fc..4f3976e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
@@ -53,6 +53,10 @@ public class ExecutionBlock {
this.scanlist.clear();
this.plan = plan;
+ if (plan == null) {
+ return;
+ }
+
LogicalNode node = plan;
ArrayList<LogicalNode> s = new ArrayList<LogicalNode>();
s.add(node);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 6ef35ce..abf5620 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -112,7 +112,7 @@ public class GlobalPlanner {
}
private ExecutionBlock buildJoinPlan(GlobalPlanContext context, JoinNode joinNode,
- ExecutionBlock leftBlock, ExecutionBlock rightBlock)
+ ExecutionBlock leftBlock, ExecutionBlock rightBlock)
throws PlanningException {
MasterPlan masterPlan = context.plan;
ExecutionBlock currentBlock;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
index 942309d..d0c8373 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Options;
import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.engine.planner.PlanString;
import org.apache.tajo.util.TUtil;
@@ -36,7 +36,7 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
@Expose private Path path;
@Expose private Options options;
@Expose private boolean external;
- @Expose private Partitions partitions;
+ @Expose private PartitionDesc partitionDesc;
public CreateTableNode(int pid, String tableName, Schema schema) {
super(pid, NodeType.CREATE_TABLE);
@@ -92,16 +92,16 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
this.external = external;
}
- public Partitions getPartitions() {
- return partitions;
+ public PartitionDesc getPartitions() {
+ return partitionDesc;
}
- public void setPartitions(Partitions partitions) {
- this.partitions = partitions;
+ public void setPartitions(PartitionDesc partitionDesc) {
+ this.partitionDesc = partitionDesc;
}
public boolean hasPartition() {
- return this.partitions != null;
+ return this.partitionDesc != null;
}
@Override
@@ -121,7 +121,7 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
&& TUtil.checkEquals(path, other.path)
&& TUtil.checkEquals(options, other.options)
&& TUtil.checkEquals(partitionKeys, other.partitionKeys)
- && TUtil.checkEquals(partitions, other.partitions);
+ && TUtil.checkEquals(partitionDesc, other.partitionDesc);
} else {
return false;
}
@@ -137,7 +137,7 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
store.path = path != null ? new Path(path.toString()) : null;
store.partitionKeys = partitionKeys != null ? partitionKeys.clone() : null;
store.options = (Options) (options != null ? options.clone() : null);
- store.partitions = (Partitions) (partitions != null ? partitions.clone() : null);
+ store.partitionDesc = (PartitionDesc) (partitionDesc != null ? partitionDesc.clone() : null);
return store;
}
@@ -157,7 +157,7 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
sb.append(",\"storeType\": \"" + this.storageType);
sb.append(",\"path\" : \"" + this.path).append("\",");
sb.append(",\"external\" : \"" + this.external).append("\",");
- sb.append(",\"partitions\" : \"" + this.partitions).append("\",");
+ sb.append(",\"partitions\" : \"" + this.partitionDesc).append("\",");
sb.append("\n \"out schema\": ").append(getOutSchema()).append(",")
.append("\n \"in schema\": ").append(getInSchema())
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
index 44790ec..eaaf0c7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
@@ -37,6 +37,7 @@ public enum NodeType {
INTERSECT(IntersectNode.class),
LIMIT(LimitNode.class),
JOIN(JoinNode.class),
+ PARTITIONS_SCAN(PartitionedTableScanNode.class),
PROJECTION(ProjectionNode.class),
ROOT(LogicalRootNode.class),
SCAN(ScanNode.class),
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java
new file mode 100644
index 0000000..21f5ef4
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.common.base.Objects;
+import com.google.gson.annotations.Expose;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlanString;
+import org.apache.tajo.engine.planner.Target;
+import org.apache.tajo.util.TUtil;
+
+public class PartitionedTableScanNode extends ScanNode {
+ @Expose Path [] inputPaths;
+
+ /**
+  * Builds a partitioned-table scan out of an existing scan node.
+  *
+  * The table description, both schemas, the alias, the renamed schema, the qualifier
+  * and the targets are taken over from {@code scanNode} by reference; nothing is
+  * cloned here. The node type is {@link NodeType#PARTITIONS_SCAN}.
+  *
+  * @param pid        id passed through to the superclass constructor
+  * @param scanNode   scan node whose state is carried over
+  * @param inputPaths paths this node reads; stored by reference
+  */
+ public PartitionedTableScanNode(int pid, ScanNode scanNode, Path[] inputPaths) {
+   super(pid, NodeType.PARTITIONS_SCAN, scanNode.getTableDesc());
+
+   // Schemas first, exactly as exposed by the source node.
+   setInSchema(scanNode.getInSchema());
+   setOutSchema(scanNode.getOutSchema());
+
+   // Partition-specific state.
+   this.inputPaths = inputPaths;
+
+   // Remaining scan state shared with the source node.
+   this.targets = scanNode.targets;
+   this.qual = scanNode.qual;
+   this.renamedSchema = scanNode.renamedSchema;
+   this.alias = scanNode.alias;
+ }
+
+ /**
+  * Replaces the paths read by this node.
+  *
+  * @param paths the new input paths; the array is stored by reference, not copied
+  */
+ public void setInputPaths(Path [] paths) {
+ this.inputPaths = paths;
+ }
+
+ /**
+  * Returns the paths read by this node.
+  *
+  * @return the internal path array itself (not a copy); may be {@code null} if none was set
+  */
+ public Path [] getInputPaths() {
+ return inputPaths;
+ }
+
+ /**
+  * Renders this node as a JSON-like debugging string: the table name, then the
+  * optional alias, qualifier, target list and partition paths, followed by the
+  * output and input schemas.
+  *
+  * @return the textual form of this node
+  */
+ @Override
+ public String toString() {
+   StringBuilder sb = new StringBuilder();
+   sb.append("\"Partitions Scan\" : {\"table\":\"")
+       .append(getTableName()).append("\"");
+   if (hasAlias()) {
+     // Close the quote around the alias value, as is done for the table name and the qual.
+     sb.append(",\"alias\": \"").append(alias).append("\"");
+   }
+
+   if (hasQual()) {
+     sb.append(", \"qual\": \"").append(this.qual).append("\"");
+   }
+
+   if (hasTargets()) {
+     sb.append(", \"target list\": ");
+     boolean first = true;
+     for (Target target : targets) {
+       if (!first) {
+         sb.append(", ");
+       }
+       sb.append(target);
+       first = false;
+     }
+   }
+
+   if (inputPaths != null) {
+     // One path per line, indented under the "Partition paths" key.
+     sb.append(", \"Partition paths\": ");
+     for (Path path : inputPaths) {
+       sb.append("\n ");
+       sb.append(path);
+     }
+     sb.append("\n");
+   }
+
+   sb.append(",");
+   sb.append("\n \"out schema\": ").append(getOutSchema());
+   sb.append("\n \"in schema\": ").append(getInSchema());
+   return sb.toString();
+ }
+
+ /**
+  * Hashes the table description, the qualifier, the targets and the input paths.
+  *
+  * Both arrays are hashed by content. Handing an array to
+  * {@code Objects.hashCode(Object...)} as a single element would use the array's
+  * identity hash, so two nodes holding equal but distinct arrays would hash differently.
+  *
+  * @return a hash code derived from the fields above
+  */
+ @Override
+ public int hashCode() {
+   return Objects.hashCode(
+       this.tableDesc,
+       this.qual,
+       java.util.Arrays.hashCode(this.targets),
+       java.util.Arrays.hashCode(this.inputPaths));
+ }
+
+ /**
+  * Two partitioned scans are equal when the superclass considers them equal and
+  * their table description, qualifier, targets and input paths match.
+  *
+  * @param obj the object to compare with
+  * @return {@code true} only if {@code obj} is a {@code PartitionedTableScanNode}
+  *         and every comparison above succeeds
+  */
+ @Override
+ public boolean equals(Object obj) {
+   if (obj instanceof PartitionedTableScanNode) {
+     PartitionedTableScanNode other = (PartitionedTableScanNode) obj;
+
+     boolean eq = super.equals(other);
+     eq = eq && TUtil.checkEquals(this.tableDesc, other.tableDesc);
+     eq = eq && TUtil.checkEquals(this.qual, other.qual);
+     eq = eq && TUtil.checkEquals(this.targets, other.targets);
+     // Compare the paths element by element (both-null counts as equal) so the result
+     // does not depend on how TUtil.checkEquals treats an array argument.
+     eq = eq && java.util.Arrays.equals(this.inputPaths, other.inputPaths);
+
+     return eq;
+   }
+
+   return false;
+ }
+
+ @Override
+ public Object clone() throws CloneNotSupportedException {
+ PartitionedTableScanNode unionScan = (PartitionedTableScanNode) super.clone();
+
+ unionScan.tableDesc = (TableDesc) this.tableDesc.clone();
+
+ if (hasQual()) {
+ unionScan.qual = (EvalNode) this.qual.clone();
+ }
+
+ if (hasTargets()) {
+ unionScan.targets = new Target[targets.length];
+ for (int i = 0; i < targets.length; i++) {
+ unionScan.targets[i] = (Target) targets[i].clone();
+ }
+ }
+
+ unionScan.inputPaths = inputPaths;
+
+ return unionScan;
+ }
+
+ /**
+  * Pre-order traversal hook: hands only this node to the visitor.
+  *
+  * @param visitor the visitor to notify
+  */
+ @Override
+ public void preOrder(LogicalNodeVisitor visitor) {
+ visitor.visit(this);
+ }
+
+ /**
+  * Post-order traversal hook: hands only this node to the visitor.
+  *
+  * @param visitor the visitor to notify
+  */
+ @Override
+ public void postOrder(LogicalNodeVisitor visitor) {
+   visitor.visit(this);
+ }
+
+ /**
+  * Builds the plan description of this node: a title with the table name and
+  * optional alias, explanation entries for the filter, the target list and the
+  * numbered input paths, and detail entries for the output and input schemas.
+  *
+  * @return the assembled plan string
+  */
+ @Override
+ public PlanString getPlanString() {
+   PlanString plan = new PlanString("Scan on ").appendTitle(getTableName());
+   if (hasAlias()) {
+     plan.appendTitle(" as ").appendTitle(alias);
+   }
+
+   if (hasQual()) {
+     plan.addExplan("filter: ").appendExplain(this.qual.toString());
+   }
+
+   if (hasTargets()) {
+     plan.addExplan("target list: ");
+     for (int t = 0; t < targets.length; t++) {
+       // Comma-separate every target after the first one.
+       if (t > 0) {
+         plan.appendExplain(", ");
+       }
+       plan.appendExplain(targets[t].toString());
+     }
+   }
+
+   if (inputPaths != null) {
+     plan.addExplan("Path list: ");
+     // Each path gets its own entry, prefixed with its zero-based position.
+     for (int p = 0; p < inputPaths.length; p++) {
+       plan.addExplan(p + ": ").appendExplain(inputPaths[p].toString());
+     }
+   }
+
+   plan.addDetail("out schema: ").appendDetail(getOutSchema().toString());
+   plan.addDetail("in schema: ").appendDetail(getInSchema().toString());
+
+   return plan;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java
index 6a30aa6..5ea61b4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java
@@ -24,7 +24,7 @@ public abstract class RelationNode extends LogicalNode {
public RelationNode(int pid, NodeType nodeType) {
super(pid, nodeType);
- assert(nodeType == NodeType.SCAN || nodeType == NodeType.TABLE_SUBQUERY);
+ assert(nodeType == NodeType.SCAN || nodeType == NodeType.PARTITIONS_SCAN || nodeType == NodeType.TABLE_SUBQUERY);
}
public abstract String getTableName();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
index 04c7b5a..cd9c1f1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
@@ -29,11 +29,16 @@ import org.apache.tajo.engine.planner.Target;
import org.apache.tajo.util.TUtil;
public class ScanNode extends RelationNode implements Projectable {
- @Expose private TableDesc tableDesc;
- @Expose private String alias;
- @Expose private Schema renamedSchema;
- @Expose private EvalNode qual;
- @Expose private Target[] targets;
+ @Expose protected TableDesc tableDesc;
+ @Expose protected String alias;
+ @Expose protected Schema renamedSchema;
+ @Expose protected EvalNode qual;
+ @Expose protected Target[] targets;
+
+ /**
+  * Constructor that lets a subclass choose the node type handed to the superclass
+  * instead of the fixed {@link NodeType#SCAN}.
+  *
+  * @param pid      id passed through to the superclass constructor
+  * @param nodeType node type passed through to the superclass constructor
+  * @param desc     description of the table to scan; stored by reference
+  */
+ protected ScanNode(int pid, NodeType nodeType, TableDesc desc) {
+ super(pid, nodeType);
+ this.tableDesc = desc;
+ }
public ScanNode(int pid, TableDesc desc) {
super(pid, NodeType.SCAN);
@@ -46,7 +51,7 @@ public class ScanNode extends RelationNode implements Projectable {
this(pid, desc);
this.alias = PlannerUtil.normalizeTableName(alias);
renamedSchema = getOutSchema();
- renamedSchema.setQualifier(this.alias, true);
+ renamedSchema.setQualifier(this.alias);
}
public String getTableName() {
@@ -119,7 +124,7 @@ public class ScanNode extends RelationNode implements Projectable {
first = false;
}
}
-
+
sb.append(",");
sb.append("\n \"out schema\": ").append(getOutSchema());
sb.append("\n \"in schema\": ").append(getInSchema());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
index b2bd937..634fa3a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
@@ -22,7 +22,7 @@ import com.google.common.base.Preconditions;
import com.google.gson.annotations.Expose;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Options;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.engine.planner.PlanString;
import org.apache.tajo.util.TUtil;
@@ -39,17 +39,17 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
@Expose private Options options;
@Expose private boolean isCreatedTable = false;
@Expose private boolean isOverwritten = false;
- @Expose private Partitions partitions;
+ @Expose private PartitionDesc partitionDesc;
public StoreTableNode(int pid, String tableName) {
super(pid, NodeType.STORE);
this.tableName = tableName;
}
- public StoreTableNode(int pid, String tableName, Partitions partitions) {
+ public StoreTableNode(int pid, String tableName, PartitionDesc partitionDesc) {
super(pid, NodeType.STORE);
this.tableName = tableName;
- this.partitions = partitions;
+ this.partitionDesc = partitionDesc;
}
public final String getTableName() {
@@ -109,12 +109,12 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
return this.options;
}
- public Partitions getPartitions() {
- return partitions;
+ public PartitionDesc getPartitions() {
+ return partitionDesc;
}
- public void setPartitions(Partitions partitions) {
- this.partitions = partitions;
+ public void setPartitions(PartitionDesc partitionDesc) {
+ this.partitionDesc = partitionDesc;
}
@Override
@@ -146,7 +146,7 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
eq = eq && TUtil.checkEquals(options, other.options);
eq = eq && isCreatedTable == other.isCreatedTable;
eq = eq && isOverwritten == other.isOverwritten;
- eq = eq && TUtil.checkEquals(partitions, other.partitions);
+ eq = eq && TUtil.checkEquals(partitionDesc, other.partitionDesc);
return eq;
} else {
return false;
@@ -163,7 +163,7 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
store.options = options != null ? (Options) options.clone() : null;
store.isCreatedTable = isCreatedTable;
store.isOverwritten = isOverwritten;
- store.partitions = partitions;
+ store.partitionDesc = partitionDesc;
return store;
}
@@ -188,8 +188,8 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
sb.append("\n \"out schema\": ").append(getOutSchema()).append(",")
.append("\n \"in schema\": ").append(getInSchema());
- if(partitions != null) {
- sb.append(partitions.toString());
+ if(partitionDesc != null) {
+ sb.append(partitionDesc.toString());
}
sb.append("}");
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
index d1f0986..335d12f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
@@ -35,7 +35,7 @@ public class TableSubQueryNode extends RelationNode implements Projectable {
this.tableName = PlannerUtil.normalizeTableName(tableName);
this.subQuery = subQuery;
setOutSchema((Schema) this.subQuery.getOutSchema().clone());
- getOutSchema().setQualifier(this.tableName, true);
+ getOutSchema().setQualifier(this.tableName);
setInSchema((Schema) this.subQuery.getInSchema().clone());
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
index bd1b8d3..cbdad1a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
@@ -20,7 +20,7 @@ package org.apache.tajo.engine.planner.logical.join;
import org.apache.tajo.algebra.JoinType;
import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.engine.eval.EvalTreeUtil;
+import org.apache.tajo.engine.eval.AlgebraicUtil;
import org.apache.tajo.engine.planner.LogicalPlan;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.PlanningException;
@@ -90,7 +90,7 @@ public class GreedyHeuristicJoinOrderAlgorithm implements JoinOrderAlgorithm {
joinNode.setInSchema(mergedSchema);
joinNode.setOutSchema(mergedSchema);
if (joinEdge.hasJoinQual()) {
- joinNode.setJoinQual(EvalTreeUtil.transformCNF2Singleton(joinEdge.getJoinQual()));
+ joinNode.setJoinQual(AlgebraicUtil.createSingletonExprFromCNF(joinEdge.getJoinQual()));
}
return joinNode;
}
@@ -206,7 +206,7 @@ public class GreedyHeuristicJoinOrderAlgorithm implements JoinOrderAlgorithm {
double filterFactor = 1;
if (joinNode.hasJoinQual()) {
filterFactor = Math.pow(DEFAULT_SELECTION_FACTOR,
- EvalTreeUtil.getConjNormalForm(joinNode.getJoinQual()).length);
+ AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual()).length);
return getCost(joinNode.getLeftChild()) * getCost(joinNode.getRightChild()) * filterFactor;
} else {
return Math.pow(getCost(joinNode.getLeftChild()) * getCost(joinNode.getRightChild()), 2);
@@ -215,7 +215,7 @@ public class GreedyHeuristicJoinOrderAlgorithm implements JoinOrderAlgorithm {
case SELECTION:
SelectionNode selectionNode = (SelectionNode) node;
return getCost(selectionNode.getChild()) *
- Math.pow(DEFAULT_SELECTION_FACTOR, EvalTreeUtil.getConjNormalForm(selectionNode.getQual()).length);
+ Math.pow(DEFAULT_SELECTION_FACTOR, AlgebraicUtil.toConjunctiveNormalFormArray(selectionNode.getQual()).length);
case TABLE_SUBQUERY:
TableSubQueryNode subQueryNode = (TableSubQueryNode) node;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
index 66c82f3..74ef38a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
@@ -20,6 +20,7 @@ package org.apache.tajo.engine.planner.logical.join;
import com.google.common.collect.Sets;
import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.AlgebraicUtil;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.eval.EvalTreeUtil;
import org.apache.tajo.engine.planner.LogicalPlan;
@@ -35,7 +36,7 @@ import java.util.Set;
public class JoinGraph extends SimpleUndirectedGraph<String, JoinEdge> {
public Collection<EvalNode> addJoin(LogicalPlan plan, LogicalPlan.QueryBlock block,
JoinNode joinNode) throws PlanningException {
- Set<EvalNode> cnf = Sets.newHashSet(EvalTreeUtil.getConjNormalForm(joinNode.getJoinQual()));
+ Set<EvalNode> cnf = Sets.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual()));
Set<EvalNode> nonJoinQuals = Sets.newHashSet();
for (EvalNode singleQual : cnf) {
if (PlannerUtil.isJoinQual(singleQual)) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
index f7db4bd..db7e566 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
@@ -28,7 +28,9 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.statistics.StatisticsUtil;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.datum.Datum;
@@ -45,6 +47,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionsType;
+
/**
* This class is a physical operator to store at column partitioned table.
*/
@@ -56,15 +60,11 @@ public class ColumnPartitionedTableStoreExec extends UnaryPhysicalExec {
private Tuple tuple;
private Path storeTablePath;
private final Map<String, Appender> appenderMap = new HashMap<String, Appender>();
- private int[] columnIndexes;
- private String[] columnNames;
-
+ private int[] partitionColumnIndices;
+ private String[] partitionColumnNames;
- /**
- * @throws java.io.IOException
- *
- */
- public ColumnPartitionedTableStoreExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child) throws IOException {
+ public ColumnPartitionedTableStoreExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child)
+ throws IOException {
super(context, plan.getInSchema(), plan.getOutSchema(), child);
this.plan = plan;
@@ -75,18 +75,25 @@ public class ColumnPartitionedTableStoreExec extends UnaryPhysicalExec {
meta = CatalogUtil.newTableMeta(plan.getStorageType());
}
+ // Rewrite a output schema because we don't have to store field values
+ // corresponding to partition key columns.
+ if (plan.getPartitions() != null && plan.getPartitions().getPartitionsType() == PartitionsType.COLUMN) {
+ rewriteColumnPartitionedTableSchema();
+ }
+
// Find column index to name subpartition directory path
if (this.plan.getPartitions() != null) {
if (this.plan.getPartitions().getColumns() != null) {
- columnIndexes = new int[plan.getPartitions().getColumns().size()];
- columnNames = new String[columnIndexes.length];
- for(int i = 0; i < plan.getPartitions().getColumns().size(); i++) {
- Column targetColumn = plan.getPartitions().getColumn(i);
+ partitionColumnIndices = new int[plan.getPartitions().getColumns().size()];
+ partitionColumnNames = new String[partitionColumnIndices.length];
+ Schema columnPartitionSchema = plan.getPartitions().getSchema();
+ for(int i = 0; i < columnPartitionSchema.getColumnNum(); i++) {
+ Column targetColumn = columnPartitionSchema.getColumn(i);
for(int j = 0; j < plan.getInSchema().getColumns().size();j++) {
Column inputColumn = plan.getInSchema().getColumn(j);
if (inputColumn.getColumnName().equals(targetColumn.getColumnName())) {
- columnIndexes[i] = j;
- columnNames[i] = targetColumn.getColumnName();
+ partitionColumnIndices[i] = j;
+ partitionColumnNames[i] = targetColumn.getColumnName();
}
}
}
@@ -94,6 +101,25 @@ public class ColumnPartitionedTableStoreExec extends UnaryPhysicalExec {
}
}
+ /**
+  * Rewrites the output schema of this store operator for a column-partitioned table.
+  * Every column of {@code outSchema} whose name also appears in the partition schema
+  * is dropped; the remaining columns keep their original order. The partition schema
+  * is cloned and qualified with the table name before the lookup, so the plan's own
+  * partition description is left untouched.
+  */
+ private void rewriteColumnPartitionedTableSchema() {
+   Schema partitionKeySchema = (Schema) plan.getPartitions().getSchema().clone();
+   partitionKeySchema.setQualifier(plan.getTableName());
+
+   Schema storedSchema = new Schema();
+   for (Column candidate : outSchema.toArray()) {
+     boolean isPartitionKey = partitionKeySchema.getColumnByName(candidate.getColumnName()) != null;
+     if (isPartitionKey) {
+       continue;
+     }
+     storedSchema.addColumn(candidate);
+   }
+   outSchema = storedSchema;
+ }
+
public void init() throws IOException {
super.init();
@@ -124,8 +150,7 @@ public class ColumnPartitionedTableStoreExec extends UnaryPhysicalExec {
LOG.info("File size: " + status.getLen());
}
- appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta,
- outSchema, dataFile);
+ appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, dataFile);
appender.enableStats();
appender.init();
appenderMap.put(partition, appender);
@@ -148,12 +173,12 @@ public class ColumnPartitionedTableStoreExec extends UnaryPhysicalExec {
while((tuple = child.next()) != null) {
// set subpartition directory name
sb.delete(0, sb.length());
- if (columnIndexes != null) {
- for(int i = 0; i < columnIndexes.length; i++) {
- Datum datum = (Datum) tuple.get(columnIndexes[i]);
+ if (partitionColumnIndices != null) {
+ for(int i = 0; i < partitionColumnIndices.length; i++) {
+ Datum datum = tuple.get(partitionColumnIndices[i]);
if(i > 0)
sb.append("/");
- sb.append(columnNames[i]).append("=");
+ sb.append(partitionColumnNames[i]).append("=");
sb.append(datum.asChars());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
index b799095..4e6cd64 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
@@ -27,9 +27,9 @@ import java.io.IOException;
public abstract class PhysicalExec implements SchemaObject {
protected final TaskAttemptContext context;
- protected final Schema inSchema;
- protected final Schema outSchema;
- protected final int outColumnNum;
+ protected Schema inSchema;
+ protected Schema outSchema;
+ protected int outColumnNum;
public PhysicalExec(final TaskAttemptContext context, final Schema inSchema,
final Schema outSchema) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 5783080..d17f7ec 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -18,15 +18,16 @@
package org.apache.tajo.engine.planner.physical;
+import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.engine.eval.*;
+import org.apache.tajo.engine.utils.TupleUtil;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.engine.eval.EvalContext;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.eval.EvalTreeUtil;
import org.apache.tajo.engine.planner.Projector;
import org.apache.tajo.engine.planner.Target;
import org.apache.tajo.engine.planner.logical.ScanNode;
@@ -34,8 +35,11 @@ import org.apache.tajo.storage.*;
import java.io.IOException;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
+import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionsType;
+
public class SeqScanExec extends PhysicalExec {
private final ScanNode plan;
private Scanner scanner = null;
@@ -63,8 +67,67 @@ public class SeqScanExec extends PhysicalExec {
}
}
+ /**
+  * This method rewrites an input schema of column-partitioned table because
+  * there are no actual field values in data file in a column-partitioned table.
+  * So, this method removes partition key columns from the input schema, and then
+  * replaces every reference to a partition key column in the targets and in the
+  * search condition by the constant value decoded from the fragment's path.
+  *
+  * Note that the targets and the qualifier of the plan are modified in place.
+  *
+  * TODO - This implementation assumes that a fragment is always FileFragment.
+  * In the column partitioned table, a path has an important role to
+  * indicate partition keys. In this time, it is right. Later, we have to fix it.
+  *
+  * @throws IOException declared for the helper calls made by this method
+  */
+ private void rewriteColumnPartitionedTableSchema() throws IOException {
+   PartitionDesc partitionDesc = plan.getTableDesc().getPartitions();
+   Schema columnPartitionSchema = (Schema) partitionDesc.getSchema().clone();
+   List<FileFragment> fileFragments = FragmentConvertor.convert(FileFragment.class, fragments);
+
+   // Get a partition key value from a given path
+   Tuple partitionRow =
+       TupleUtil.buildTupleFromPartitionPath(columnPartitionSchema, fileFragments.get(0).getPath(), false);
+
+   // Remove partition key columns from an input schema.
+   columnPartitionSchema.setQualifier(inSchema.getColumn(0).getQualifier());
+   Schema modifiedInputSchema = new Schema();
+   for (Column column : inSchema.toArray()) {
+     if (columnPartitionSchema.getColumnByName(column.getColumnName()) == null) {
+       modifiedInputSchema.addColumn(column);
+     }
+   }
+   this.inSchema = modifiedInputSchema;
+
+   // Targets or search conditions may contain column references.
+   // However, actual values are absent in tuples. So, replace all column references by constant datum.
+   for (Column column : columnPartitionSchema.toArray()) {
+     FieldEval targetExpr = new FieldEval(column);
+     EvalContext evalContext = targetExpr.newContext();
+     targetExpr.eval(evalContext, columnPartitionSchema, partitionRow);
+     Datum datum = targetExpr.terminate(evalContext);
+     ConstEval constExpr = new ConstEval(datum);
+
+     // A scan may carry no target list at all (init() checks hasTargets() as well),
+     // so only walk the targets when there are some.
+     if (plan.hasTargets()) {
+       for (Target target : plan.getTargets()) {
+         if (target.getEvalTree().equals(targetExpr)) {
+           // The whole target is the partition column: keep its name as the alias
+           // before swapping the expression for the constant.
+           if (!target.hasAlias()) {
+             target.setAlias(target.getEvalTree().getName());
+           }
+           target.setExpr(constExpr);
+         } else {
+           EvalTreeUtil.replace(target.getEvalTree(), targetExpr, constExpr);
+         }
+       }
+     }
+
+     if (plan.hasQual()) {
+       EvalTreeUtil.replace(plan.getQual(), targetExpr, constExpr);
+     }
+   }
+ }
+
public void init() throws IOException {
Schema projected;
+
+ if (plan.getTableDesc().hasPartitions()
+ && plan.getTableDesc().getPartitions().getPartitionsType() == PartitionsType.COLUMN) {
+ rewriteColumnPartitionedTableSchema();
+ }
+
if (plan.hasTargets()) {
projected = new Schema();
Set<Column> columnSet = new HashSet<Column>();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
index 7673253..817e48a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
@@ -22,10 +22,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.tajo.algebra.JoinType;
import org.apache.tajo.catalog.Column;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.eval.EvalTreeUtil;
-import org.apache.tajo.engine.eval.EvalType;
-import org.apache.tajo.engine.eval.FieldEval;
+import org.apache.tajo.engine.eval.*;
import org.apache.tajo.engine.planner.BasicLogicalPlanVisitor;
import org.apache.tajo.engine.planner.LogicalPlan;
import org.apache.tajo.engine.planner.PlannerUtil;
@@ -65,7 +62,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<Set<EvalNode>, L
@Override
public LogicalNode visitFilter(Set<EvalNode> cnf, LogicalPlan plan, SelectionNode selNode, Stack<LogicalNode> stack)
throws PlanningException {
- cnf.addAll(Sets.newHashSet(EvalTreeUtil.getConjNormalForm(selNode.getQual())));
+ cnf.addAll(Sets.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(selNode.getQual())));
stack.push(selNode);
visitChild(cnf, plan, selNode.getChild(), stack);
@@ -178,15 +175,15 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<Set<EvalNode>, L
EvalNode qual2 = null;
if (matched2.size() > 1) {
// merged into one eval tree
- qual2 = EvalTreeUtil.transformCNF2Singleton(
- matched2.toArray(new EvalNode [matched2.size()]));
+ qual2 = AlgebraicUtil.createSingletonExprFromCNF(
+ matched2.toArray(new EvalNode[matched2.size()]));
} else if (matched2.size() == 1) {
// if the number of matched expr is one
qual2 = matched2.get(0);
}
if (qual2 != null) {
- EvalNode conjQual2 = EvalTreeUtil.transformCNF2Singleton(joinNode.getJoinQual(), qual2);
+ EvalNode conjQual2 = AlgebraicUtil.createSingletonExprFromCNF(joinNode.getJoinQual(), qual2);
joinNode.setJoinQual(conjQual2);
cnf.removeAll(matched2);
} // for the remaining cnf, push it as usual
@@ -194,7 +191,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<Set<EvalNode>, L
}
if (joinNode.hasJoinQual()) {
- cnf.addAll(Sets.newHashSet(EvalTreeUtil.getConjNormalForm(joinNode.getJoinQual())));
+ cnf.addAll(Sets.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual())));
}
visitChild(cnf, plan, left, stack);
@@ -210,7 +207,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<Set<EvalNode>, L
EvalNode qual = null;
if (matched.size() > 1) {
// merged into one eval tree
- qual = EvalTreeUtil.transformCNF2Singleton(
+ qual = AlgebraicUtil.createSingletonExprFromCNF(
matched.toArray(new EvalNode[matched.size()]));
} else if (matched.size() == 1) {
// if the number of matched expr is one
@@ -243,8 +240,8 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<Set<EvalNode>, L
EvalNode qual = null;
if (matched.size() > 1) {
// merged into one eval tree
- qual = EvalTreeUtil.transformCNF2Singleton(
- matched.toArray(new EvalNode [matched.size()]));
+ qual = AlgebraicUtil.createSingletonExprFromCNF(
+ matched.toArray(new EvalNode[matched.size()]));
} else if (matched.size() == 1) {
// if the number of matched expr is one
qual = matched.get(0);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
new file mode 100644
index 0000000..561424f
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.rewrite;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.sun.org.apache.commons.logging.Log;
+import com.sun.org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.*;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.partition.PartitionDesc;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.eval.*;
+import org.apache.tajo.engine.planner.BasicLogicalPlanVisitor;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+
+public class PartitionedTableRewriter implements RewriteRule {
+ private static final Log LOG = LogFactory.getLog(PartitionedTableRewriter.class);
+
+ private static final String NAME = "Partitioned Table Rewriter";
+ private final Rewriter rewriter = new Rewriter();
+
+ private final TajoConf systemConf;
+
+  /**
+   * Creates the rewrite rule.
+   *
+   * @param conf the system configuration; it is used later to obtain the file system
+   *             that holds the table and partition paths
+   */
+  public PartitionedTableRewriter(TajoConf conf) {
+    this.systemConf = conf;
+  }
+
+  /**
+   * @return the fixed name of this rewrite rule
+   */
+  @Override
+  public String getName() {
+    return PartitionedTableRewriter.NAME;
+  }
+
+  /**
+   * Tells whether this rule has anything to do for the given plan.
+   *
+   * @param plan the logical plan to inspect
+   * @return true if at least one query block contains a scan over a table that has partitions
+   */
+  @Override
+  public boolean isEligible(LogicalPlan plan) {
+    for (LogicalPlan.QueryBlock queryBlock : plan.getQueryBlocks()) {
+      for (RelationNode candidate : queryBlock.getRelations()) {
+        if (candidate.getType() != NodeType.SCAN) {
+          continue; // only scan relations can refer to a partitioned table here
+        }
+        ScanNode scan = (ScanNode) candidate;
+        if (scan.getTableDesc().hasPartitions()) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+ @Override
+ public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
+ boolean containsPartitionedTables;
+ for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
+ containsPartitionedTables = false;
+ for (RelationNode relation : block.getRelations()) {
+ if (relation.getType() == NodeType.SCAN) {
+ TableDesc table = ((ScanNode)relation).getTableDesc();
+ if (table.hasPartitions()) {
+ containsPartitionedTables = true;
+ }
+ }
+ }
+ if (containsPartitionedTables) {
+ rewriter.visitChild(block, plan, block.getRoot(), new Stack<LogicalNode>());
+ }
+ }
+ return plan;
+ }
+
+  /**
+   * A path filter that accepts a path only when the tuple built from that path
+   * satisfies the given partition filter condition.
+   */
+  private static class PartitionPathFilter implements PathFilter {
+    private final Schema schema;
+    private final EvalNode partitionFilter;
+    // Created once from the filter and reused for every accept() call.
+    private final EvalContext evalContext;
+
+    /**
+     * @param schema          the schema handed to the tuple builder and to the filter evaluation
+     * @param partitionFilter the condition evaluated against the tuple built from each path
+     */
+    public PartitionPathFilter(Schema schema, EvalNode partitionFilter) {
+      this.schema = schema;
+      this.partitionFilter = partitionFilter;
+      this.evalContext = partitionFilter.newContext();
+    }
+
+    /**
+     * @param path the candidate path
+     * @return false if no tuple can be built from the path; otherwise the boolean result
+     *         of evaluating the partition filter against the built tuple
+     */
+    @Override
+    public boolean accept(Path path) {
+      Tuple tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, true);
+      if (tuple == null) { // if it is a file or not acceptable file
+        return false;
+      }
+      partitionFilter.eval(evalContext, schema, tuple);
+      return partitionFilter.terminate(evalContext).asBool();
+    }
+
+    /**
+     * @return the string form of the underlying filter condition
+     */
+    @Override
+    public String toString() {
+      return partitionFilter.toString();
+    }
+  }
+
+  /**
+   * Lists the paths under a table path level by level, one level per partition column,
+   * keeping at each level only the paths accepted by that level's path filter.
+   * It assumes that each conjunctive form corresponds to one column.
+   *
+   * @param partitionColumns the schema made of the partition columns
+   * @param conjunctiveForms search condition corresponding to partition columns.
+   *                         If it is NULL, it means that there is no search condition for this table.
+   * @param tablePath the path under which the first level is listed
+   * @return the paths that remain after the filter of the last level has been applied
+   * @throws IOException if the file system cannot be obtained or a listing fails
+   */
+  private Path [] findFilteredPaths(Schema partitionColumns, EvalNode [] conjunctiveForms, Path tablePath)
+      throws IOException {
+
+    FileSystem fileSystem = tablePath.getFileSystem(systemConf);
+
+    PathFilter [] levelFilters = (conjunctiveForms == null)
+        ? buildAllAcceptingPathFilters(partitionColumns)
+        : buildPathFiltersForAllLevels(partitionColumns, conjunctiveForms);
+
+    // The first level is listed directly beneath the table path.
+    Path [] survivors = toPathArray(fileSystem.listStatus(tablePath, levelFilters[0]));
+
+    // Every further level is listed beneath the paths that survived the previous level.
+    int level = 1;
+    while (level < partitionColumns.getColumnNum()) {
+      survivors = toPathArray(fileSystem.listStatus(survivors, levelFilters[level]));
+      level++;
+    }
+
+    return survivors;
+  }
+
+  /**
+   * Builds one path filter per partition level from a list of filter conditions.
+   *
+   * For example, for a table partitioned by three columns (col1, col2, col3), this method
+   * creates three path filters, for (col1), (col1, col2), and (col1, col2, col3).
+   * The conditions are accumulated from level to level, so the filter of a deeper level
+   * also contains the conditions of all shallower levels.
+   *
+   * When the accumulated list holds fewer conditions than the number of levels handled so far,
+   * an 'IS NOT NULL' predicate on the current column is added as a placeholder.
+   *
+   * Assume that a user gives the condition WHERE col1 = 'A' AND col3 = 'C'.
+   * There is no filter condition corresponding to col2.
+   * Then, the path filter conditions are the following:
+   *
+   * The first path filter: col1 = 'A'
+   * The second path filter: col1 = 'A' AND col2 IS NOT NULL
+   * The third path filter: col1 = 'A' AND col2 IS NOT NULL AND col3 = 'C'
+   *
+   * 'IS NOT NULL' predicate is always true against the partition path.
+   *
+   * @param partitionColumns the schema made of the partition columns, in level order
+   * @param conjunctiveForms the filter conditions to distribute over the levels
+   * @return an array holding one path filter per partition column
+   */
+  private static PathFilter [] buildPathFiltersForAllLevels(Schema partitionColumns,
+                                                            EvalNode [] conjunctiveForms) {
+    int depth = partitionColumns.getColumnNum();
+    PathFilter [] levelFilters = new PathFilter[depth];
+    List<EvalNode> conditionsSoFar = Lists.newArrayList();
+
+    for (int level = 0; level < depth; level++) {
+      Column levelColumn = partitionColumns.getColumn(level);
+
+      // Collect every condition that refers to the column of this level.
+      for (EvalNode condition : conjunctiveForms) {
+        Set<Column> referenced = EvalTreeUtil.findDistinctRefColumns(condition);
+        if (referenced.contains(levelColumn)) {
+          conditionsSoFar.add(condition);
+        }
+      }
+
+      // Fewer conditions than levels so far: add a placeholder for this column.
+      if (conditionsSoFar.size() <= level) {
+        conditionsSoFar.add(new IsNullEval(true, new FieldEval(levelColumn)));
+      }
+
+      EvalNode [] snapshot = conditionsSoFar.toArray(new EvalNode[conditionsSoFar.size()]);
+      EvalNode mergedCondition = AlgebraicUtil.createSingletonExprFromCNF(snapshot);
+      levelFilters[level] = new PartitionPathFilter(partitionColumns, mergedCondition);
+    }
+    return levelFilters;
+  }
+
+  /**
+   * Builds one path filter per partition level whose condition consists only of
+   * 'IS NOT NULL' predicates on the partition columns seen up to that level.
+   *
+   * @param partitionColumns The partition columns schema
+   * @return The array of path filters, accepting all partition paths.
+   */
+  private static PathFilter [] buildAllAcceptingPathFilters(Schema partitionColumns) {
+    int depth = partitionColumns.getColumnNum();
+    PathFilter [] levelFilters = new PathFilter[depth];
+    List<EvalNode> conditionsSoFar = Lists.newArrayList();
+
+    for (int level = 0; level < depth; level++) {
+      Column levelColumn = partitionColumns.getColumn(level);
+      conditionsSoFar.add(new IsNullEval(true, new FieldEval(levelColumn)));
+
+      EvalNode [] snapshot = conditionsSoFar.toArray(new EvalNode[conditionsSoFar.size()]);
+      EvalNode mergedCondition = AlgebraicUtil.createSingletonExprFromCNF(snapshot);
+      levelFilters[level] = new PartitionPathFilter(partitionColumns, mergedCondition);
+    }
+    return levelFilters;
+  }
+
+  /**
+   * Extracts the path of every file status, preserving the order of the input.
+   *
+   * @param fileStatuses the file statuses to convert
+   * @return an array of the same length holding the path of each status
+   */
+  private static Path [] toPathArray(FileStatus[] fileStatuses) {
+    Path [] converted = new Path[fileStatuses.length];
+    int next = 0;
+    for (FileStatus status : fileStatuses) {
+      converted[next++] = status.getPath();
+    }
+    return converted;
+  }
+
+  /**
+   * Finds the partition paths of a scanned table that satisfy the scan's search condition.
+   *
+   * Conjuncts of the search condition that qualify as indexable predicates on a partition
+   * column are used to filter the paths and are removed from the scan's search condition.
+   * If no such conjunct exists, all partition paths are requested.
+   *
+   * @param scanNode the scan over a partitioned table; its search condition may be replaced
+   * @return the partition paths that passed the path filters
+   * @throws IOException if the file system cannot be accessed
+   */
+  private Path [] findFilteredPartitionPaths(ScanNode scanNode) throws IOException {
+    TableDesc table = scanNode.getTableDesc();
+    FileSystem fs = table.getPath().getFileSystem(systemConf);
+    LOG.info("Partitioned Table Dir: " + table.getPath());
+    LOG.info("Summary: " + fs.getContentSummary(table.getPath()).getDirectoryCount());
+    PartitionDesc partitionDesc = scanNode.getTableDesc().getPartitions();
+
+    // Copy the partition columns into a schema of their own.
+    Schema partitionValuesSchema = new Schema();
+    for (Column partitionColumn : partitionDesc.getColumns()) {
+      partitionValuesSchema.addColumn(partitionColumn);
+    }
+
+    Set<EvalNode> partitionPredicates = Sets.newHashSet();
+
+    // if a query statement has a search condition, try to find indexable predicates
+    if (scanNode.hasQual()) {
+      EvalNode [] conjuncts = AlgebraicUtil.toConjunctiveNormalFormArray(scanNode.getQual());
+      Set<EvalNode> residual = Sets.newHashSet(conjuncts);
+
+      // add qualifier to schema for qual
+      partitionValuesSchema.setQualifier(scanNode.getCanonicalName());
+      for (Column qualifiedColumn : partitionValuesSchema.getColumns()) {
+        for (EvalNode conjunct : conjuncts) {
+          if (checkIfIndexablePredicateOnTargetColumn(conjunct, qualifiedColumn)) {
+            partitionPredicates.add(conjunct);
+          }
+        }
+      }
+
+      // Partitions which are not matched to the partition filter conditions are pruned immediately.
+      // So, the partition filter conditions are not necessary later, and they are removed from
+      // original search condition for simplicity and efficiency.
+      residual.removeAll(partitionPredicates);
+      if (!residual.isEmpty()) {
+        EvalNode [] remaining = residual.toArray(new EvalNode[residual.size()]);
+        scanNode.setQual(AlgebraicUtil.createSingletonExprFromCNF(remaining));
+      } else {
+        scanNode.setQual(null);
+      }
+    }
+
+    // A null condition array asks for all partition paths.
+    EvalNode [] partitionConditions = null;
+    if (!partitionPredicates.isEmpty()) { // There is at least one indexable predicate
+      partitionConditions = partitionPredicates.toArray(new EvalNode[partitionPredicates.size()]);
+    }
+    return findFilteredPaths(partitionValuesSchema, partitionConditions, table.getPath());
+  }
+
+  /**
+   * Checks whether an expression is usable as a partition filter on the given column.
+   *
+   * @param evalNode     the expression to be checked
+   * @param targetColumn the column the expression must refer to
+   * @return true if the expression is an indexable predicate, or a disjunction of indexable
+   *         predicates on one variable, and the only column it refers to is the target column
+   */
+  private boolean checkIfIndexablePredicateOnTargetColumn(EvalNode evalNode, Column targetColumn) {
+    boolean candidate = checkIfIndexablePredicate(evalNode) || checkIfDisjunctiveButOneVariable(evalNode);
+    if (!candidate) {
+      return false;
+    }
+
+    // if it contains only single variable matched to a target column
+    Set<Column> referenced = EvalTreeUtil.findDistinctRefColumns(evalNode);
+    return referenced.size() == 1 && referenced.contains(targetColumn);
+  }
+
+  /**
+   * Check if an expression consists of one variable and one constant and
+   * the expression is a comparison operator. Both conditions are delegated to
+   * {@code AlgebraicUtil.containSingleVar} and {@code AlgebraicUtil.isIndexableOperator}.
+   *
+   * @param evalNode The expression to be checked
+   * @return true if both checks pass. Otherwise, false.
+   */
+  private boolean checkIfIndexablePredicate(EvalNode evalNode) {
+    // TODO - LIKE with a trailing wild-card character and IN with an array can be indexable
+    if (!AlgebraicUtil.containSingleVar(evalNode)) {
+      return false;
+    }
+    return AlgebraicUtil.isIndexableOperator(evalNode);
+  }
+
+  /**
+   * Checks whether an expression is an OR of two indexable predicates that refer to
+   * the same set of columns.
+   *
+   * @param evalNode The expression to be checked
+   * @return true if the expression is an OR whose two operands are both indexable predicates
+   *         and refer to equal sets of columns. Otherwise, false.
+   */
+  private boolean checkIfDisjunctiveButOneVariable(EvalNode evalNode) {
+    if (evalNode.getType() != EvalType.OR) {
+      return false;
+    }
+
+    EvalNode lhs = evalNode.getLeftExpr();
+    EvalNode rhs = evalNode.getRightExpr();
+
+    boolean bothIndexable = checkIfIndexablePredicate(lhs) && checkIfIndexablePredicate(rhs);
+
+    Set<Column> lhsColumns = EvalTreeUtil.findDistinctRefColumns(lhs);
+    Set<Column> rhsColumns = EvalTreeUtil.findDistinctRefColumns(rhs);
+    boolean sameColumns = lhsColumns.equals(rhsColumns);
+
+    return bothIndexable && sameColumns;
+  }
+
+  /**
+   * Recomputes the volume statistic of a partitioned table scan from its input paths
+   * and stores it as the number of bytes in the table statistics.
+   * Nothing is done when the scan has no input paths.
+   *
+   * @param scanNode the partitioned table scan whose table statistics are updated
+   * @throws PlanningException if the file system cannot be accessed
+   */
+  private void updateTableStat(PartitionedTableScanNode scanNode) throws PlanningException {
+    Path [] inputs = scanNode.getInputPaths();
+    if (inputs.length == 0) {
+      return;
+    }
+
+    try {
+      FileSystem fs = inputs[0].getFileSystem(systemConf);
+      long accumulated = 0;
+
+      for (Path input : inputs) {
+        ContentSummary summary = fs.getContentSummary(input);
+        // NOTE(review): the file count is added to the byte length before the sum is stored
+        // through setNumBytes - confirm that this is intended.
+        accumulated += summary.getLength() + summary.getFileCount();
+      }
+      scanNode.getTableDesc().getStats().setNumBytes(accumulated);
+    } catch (IOException e) {
+      throw new PlanningException(e);
+    }
+  }
+
+  /**
+   * Plan visitor that replaces each scan over a partitioned table with a
+   * {@link PartitionedTableScanNode} that carries the chosen partition paths.
+   */
+  private final class Rewriter extends BasicLogicalPlanVisitor<LogicalPlan.QueryBlock, Object> {
+    /**
+     * Rewrites a scan node if its table has partitions; other scans are left untouched.
+     *
+     * @param block    the query block being visited
+     * @param plan     the logical plan; it receives a history entry and the replaced node
+     * @param scanNode the scan node to examine
+     * @param stack    the visiting stack; its top element is used as the node whose child
+     *                 is replaced (assumes the stack is not empty here - TODO confirm)
+     * @return always null
+     * @throws PlanningException if the partition paths cannot be listed or the statistics
+     *                           cannot be updated
+     */
+    @Override
+    public Object visitScan(LogicalPlan.QueryBlock block, LogicalPlan plan, ScanNode scanNode, Stack<LogicalNode> stack)
+        throws PlanningException {
+
+      TableDesc table = scanNode.getTableDesc();
+      if (!table.hasPartitions()) {
+        return null;
+      }
+
+      try {
+        Path [] filteredPaths = findFilteredPartitionPaths(scanNode);
+        plan.addHistory("PartitionTableRewriter chooses " + filteredPaths.length + " of partitions");
+        PartitionedTableScanNode rewrittenScanNode = new PartitionedTableScanNode(plan.newPID(), scanNode, filteredPaths);
+        updateTableStat(rewrittenScanNode);
+        PlannerUtil.replaceNode(plan, stack.peek(), scanNode, rewrittenScanNode);
+      } catch (IOException e) {
+        // Attach the I/O failure as the cause so that its stack trace is not lost.
+        PlanningException failure =
+            new PlanningException("Partitioned Table Rewrite Failed: \n" + e.getMessage());
+        failure.initCause(e);
+        throw failure;
+      }
+      return null;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
index d0e9a6f..403b403 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
@@ -304,7 +304,7 @@ public class ProjectionPushDownRule extends BasicLogicalPlanVisitor<ProjectionPu
LogicalNode child = visitChild(newContext, plan, subRoot, newStack);
newStack.pop();
Schema inSchema = (Schema) child.getOutSchema().clone();
- inSchema.setQualifier(node.getCanonicalName(), true);
+ inSchema.setQualifier(node.getCanonicalName());
node.setInSchema(inSchema);
return pushDownProjectablePost(context, node, isTopmostProjectable(stack));
}