You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/07/02 16:16:24 UTC
[30/51] [partial] TAJO-22: The package prefix should be
org.apache.tajo. (DaeMyung Kang via hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
new file mode 100644
index 0000000..0796cfd
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.tajo.engine.planner;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.planner.physical.*;
+import org.apache.tajo.exception.InternalException;
+import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.util.IndexUtil;
+
+import java.io.IOException;
+
+public class PhysicalPlannerImpl implements PhysicalPlanner {
+ private static final Log LOG = LogFactory.getLog(PhysicalPlannerImpl.class);
+ protected final TajoConf conf;
+ protected final StorageManager sm;
+
+ /**
+ * Creates a physical planner.
+ *
+ * @param conf configuration kept in the {@code conf} field
+ * @param sm storage manager kept in the {@code sm} field and handed to the
+ * executors this planner builds
+ */
+ public PhysicalPlannerImpl(final TajoConf conf, final StorageManager sm) {
+ this.conf = conf;
+ this.sm = sm;
+ }
+
+ /**
+  * Builds the physical execution plan for the given logical plan.
+  *
+  * @param context task attempt context passed to every created operator
+  * @param logicalPlan root of the logical plan to translate
+  * @return the root physical operator; {@code null} when the root logical node
+  *         has no physical counterpart in this planner
+  * @throws InternalException wrapping any {@link IOException} raised while planning
+  */
+ public PhysicalExec createPlan(final TaskAttemptContext context,
+     final LogicalNode logicalPlan) throws InternalException {
+   try {
+     return createPlanRecursive(context, logicalPlan);
+   } catch (IOException ioe) {
+     throw new InternalException(ioe);
+   }
+ }
+
+ /**
+ * Translates a logical plan subtree into physical operators, bottom-up.
+ * Children are planned first and then wrapped by the operator that
+ * corresponds to the current node's type.
+ *
+ * @param ctx task attempt context passed to every created operator
+ * @param logicalNode root of the logical subtree to translate
+ * @return the physical operator for the subtree, or {@code null} for node
+ * types that have no case of their own below
+ * @throws IOException propagated from the operator factories
+ */
+ private PhysicalExec createPlanRecursive(TaskAttemptContext ctx, LogicalNode logicalNode) throws IOException {
+ PhysicalExec outer;
+ PhysicalExec inner;
+
+ switch (logicalNode.getType()) {
+
+ case ROOT:
+ // The root node produces no operator of its own; only its child is planned.
+ LogicalRootNode rootNode = (LogicalRootNode) logicalNode;
+ return createPlanRecursive(ctx, rootNode.getSubNode());
+
+ case EXPRS:
+ EvalExprNode evalExpr = (EvalExprNode) logicalNode;
+ return new EvalExprExec(ctx, evalExpr);
+
+ case STORE:
+ StoreTableNode storeNode = (StoreTableNode) logicalNode;
+ outer = createPlanRecursive(ctx, storeNode.getSubNode());
+ return createStorePlan(ctx, storeNode, outer);
+
+ case SELECTION:
+ SelectionNode selNode = (SelectionNode) logicalNode;
+ outer = createPlanRecursive(ctx, selNode.getSubNode());
+ return new SelectionExec(ctx, selNode, outer);
+
+ case PROJECTION:
+ ProjectionNode prjNode = (ProjectionNode) logicalNode;
+ outer = createPlanRecursive(ctx, prjNode.getSubNode());
+ return new ProjectionExec(ctx, prjNode, outer);
+
+ case SCAN:
+ outer = createScanPlan(ctx, (ScanNode) logicalNode);
+ return outer;
+
+ case GROUP_BY:
+ GroupbyNode grpNode = (GroupbyNode) logicalNode;
+ outer = createPlanRecursive(ctx, grpNode.getSubNode());
+ return createGroupByPlan(ctx, grpNode, outer);
+
+ case SORT:
+ SortNode sortNode = (SortNode) logicalNode;
+ outer = createPlanRecursive(ctx, sortNode.getSubNode());
+ return createSortPlan(ctx, sortNode, outer);
+
+ case JOIN:
+ // Binary operators: the outer child is planned before the inner child.
+ JoinNode joinNode = (JoinNode) logicalNode;
+ outer = createPlanRecursive(ctx, joinNode.getOuterNode());
+ inner = createPlanRecursive(ctx, joinNode.getInnerNode());
+ return createJoinPlan(ctx, joinNode, outer, inner);
+
+ case UNION:
+ UnionNode unionNode = (UnionNode) logicalNode;
+ outer = createPlanRecursive(ctx, unionNode.getOuterNode());
+ inner = createPlanRecursive(ctx, unionNode.getInnerNode());
+ return new UnionExec(ctx, outer, inner);
+
+ case LIMIT:
+ LimitNode limitNode = (LimitNode) logicalNode;
+ outer = createPlanRecursive(ctx, limitNode.getSubNode());
+ return new LimitExec(ctx, limitNode.getInSchema(),
+ limitNode.getOutSchema(), outer, limitNode);
+
+ case CREATE_INDEX:
+ IndexWriteNode createIndexNode = (IndexWriteNode) logicalNode;
+ outer = createPlanRecursive(ctx, createIndexNode.getSubNode());
+ return createIndexWritePlan(sm, ctx, createIndexNode, outer);
+
+ case BST_INDEX_SCAN:
+ IndexScanNode indexScanNode = (IndexScanNode) logicalNode;
+ outer = createIndexScanExec(ctx, indexScanNode);
+ return outer;
+
+ // The node types below (and any type not listed at all) have no physical
+ // operator here, so the caller receives null.
+ // NOTE(review): a null child is passed unchecked into the parent operator's
+ // constructor by the cases above - confirm callers never plan these types.
+ case RENAME:
+ case SET_UNION:
+ case SET_DIFF:
+ case SET_INTERSECT:
+ case INSERT_INTO:
+ case SHOW_TABLE:
+ case DESC_TABLE:
+ case SHOW_FUNCTION:
+ default:
+ return null;
+ }
+ }
+
+ private long estimateSizeRecursive(TaskAttemptContext ctx, String [] tableIds) {
+ long size = 0;
+ for (String tableId : tableIds) {
+ Fragment[] fragments = ctx.getTables(tableId);
+ for (Fragment frag : fragments) {
+ size += frag.getLength();
+ }
+ }
+ return size;
+ }
+
+ /**
+  * Chooses and builds the physical join operator for a logical join.
+  *
+  * <ul>
+  *   <li>{@code CROSS_JOIN} becomes a nested-loop join.</li>
+  *   <li>{@code INNER} becomes a hash join when the estimated size of at least
+  *       one input is below the threshold. The input with the smaller estimate
+  *       (the outer one on a tie) is handed to {@code HashJoinExec} as its
+  *       inner relation, which that operator loads into memory.</li>
+  *   <li>Every other case, including an {@code INNER} join whose inputs are
+  *       both at or above the threshold, becomes a merge join over externally
+  *       sorted inputs.</li>
+  * </ul>
+  *
+  * @param ctx task attempt context used for size estimation and passed to the operators
+  * @param joinNode logical join being planned
+  * @param outer physical plan of the outer input
+  * @param inner physical plan of the inner input
+  * @return the selected join operator
+  * @throws IOException propagated from the operator constructors
+  */
+ public PhysicalExec createJoinPlan(TaskAttemptContext ctx, JoinNode joinNode,
+     PhysicalExec outer, PhysicalExec inner)
+     throws IOException {
+   switch (joinNode.getJoinType()) {
+     case CROSS_JOIN:
+       LOG.info("The planner chooses NLJoinExec");
+       return new NLJoinExec(ctx, joinNode, outer, inner);
+
+     case INNER:
+       String [] outerLineage = PlannerUtil.getLineage(joinNode.getOuterNode());
+       String [] innerLineage = PlannerUtil.getLineage(joinNode.getInnerNode());
+       long outerSize = estimateSizeRecursive(ctx, outerLineage);
+       long innerSize = estimateSizeRecursive(ctx, innerLineage);
+
+       // 128 MB. The value is unchanged; the former "64MB" comment did not match it.
+       final long threshold = 1048576 * 128;
+
+       if (outerSize < threshold || innerSize < threshold) {
+         PhysicalExec selectedOuter;
+         PhysicalExec selectedInner;
+
+         // HashJoinExec loads the inner relation to memory, so the smaller
+         // input is placed on the inner side.
+         if (outerSize <= innerSize) {
+           selectedInner = outer;
+           selectedOuter = inner;
+         } else {
+           selectedInner = inner;
+           selectedOuter = outer;
+         }
+
+         LOG.info("The planner chooses HashJoinExec");
+         return new HashJoinExec(ctx, joinNode, selectedOuter, selectedInner);
+       }
+       // Intentional fall-through: both inputs are too large for a hash join,
+       // so the inner join is executed as a merge join like the default case.
+
+     default:
+       SortSpec[][] sortSpecs = PlannerUtil.getSortKeysFromJoinQual(
+           joinNode.getJoinQual(), outer.getSchema(), inner.getSchema());
+       ExternalSortExec outerSort = new ExternalSortExec(ctx, sm,
+           new SortNode(sortSpecs[0], outer.getSchema(), outer.getSchema()),
+           outer);
+       ExternalSortExec innerSort = new ExternalSortExec(ctx, sm,
+           new SortNode(sortSpecs[1], inner.getSchema(), inner.getSchema()),
+           inner);
+
+       LOG.info("The planner chooses MergeJoinExec");
+       return new MergeJoinExec(ctx, joinNode, outerSort, innerSort,
+           sortSpecs[0], sortSpecs[1]);
+   }
+ }
+
+ /**
+  * Builds the physical operator that stores the output of {@code subOp}.
+  *
+  * <ul>
+  *   <li>Hash-partitioned store: {@code PartitionedStoreExec}.</li>
+  *   <li>Range-partitioned store: {@code IndexedStoreExec}. Its sort specs come
+  *       from the child when the child is a {@code SortExec}; otherwise one
+  *       {@code SortSpec} is created per partition key column.</li>
+  *   <li>{@code StoreIndexNode} without a handled partition type: {@code TunnelExec}.</li>
+  *   <li>Anything else: {@code StoreTableExec}.</li>
+  * </ul>
+  *
+  * @param ctx task attempt context passed to the created operator
+  * @param plan logical store node
+  * @param subOp physical plan producing the tuples to store
+  * @return the store operator
+  * @throws IOException propagated from the operator constructors
+  */
+ public PhysicalExec createStorePlan(TaskAttemptContext ctx,
+     StoreTableNode plan, PhysicalExec subOp) throws IOException {
+   if (plan.hasPartitionKey()) {
+     switch (plan.getPartitionType()) {
+       case HASH:
+         return new PartitionedStoreExec(ctx, sm, plan, subOp);
+
+       case RANGE:
+         SortSpec [] sortSpecs;
+         if (subOp instanceof SortExec) {
+           sortSpecs = ((SortExec) subOp).getSortSpecs();
+         } else {
+           // Derive the sort specs from the partition keys. The original code
+           // built this array but never assigned it, so IndexedStoreExec
+           // received null sort specs on this path.
+           Column[] columns = plan.getPartitionKeys();
+           sortSpecs = new SortSpec[columns.length];
+           for (int i = 0; i < columns.length; i++) {
+             sortSpecs[i] = new SortSpec(columns[i]);
+           }
+         }
+
+         return new IndexedStoreExec(ctx, sm, subOp,
+             plan.getInSchema(), plan.getInSchema(), sortSpecs);
+
+       default:
+         // Other partition types use the non-partitioned handling below.
+         break;
+     }
+   }
+   if (plan instanceof StoreIndexNode) {
+     return new TunnelExec(ctx, plan.getOutSchema(), subOp);
+   }
+
+   return new StoreTableExec(ctx, sm, plan, subOp);
+ }
+
+ /**
+  * Builds a sequential scan over all fragments the task context holds for the
+  * scanned table.
+  *
+  * @param ctx task attempt context that owns the table fragments
+  * @param scanNode logical scan node
+  * @return a {@code SeqScanExec} over the table's fragments
+  * @throws NullPointerException if the context has no table for the scan's table id
+  * @throws IOException propagated from the operator constructor
+  */
+ public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode)
+     throws IOException {
+   Preconditions.checkNotNull(ctx.getTable(scanNode.getTableId()),
+       "Error: There is no table matched to %s", scanNode.getTableId());
+
+   return new SeqScanExec(ctx, sm, scanNode, ctx.getTables(scanNode.getTableId()));
+ }
+
+ /**
+  * Chooses between hash-based and sort-based aggregation.
+  *
+  * Hash aggregation is used when there are no grouping columns, or when the
+  * estimated input size does not exceed the threshold. Otherwise the input is
+  * externally sorted on the grouping columns and aggregated with
+  * {@code SortAggregateExec}.
+  *
+  * @param ctx task attempt context used for size estimation and passed to the operators
+  * @param groupbyNode logical group-by node
+  * @param subOp physical plan producing the input tuples
+  * @return the selected aggregation operator
+  * @throws IOException propagated from the operator constructors
+  */
+ public PhysicalExec createGroupByPlan(TaskAttemptContext ctx,
+     GroupbyNode groupbyNode, PhysicalExec subOp) throws IOException {
+   final Column[] grpColumns = groupbyNode.getGroupingColumns();
+
+   boolean useHash = grpColumns.length == 0;
+   if (!useHash) {
+     // If the relation size does not exceed the threshold,
+     // the hash aggregation will be used.
+     final long threshold = 1048576 * 256;
+     final String [] lineage = PlannerUtil.getLineage(groupbyNode.getSubNode());
+     useHash = estimateSizeRecursive(ctx, lineage) <= threshold;
+   }
+
+   if (useHash) {
+     LOG.info("The planner chooses HashAggregationExec");
+     return new HashAggregateExec(ctx, groupbyNode, subOp);
+   }
+
+   final SortSpec[] sortKeys = new SortSpec[grpColumns.length];
+   for (int i = 0; i < sortKeys.length; i++) {
+     sortKeys[i] = new SortSpec(grpColumns[i], true, false);
+   }
+   final SortNode sortNode = new SortNode(sortKeys);
+   sortNode.setInSchema(subOp.getSchema());
+   sortNode.setOutSchema(subOp.getSchema());
+   final ExternalSortExec sortedInput = new ExternalSortExec(ctx, sm, sortNode,
+       subOp);
+   LOG.info("The planner chooses SortAggregationExec");
+   return new SortAggregateExec(ctx, groupbyNode, sortedInput);
+ }
+
+ /**
+ * Builds the physical sort operator; an {@code ExternalSortExec} is always used.
+ *
+ * @param ctx task attempt context passed to the operator
+ * @param sortNode logical sort node
+ * @param subOp physical plan producing the tuples to sort
+ * @return the sort operator
+ * @throws IOException propagated from the operator constructor
+ */
+ public PhysicalExec createSortPlan(TaskAttemptContext ctx, SortNode sortNode,
+ PhysicalExec subOp) throws IOException {
+ return new ExternalSortExec(ctx, sm, sortNode, subOp);
+ }
+
+ /**
+ * Builds the operator that writes an index for the table named by the
+ * logical node.
+ *
+ * @param sm storage manager to use; note that this parameter shadows the
+ * {@code sm} field of this planner
+ * @param ctx task attempt context; also used to look up the table by name
+ * @param indexWriteNode logical index-write node
+ * @param subOp physical plan producing the tuples to index
+ * @return the index-write operator
+ * @throws IOException propagated from the operator constructor
+ */
+ public PhysicalExec createIndexWritePlan(StorageManager sm,
+ TaskAttemptContext ctx, IndexWriteNode indexWriteNode, PhysicalExec subOp)
+ throws IOException {
+
+ return new IndexWriteExec(ctx, sm, indexWriteNode, ctx.getTable(indexWriteNode
+ .getTableName()), subOp);
+ }
+
+ /**
+  * Builds a BST index scan over the first fragment of the scanned table.
+  * The index file is looked up as {@code <table path>/index/<index name>},
+  * where the index name is derived from that fragment and the sort keys.
+  *
+  * @param ctx task attempt context that owns the table fragments
+  * @param annotation logical index-scan node
+  * @return the index scan operator
+  * @throws NullPointerException if the context has no table for the node's table id
+  * @throws IOException propagated from the storage manager or the operator constructor
+  */
+ public PhysicalExec createIndexScanExec(TaskAttemptContext ctx,
+     IndexScanNode annotation)
+     throws IOException {
+   //TODO-general Type Index
+   Preconditions.checkNotNull(ctx.getTable(annotation.getTableId()),
+       "Error: There is no table matched to %s", annotation.getTableId());
+
+   final Fragment[] tableFragments = ctx.getTables(annotation.getTableId());
+   // Only the first fragment is scanned through the index.
+   final Fragment firstFragment = tableFragments[0];
+
+   final String indexName = IndexUtil.getIndexNameOfFrag(firstFragment,
+       annotation.getSortKeys());
+   final Path indexDir = new Path(sm.getTablePath(annotation.getTableId()), "index");
+
+   final TupleComparator keyComparator = new TupleComparator(
+       annotation.getKeySchema(), annotation.getSortKeys());
+   final Path indexFile = new Path(indexDir, indexName);
+
+   return new BSTIndexScanExec(ctx, sm, annotation, firstFragment, indexFile,
+       annotation.getKeySchema(), keyComparator, annotation.getDatum());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
new file mode 100644
index 0000000..50b539a
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -0,0 +1,616 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.engine.eval.*;
+import org.apache.tajo.engine.parser.QueryBlock;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.query.exception.InvalidQueryException;
+import org.apache.tajo.storage.TupleComparator;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+public class PlannerUtil {
+ private static final Log LOG = LogFactory.getLog(PlannerUtil.class);
+
+ /**
+  * Returns the table ids of all SCAN nodes that {@link #findAllNodes} reports
+  * for the given plan. The table id is always used, never a scan alias.
+  *
+  * @param node plan to inspect
+  * @return one table id per scan node, in the order the nodes were found
+  */
+ public static String [] getLineage(LogicalNode node) {
+   final LogicalNode [] scanNodes = PlannerUtil.findAllNodes(node, ExprType.SCAN);
+   final String [] tableIds = new String[scanNodes.length];
+   int next = 0;
+   for (LogicalNode scanNode : scanNodes) {
+     tableIds[next++] = ((ScanNode) scanNode).getTableId();
+   }
+   return tableIds;
+ }
+
+ /**
+  * Inserts {@code newNode} between a unary parent and its current child.
+  * The inserted node's input and output schemas are both set to the child's
+  * output schema.
+  *
+  * @param parent must be a {@code UnaryNode}
+  * @param newNode must be a {@code UnaryNode}
+  * @return the given parent
+  * @throws IllegalArgumentException if either argument is not a {@code UnaryNode}
+  */
+ public static LogicalNode insertNode(LogicalNode parent, LogicalNode newNode) {
+   Preconditions.checkArgument(parent instanceof UnaryNode);
+   Preconditions.checkArgument(newNode instanceof UnaryNode);
+
+   final UnaryNode upper = (UnaryNode) parent;
+   final UnaryNode inserted = (UnaryNode) newNode;
+   final LogicalNode formerChild = upper.getSubNode();
+
+   inserted.setInSchema(formerChild.getOutSchema());
+   inserted.setOutSchema(formerChild.getOutSchema());
+   inserted.setSubNode(formerChild);
+   upper.setSubNode(inserted);
+
+   return upper;
+ }
+
+ /**
+  * Removes the child of the given parent by linking the parent directly to
+  * its grandchild.
+  *
+  * @param parent must be a unary logical operator whose child is also unary
+  * @return the given parent node
+  * @throws InvalidQueryException if the parent or its child is not a unary node
+  */
+ public static LogicalNode deleteNode(LogicalNode parent) {
+   if (!(parent instanceof UnaryNode)) {
+     throw new InvalidQueryException("Unexpected logical plan: " + parent);
+   }
+
+   final UnaryNode upper = (UnaryNode) parent;
+   final LogicalNode removed = upper.getSubNode();
+   if (!(removed instanceof UnaryNode)) {
+     throw new InvalidQueryException("Unexpected logical plan: " + parent);
+   }
+
+   upper.setSubNode(((UnaryNode) removed).getSubNode());
+   return parent;
+ }
+
+ /**
+  * Replaces the child of the first parent found by
+  * {@link #findTopParentNode(LogicalNode, ExprType)} with {@code newNode}.
+  *
+  * When both the replaced child and {@code newNode} are unary, the new node
+  * adopts the replaced child's sub node. A non-unary (leaf) {@code newNode}
+  * simply takes the child's place; previously that case failed with a
+  * {@code ClassCastException} even though the precondition allowed it.
+  *
+  * @param plan plan to search
+  * @param newNode replacement node; must not be null or a {@code BinaryNode}
+  * @param type type of the node to replace
+  * @throws NullPointerException if {@code newNode} is null
+  * @throws IllegalArgumentException if no unary parent of a {@code type} node
+  *         is found, or if {@code newNode} is a {@code BinaryNode}
+  */
+ public static void replaceNode(LogicalNode plan, LogicalNode newNode, ExprType type) {
+   Preconditions.checkNotNull(newNode, "newNode must not be null");
+   LogicalNode parent = findTopParentNode(plan, type);
+   Preconditions.checkArgument(parent instanceof UnaryNode,
+       "No unary parent found for a node of the given type");
+   Preconditions.checkArgument(!(newNode instanceof BinaryNode),
+       "The new node must not be a binary node");
+
+   UnaryNode parentNode = (UnaryNode) parent;
+   LogicalNode child = parentNode.getSubNode();
+   // Only a unary replacement can adopt the replaced node's sub tree.
+   if (child instanceof UnaryNode && newNode instanceof UnaryNode) {
+     ((UnaryNode) newNode).setSubNode(((UnaryNode) child).getSubNode());
+   }
+   parentNode.setSubNode(newNode);
+ }
+
+ /**
+  * Inserts a unary node between a binary parent and its current outer child.
+  * The inserted node's input and output schemas are both set to that child's
+  * output schema.
+  *
+  * @param parent must be a {@code BinaryNode}
+  * @param outer must be a {@code UnaryNode}
+  * @return the given parent
+  * @throws IllegalArgumentException if an argument has the wrong node kind
+  */
+ public static LogicalNode insertOuterNode(LogicalNode parent, LogicalNode outer) {
+   Preconditions.checkArgument(parent instanceof BinaryNode);
+   Preconditions.checkArgument(outer instanceof UnaryNode);
+
+   final BinaryNode binary = (BinaryNode) parent;
+   final UnaryNode inserted = (UnaryNode) outer;
+   final LogicalNode formerOuter = binary.getOuterNode();
+
+   inserted.setInSchema(formerOuter.getOutSchema());
+   inserted.setOutSchema(formerOuter.getOutSchema());
+   inserted.setSubNode(formerOuter);
+   binary.setOuter(inserted);
+   return binary;
+ }
+
+ /**
+  * Inserts a unary node between a binary parent and its current inner child.
+  * The inserted node's input and output schemas are both set to that child's
+  * output schema.
+  *
+  * @param parent must be a {@code BinaryNode}
+  * @param inner must be a {@code UnaryNode}
+  * @return the given parent
+  * @throws IllegalArgumentException if an argument has the wrong node kind
+  */
+ public static LogicalNode insertInnerNode(LogicalNode parent, LogicalNode inner) {
+   Preconditions.checkArgument(parent instanceof BinaryNode);
+   Preconditions.checkArgument(inner instanceof UnaryNode);
+
+   final BinaryNode binary = (BinaryNode) parent;
+   final UnaryNode inserted = (UnaryNode) inner;
+   final LogicalNode formerInner = binary.getInnerNode();
+
+   inserted.setInSchema(formerInner.getOutSchema());
+   inserted.setOutSchema(formerInner.getOutSchema());
+   inserted.setSubNode(formerInner);
+   binary.setInner(inserted);
+   return binary;
+ }
+
+ /**
+  * Inserts one unary node above each child of a binary parent: {@code left}
+  * above the outer child and {@code right} above the inner child. Each
+  * inserted node takes its input and output schema from the output schema of
+  * the child it now covers.
+  *
+  * @param parent must be a {@code BinaryNode}
+  * @param left must be a {@code UnaryNode}; becomes the new outer child
+  * @param right must be a {@code UnaryNode}; becomes the new inner child
+  * @return the given parent
+  * @throws IllegalArgumentException if an argument has the wrong node kind
+  */
+ public static LogicalNode insertNode(LogicalNode parent,
+     LogicalNode left, LogicalNode right) {
+   Preconditions.checkArgument(parent instanceof BinaryNode);
+   Preconditions.checkArgument(left instanceof UnaryNode);
+   Preconditions.checkArgument(right instanceof UnaryNode);
+
+   final BinaryNode binary = (BinaryNode) parent;
+   final LogicalNode formerOuter = binary.getOuterNode();
+   final LogicalNode formerInner = binary.getInnerNode();
+   final UnaryNode newOuter = (UnaryNode) left;
+   final UnaryNode newInner = (UnaryNode) right;
+
+   newOuter.setInSchema(formerOuter.getOutSchema());
+   newOuter.setOutSchema(formerOuter.getOutSchema());
+   newOuter.setSubNode(formerOuter);
+
+   newInner.setInSchema(formerInner.getOutSchema());
+   newInner.setOutSchema(formerInner.getOutSchema());
+   newInner.setSubNode(formerInner);
+
+   binary.setOuter(newOuter);
+   binary.setInner(newInner);
+   return binary;
+ }
+
+ /**
+ * Rewrites a group-by node into a two-phase aggregation: a clone of the node
+ * is placed beneath it as the first phase, and the given node becomes the
+ * second phase that consumes the clone's output.
+ *
+ * Targets of the clone that contain no aggregation function are kept as they
+ * are. For every aggregation function found in a clone target, the function
+ * is switched to its first phase and emitted as a new target aliased
+ * {@code column_<n>}; the equal function in the corresponding target of the
+ * given node is re-pointed to read that column.
+ *
+ * If cloning fails, the error is logged and the node is returned unchanged.
+ *
+ * @param gp group-by node to transform; must not be null
+ * @return the given node
+ */
+ public static LogicalNode transformGroupbyTo2P(GroupbyNode gp) {
+ Preconditions.checkNotNull(gp);
+
+ try {
+ // cloning groupby node
+ GroupbyNode child = (GroupbyNode) gp.clone();
+
+ List<QueryBlock.Target> newChildTargets = Lists.newArrayList();
+ QueryBlock.Target[] secondTargets = gp.getTargets();
+ QueryBlock.Target[] firstTargets = child.getTargets();
+
+ QueryBlock.Target second;
+ QueryBlock.Target first;
+ int firstTargetId = 0;
+ // Both arrays are indexed with the same i; this relies on the clone
+ // having as many targets as the original node.
+ for (int i = 0; i < firstTargets.length; i++) {
+ second = secondTargets[i];
+ first = firstTargets[i];
+
+ List<AggFuncCallEval> secondFuncs = EvalTreeUtil
+ .findDistinctAggFunction(second.getEvalTree());
+ List<AggFuncCallEval> firstFuncs = EvalTreeUtil.findDistinctAggFunction(first.getEvalTree());
+
+ if (firstFuncs.size() == 0) {
+ // No aggregation in this target: pass it through the first phase as is.
+ newChildTargets.add(first);
+ firstTargetId++;
+ } else {
+ for (AggFuncCallEval func : firstFuncs) {
+ func.setFirstPhase();
+ QueryBlock.Target
+ newTarget = new QueryBlock.Target(func, firstTargetId++);
+ // firstTargetId was already incremented above, so the alias and the
+ // column names built below all use the same, post-increment value.
+ newTarget.setAlias("column_"+ firstTargetId);
+
+
+ // Locate the second-phase counterpart of this function.
+ AggFuncCallEval secondFunc = null;
+ for (AggFuncCallEval sf : secondFuncs) {
+ if (func.equals(sf)) {
+ secondFunc = sf;
+ break;
+ }
+ }
+ // NOTE(review): secondFunc stays null when no equal function is found,
+ // which would make the setArgs calls below throw a NullPointerException -
+ // confirm that a match is always guaranteed.
+ if (func.getValueType().length > 1) { // hack for partial result
+ secondFunc.setArgs(new EvalNode[] {new FieldEval(
+ new Column("column_"+firstTargetId, Type.ARRAY))});
+ } else {
+ secondFunc.setArgs(new EvalNode [] {new FieldEval(
+ new Column("column_"+firstTargetId, newTarget.getEvalTree().getValueType()[0]))});
+ }
+ newChildTargets.add(newTarget);
+ }
+ }
+ }
+
+ QueryBlock.Target[] targetArray = newChildTargets.toArray(new QueryBlock.Target[newChildTargets.size()]);
+ child.setTargetList(targetArray);
+ child.setOutSchema(PlannerUtil.targetToSchema(targetArray));
+ // set the groupby chaining
+ gp.setSubNode(child);
+ gp.setInSchema(child.getOutSchema());
+ } catch (CloneNotSupportedException e) {
+ // Cloning failed: the node is returned untransformed after logging.
+ LOG.error(e);
+ }
+
+ return gp;
+ }
+
+ /**
+  * Rewrites a sort node into a two-phase sort by placing a clone of the node
+  * beneath it. The given node's input and output schemas are set to the
+  * clone's output schema. If cloning fails, the error is logged and the node
+  * is returned unchanged.
+  *
+  * @param sort sort node to transform; must not be null
+  * @return the given node
+  */
+ public static LogicalNode transformSortTo2P(SortNode sort) {
+   Preconditions.checkNotNull(sort);
+
+   try {
+     final SortNode firstPhase = (SortNode) sort.clone();
+     sort.setSubNode(firstPhase);
+     sort.setInSchema(firstPhase.getOutSchema());
+     sort.setOutSchema(firstPhase.getOutSchema());
+   } catch (CloneNotSupportedException e) {
+     LOG.error(e);
+   }
+   return sort;
+ }
+
+ /**
+ * Applies {@link #transformGroupbyTo2P(GroupbyNode)} and then inserts a local
+ * store node directly beneath the given group-by node.
+ *
+ * @param gb group-by node to transform
+ * @param tableId id passed to the inserted store node
+ * @return the given group-by node
+ */
+ public static LogicalNode transformGroupbyTo2PWithStore(GroupbyNode gb,
+ String tableId) {
+ GroupbyNode groupby = (GroupbyNode) transformGroupbyTo2P(gb);
+ return insertStore(groupby, tableId);
+ }
+
+ /**
+ * Applies {@link #transformSortTo2P(SortNode)} and then inserts a local store
+ * node directly beneath the given sort node.
+ *
+ * @param sort sort node to transform
+ * @param tableId id passed to the inserted store node
+ * @return the given sort node
+ */
+ public static LogicalNode transformSortTo2PWithStore(SortNode sort,
+ String tableId) {
+ SortNode sort2p = (SortNode) transformSortTo2P(sort);
+ return insertStore(sort2p, tableId);
+ }
+
+ /**
+ * Creates a store node for {@code tableId}, marks it local, and inserts it
+ * between the given parent and the parent's current child.
+ *
+ * @param parent unary node that receives the store node as its new child
+ * @param tableId id passed to the {@code StoreTableNode} constructor
+ * @return the given parent
+ */
+ private static LogicalNode insertStore(LogicalNode parent,
+ String tableId) {
+ StoreTableNode store = new StoreTableNode(tableId);
+ store.setLocal(true);
+ insertNode(parent, store);
+
+ return parent;
+ }
+
+ /**
+  * Finds a logical node of the given type, starting from the given node.
+  * The node returned is the first match recorded while the plan is walked
+  * with {@code postOrder}.
+  *
+  * @param node start node
+  * @param type type to find
+  * @return the first matching node, or {@code null} if there is none
+  */
+ public static LogicalNode findTopNode(LogicalNode node, ExprType type) {
+   Preconditions.checkNotNull(node);
+   Preconditions.checkNotNull(type);
+
+   final LogicalNodeFinder finder = new LogicalNodeFinder(type);
+   node.postOrder(finder);
+
+   final List<LogicalNode> matches = finder.getFoundNodes();
+   return matches.isEmpty() ? null : matches.get(0);
+ }
+
+ /**
+  * Finds all logical nodes of the given type, starting from the given node.
+  *
+  * @param node start node
+  * @param type type to find
+  * @return the matching nodes in the order they were visited; an empty array
+  *         if there is none
+  */
+ public static LogicalNode [] findAllNodes(LogicalNode node, ExprType type) {
+   Preconditions.checkNotNull(node);
+   Preconditions.checkNotNull(type);
+
+   final LogicalNodeFinder finder = new LogicalNodeFinder(type);
+   node.postOrder(finder);
+
+   // An empty result list converts to a fresh empty array as well.
+   final List<LogicalNode> matches = finder.getFoundNodes();
+   return matches.toArray(new LogicalNode[matches.size()]);
+ }
+
+ /**
+  * Finds a node that has a direct child of the given type. The node returned
+  * is the first such parent recorded while the plan is walked with
+  * {@code postOrder}.
+  *
+  * @param node start node
+  * @param type type of the child to look for
+  * @return the parent of a node of the given type, or {@code null} if there is none
+  */
+ public static LogicalNode findTopParentNode(LogicalNode node, ExprType type) {
+   Preconditions.checkNotNull(node);
+   Preconditions.checkNotNull(type);
+
+   final ParentNodeFinder finder = new ParentNodeFinder(type);
+   node.postOrder(finder);
+
+   final List<LogicalNode> parents = finder.getFoundNodes();
+   return parents.isEmpty() ? null : parents.get(0);
+ }
+
+ /**
+  * Visitor that records every visited node whose type equals one of the
+  * requested types. When {@code topmost} is set, recording stops once at
+  * least one node has been recorded.
+  */
+ private static class LogicalNodeFinder implements LogicalNodeVisitor {
+   private List<LogicalNode> list = new ArrayList<LogicalNode>();
+   private final ExprType [] tofind;
+   private boolean topmost = false;
+   private boolean finished = false;
+
+   public LogicalNodeFinder(ExprType...type) {
+     this.tofind = type;
+   }
+
+   public LogicalNodeFinder(ExprType [] type, boolean topmost) {
+     this(type);
+     this.topmost = topmost;
+   }
+
+   @Override
+   public void visit(LogicalNode node) {
+     if (finished) {
+       return;
+     }
+     for (int i = 0; i < tofind.length; i++) {
+       if (tofind[i] == node.getType()) {
+         list.add(node);
+       }
+       if (topmost && list.size() > 0) {
+         finished = true;
+       }
+     }
+   }
+
+   /** Returns the recorded nodes in visiting order. */
+   public List<LogicalNode> getFoundNodes() {
+     return list;
+   }
+ }
+
+ private static class ParentNodeFinder implements LogicalNodeVisitor {
+ private List<LogicalNode> list = new ArrayList<LogicalNode>();
+ private ExprType tofind;
+
+ public ParentNodeFinder(ExprType type) {
+ this.tofind = type;
+ }
+
+ @Override
+ public void visit(LogicalNode node) {
+ if (node instanceof UnaryNode) {
+ UnaryNode unary = (UnaryNode) node;
+ if (unary.getSubNode().getType() == tofind) {
+ list.add(node);
+ }
+ } else if (node instanceof BinaryNode){
+ BinaryNode bin = (BinaryNode) node;
+ if (bin.getOuterNode().getType() == tofind ||
+ bin.getInnerNode().getType() == tofind) {
+ list.add(node);
+ }
+ }
+ }
+
+ public List<LogicalNode> getFoundNodes() {
+ return list;
+ }
+ }
+
+ /**
+ * Collects the columns referenced by the nodes of a logical plan, using a
+ * {@code ColumnRefCollector} driven by {@code postOrder}.
+ *
+ * @param node plan to inspect
+ * @return the set of collected columns
+ */
+ public static Set<Column> collectColumnRefs(LogicalNode node) {
+ ColumnRefCollector collector = new ColumnRefCollector();
+ node.postOrder(collector);
+ return collector.getColumns();
+ }
+
+ /**
+  * Visitor that gathers the columns referenced by projection, selection,
+  * group-by, sort, join and scan nodes. Other node types contribute nothing.
+  */
+ private static class ColumnRefCollector implements LogicalNodeVisitor {
+   private Set<Column> collected = Sets.newHashSet();
+
+   /** Returns the columns gathered so far. */
+   public Set<Column> getColumns() {
+     return this.collected;
+   }
+
+   @Override
+   public void visit(LogicalNode node) {
+     switch (node.getType()) {
+       case PROJECTION:
+         for (QueryBlock.Target target : ((ProjectionNode) node).getTargets()) {
+           addColumnRefs(target.getEvalTree());
+         }
+         break;
+
+       case SELECTION:
+         addColumnRefs(((SelectionNode) node).getQual());
+         break;
+
+       case GROUP_BY:
+         final GroupbyNode groupByNode = (GroupbyNode) node;
+         // Grouping columns first, then target expressions, then HAVING.
+         for (Column groupingColumn : groupByNode.getGroupingColumns()) {
+           collected.add(groupingColumn);
+         }
+         for (QueryBlock.Target target : groupByNode.getTargets()) {
+           addColumnRefs(target.getEvalTree());
+         }
+         if (groupByNode.hasHavingCondition()) {
+           addColumnRefs(groupByNode.getHavingCondition());
+         }
+         break;
+
+       case SORT:
+         for (SortSpec key : ((SortNode) node).getSortKeys()) {
+           collected.add(key.getSortKey());
+         }
+         break;
+
+       case JOIN:
+         final JoinNode joinNode = (JoinNode) node;
+         if (joinNode.hasJoinQual()) {
+           addColumnRefs(joinNode.getJoinQual());
+         }
+         break;
+
+       case SCAN:
+         final ScanNode scanNode = (ScanNode) node;
+         if (scanNode.hasQual()) {
+           addColumnRefs(scanNode.getQual());
+         }
+         break;
+
+       default:
+     }
+   }
+
+   /** Adds every distinct column referenced by the expression. */
+   private void addColumnRefs(EvalNode expr) {
+     collected.addAll(EvalTreeUtil.findDistinctRefColumns(expr));
+   }
+ }
+
+ /**
+  * Creates one target per schema column. Each target wraps a {@code FieldEval}
+  * on the column and carries the column's position as its id.
+  *
+  * @param schema source schema
+  * @return targets in schema column order
+  */
+ public static QueryBlock.Target[] schemaToTargets(Schema schema) {
+   final int columnCount = schema.getColumnNum();
+   final QueryBlock.Target[] targets = new QueryBlock.Target[columnCount];
+
+   for (int pos = 0; pos < columnCount; pos++) {
+     targets[pos] = new QueryBlock.Target(new FieldEval(schema.getColumn(pos)), pos);
+   }
+   return targets;
+ }
+
+ /**
+  * Creates one sort spec per schema column, each built as
+  * {@code new SortSpec(column, true, false)}.
+  *
+  * @param schema source schema
+  * @return sort specs in schema column order
+  */
+ public static SortSpec[] schemaToSortSpecs(Schema schema) {
+   final int columnCount = schema.getColumnNum();
+   final SortSpec[] sortKeys = new SortSpec[columnCount];
+
+   for (int pos = 0; pos < columnCount; pos++) {
+     sortKeys[pos] = new SortSpec(schema.getColumn(pos), true, false);
+   }
+
+   return sortKeys;
+ }
+
+ /**
+  * Builds a schema made of the sort key column of each sort spec.
+  *
+  * @param sortSpecs sort specs to convert
+  * @return a new schema with one column per sort spec, in the given order
+  */
+ public static Schema sortSpecsToSchema(SortSpec[] sortSpecs) {
+   final Schema keySchema = new Schema();
+   for (int i = 0; i < sortSpecs.length; i++) {
+     keySchema.addColumn(sortSpecs[i].getSortKey());
+   }
+
+   return keySchema;
+ }
+
+ /**
+  * Tells whether an expression is a join qualifier.
+  * TODO - this method does not support the self join (NTA-740)
+  *
+  * @param qual expression to test
+  * @return true if the operator is a comparison, each operand refers to
+  *         exactly one column, and the two columns belong to different tables
+  */
+ public static boolean isJoinQual(EvalNode qual) {
+   if (!EvalTreeUtil.isComparisonOperator(qual)) {
+     return false;
+   }
+
+   final List<Column> leftRefs = EvalTreeUtil.findAllColumnRefs(qual.getLeftExpr());
+   final List<Column> rightRefs = EvalTreeUtil.findAllColumnRefs(qual.getRightExpr());
+   if (leftRefs.size() != 1 || rightRefs.size() != 1) {
+     return false;
+   }
+
+   return !leftRefs.get(0).getTableName().equals(rightRefs.get(0).getTableName());
+ }
+
+ /**
+  * Derives sort keys for both join inputs from the join key pairs of a join
+  * qualifier.
+  *
+  * @param joinQual join qualifier
+  * @param outer schema of the outer input
+  * @param inner schema of the inner input
+  * @return a two-element array: index 0 holds the outer sort specs, index 1
+  *         the inner sort specs, both in join key pair order
+  */
+ public static SortSpec[][] getSortKeysFromJoinQual(EvalNode joinQual, Schema outer, Schema inner) {
+   final List<Column []> keyPairs = getJoinKeyPairs(joinQual, outer, inner);
+   final SortSpec[] outerKeys = new SortSpec[keyPairs.size()];
+   final SortSpec[] innerKeys = new SortSpec[keyPairs.size()];
+
+   int pos = 0;
+   for (Column [] pair : keyPairs) {
+     outerKeys[pos] = new SortSpec(pair[0]);
+     innerKeys[pos] = new SortSpec(pair[1]);
+     pos++;
+   }
+
+   return new SortSpec[][] {outerKeys, innerKeys};
+ }
+
+ /**
+  * Builds tuple comparators for both join inputs from the sort keys derived
+  * from a join qualifier.
+  *
+  * @param joinQual join qualifier
+  * @param outer schema of the outer input
+  * @param inner schema of the inner input
+  * @return a two-element array: the outer comparator followed by the inner one
+  */
+ public static TupleComparator[] getComparatorsFromJoinQual(EvalNode joinQual, Schema outer, Schema inner) {
+   final SortSpec[][] keys = getSortKeysFromJoinQual(joinQual, outer, inner);
+   return new TupleComparator[] {
+       new TupleComparator(outer, keys[0]),
+       new TupleComparator(inner, keys[1])
+   };
+ }
+
+ /**
+ * Collects the join key column pairs of a join qualifier by walking it in
+ * pre-order with a {@code JoinKeyPairFinder}.
+ *
+ * @param joinQual join qualifier to inspect
+ * @param outer schema of the outer input
+ * @param inner schema of the inner input
+ * @return one two-element array per join condition found: element 0 is the
+ * outer column, element 1 the inner column
+ */
+ public static List<Column []> getJoinKeyPairs(EvalNode joinQual, Schema outer, Schema inner) {
+ JoinKeyPairFinder finder = new JoinKeyPairFinder(outer, inner);
+ joinQual.preOrder(finder);
+ return finder.getPairs();
+ }
+
+ /**
+  * Expression visitor that extracts a pair of join key columns from every
+  * visited node that {@code EvalTreeUtil.isJoinQual} accepts. In each pair,
+  * index 0 holds the column of the outer schema and index 1 the column of
+  * the inner schema.
+  */
+ public static class JoinKeyPairFinder implements EvalNodeVisitor {
+   private final List<Column []> pairs = Lists.newArrayList();
+   private Schema [] schemas = new Schema[2];
+
+   public JoinKeyPairFinder(Schema outer, Schema inner) {
+     schemas[0] = outer;
+     schemas[1] = inner;
+   }
+
+   @Override
+   public void visit(EvalNode node) {
+     if (!EvalTreeUtil.isJoinQual(node)) {
+       return;
+     }
+
+     final Column [] pair = new Column[2];
+     // side 0 is the left operand, side 1 the right operand
+     for (int side = 0; side <= 1; side++) {
+       final Column column = EvalTreeUtil.findAllColumnRefs(node.getExpr(side)).get(0);
+       // Slot the column by the schema that contains it: 0 is outer, 1 is inner.
+       for (int s = 0; s < schemas.length; s++) {
+         if (schemas[s].contains(column.getQualifiedName())) {
+           pair[s] = column;
+         }
+       }
+     }
+
+     if (pair[0] == null || pair[1] == null) {
+       throw new IllegalStateException("Wrong join key: " + node);
+     }
+     pairs.add(pair);
+   }
+
+   /** Returns the pairs collected so far, in visiting order. */
+   public List<Column []> getPairs() {
+     return this.pairs;
+   }
+ }
+
+ /**
+  * Builds a schema with one column per target. The column is named after the
+  * target's alias when it has one, otherwise after its expression. The column
+  * type is ARRAY when the expression reports more than one value type,
+  * otherwise the expression's single value type.
+  *
+  * @param targets targets to convert
+  * @return a new schema in target order
+  */
+ public static Schema targetToSchema(QueryBlock.Target[] targets) {
+   final Schema result = new Schema();
+   for (int i = 0; i < targets.length; i++) {
+     final QueryBlock.Target target = targets[i];
+
+     final DataType columnType = target.getEvalTree().getValueType().length > 1
+         ? CatalogUtil.newDataTypeWithoutLen(Type.ARRAY)
+         : target.getEvalTree().getValueType()[0];
+     final String columnName = target.hasAlias()
+         ? target.getAlias()
+         : target.getEvalTree().getName();
+
+     result.addColumn(columnName, columnType);
+   }
+
+   return result;
+ }
+
+ /**
+  * Wraps each column in a {@code FieldEval}.
+  *
+  * @param columns columns to wrap
+  * @return one field expression per column, in the given order
+  */
+ public static EvalNode [] columnsToEvals(Column [] columns) {
+   final EvalNode [] fieldEvals = new EvalNode[columns.length];
+   int pos = 0;
+   for (Column column : columns) {
+     fieldEvals[pos++] = new FieldEval(column);
+   }
+   return fieldEvals;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlanningContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlanningContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlanningContext.java
new file mode 100644
index 0000000..595f41a
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlanningContext.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.antlr.runtime.tree.Tree;
+import org.apache.tajo.engine.parser.ParseTree;
+
+/**
+ * State shared while a single query is being planned: the query text, its
+ * parsed forms, generated names, and the explicit output table, if any.
+ */
+public interface PlanningContext {
+ /** Returns the query string this context was created for. */
+ String getRawQuery();
+
+ /** Returns the ANTLR tree associated with this context. */
+ Tree getAST();
+
+ /** Returns the parse tree associated with this context. */
+ ParseTree getParseTree();
+
+ //TableMap getTableMap();
+
+ /**
+ * Returns a generated column name.
+ * NOTE(review): the implementation in this package appends an increasing
+ * counter to a default prefix - confirm that uniqueness is part of the contract.
+ */
+ String getGeneratedColumnName();
+
+ /**
+ * Returns a generated column name for the given prefix.
+ * NOTE(review): presumably the same scheme as the no-argument variant with
+ * a caller-supplied prefix - verify against the implementation.
+ */
+ String getGeneratedColumnName(String prefix);
+
+ /**
+ * Returns the explicit output table.
+ * NOTE(review): presumably only meaningful when
+ * {@link #hasExplicitOutputTable()} is true - confirm.
+ */
+ String getExplicitOutputTable();
+
+ /** Tells whether an explicit output table is set. */
+ boolean hasExplicitOutputTable();
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlanningContextImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlanningContextImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlanningContextImpl.java
new file mode 100644
index 0000000..34ea73a
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlanningContextImpl.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.antlr.runtime.tree.Tree;
+import org.apache.tajo.engine.parser.ParseTree;
+
+/**
+ * Mutable {@link PlanningContext}. The raw query is fixed at construction;
+ * the AST, the parse tree and the output table name are attached later through
+ * the setters. Generated column names are numbered from one shared counter
+ * that starts at 1.
+ */
+public class PlanningContextImpl implements PlanningContext {
+  private static final String DEFAULT_COLUMN_GEN_PREFIX = "column";
+  private static final String SEPARATOR = "_";
+
+  private final String rawQuery;
+  private ParseTree parseTree;
+  private Tree ast;
+  private String outputTableName;
+
+  /** Next suffix to hand out; shared by both getGeneratedColumnName overloads. */
+  private int generatedColumnId = 1;
+
+  /**
+   * @param rawQuery the query string this context belongs to
+   */
+  public PlanningContextImpl(String rawQuery) {
+    this.rawQuery = rawQuery;
+  }
+
+  @Override
+  public String getRawQuery() {
+    return rawQuery;
+  }
+
+  @Override
+  public Tree getAST() {
+    return ast;
+  }
+
+  public void setAST(Tree ast) {
+    this.ast = ast;
+  }
+
+  @Override
+  public ParseTree getParseTree() {
+    return parseTree;
+  }
+
+  public void setParseTree(ParseTree parseTree) {
+    this.parseTree = parseTree;
+  }
+
+  /**
+   * Returns {@code "column_" + n}, where n is the next value of the shared counter.
+   *
+   * Synchronized like the prefixed overload: both methods increment the same
+   * counter, so guarding only one of them would still allow lost updates.
+   */
+  @Override
+  public synchronized String getGeneratedColumnName() {
+    return DEFAULT_COLUMN_GEN_PREFIX + SEPARATOR + (generatedColumnId++);
+  }
+
+  /**
+   * Returns {@code prefix + "_" + n}, where n is the next value of the shared counter.
+   *
+   * @param prefix the leading part of the generated name
+   */
+  @Override
+  public synchronized String getGeneratedColumnName(String prefix) {
+    return prefix + SEPARATOR + (generatedColumnId++);
+  }
+
+  /**
+   * @return the output table name set via {@link #setOutputTableName(String)}, or null if none was set
+   */
+  @Override
+  public String getExplicitOutputTable() {
+    return outputTableName;
+  }
+
+  @Override
+  public boolean hasExplicitOutputTable() {
+    return outputTableName != null;
+  }
+
+  public void setOutputTableName(String outputTableName) {
+    this.outputTableName = outputTableName;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Projector.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Projector.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Projector.java
new file mode 100644
index 0000000..da1b43c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Projector.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.parser.QueryBlock.Target;
+import org.apache.tajo.storage.Tuple;
+
+/**
+ * Builds output tuples from input tuples. Output columns produced by a target
+ * expression are filled from the evaluated {@link EvalNode}s; every other
+ * output column is copied from the input column with the same qualified name.
+ */
+public class Projector {
+  private final Schema inSchema;
+  private final Schema outSchema;
+
+  // number of target expressions (0 when no targets were given)
+  private final int targetNum;
+  // inMap[k] is the input column copied into output column outMap[k]
+  private final int [] inMap;
+  private final int [] outMap;
+  // evalOutMap[i] is the output column that receives the result of evals[i];
+  // both arrays stay null when there are no targets
+  private int [] evalOutMap;
+  private EvalNode[] evals;
+  // the tuple most recently passed to eval(); terminate() copies columns from it
+  private Tuple prevTuple;
+
+  /**
+   * @param inSchema schema of the tuples passed to {@link #eval}
+   * @param outSchema schema of the tuples filled by {@link #terminate}
+   * @param targets target expressions; may be null, which means plain column copying
+   */
+  public Projector(Schema inSchema, Schema outSchema, Target [] targets) {
+    this.inSchema = inSchema;
+    this.outSchema = outSchema;
+    this.targetNum = (targets == null) ? 0 : targets.length;
+
+    int copiedColumnNum = outSchema.getColumnNum() - targetNum;
+    this.inMap = new int[copiedColumnNum];
+    this.outMap = new int[copiedColumnNum];
+
+    if (targetNum > 0) {
+      evalOutMap = new int[targetNum];
+      evals = new EvalNode[targetNum];
+      for (int i = 0; i < targetNum; i++) {
+        Target target = targets[i];
+        // TODO - is it always correct?
+        String outputName = target.hasAlias() ? target.getAlias() : target.getEvalTree().getName();
+        evalOutMap[i] = outSchema.getColumnId(outputName);
+        evals[i] = target.getEvalTree();
+      }
+    }
+
+    // Every output column that no target writes is a straight copy from the input.
+    int next = 0;
+    for (int outId = 0; outId < outSchema.getColumnNum(); outId++) {
+      if (isEvalOutput(outId)) {
+        continue;
+      }
+      Column source = inSchema.getColumn(outSchema.getColumn(outId).getQualifiedName());
+      outMap[next] = outId;
+      inMap[next] = inSchema.getColumnId(source.getQualifiedName());
+      next++;
+    }
+  }
+
+  /** Tells whether some target expression writes the given output column. */
+  private boolean isEvalOutput(int outId) {
+    if (evalOutMap == null) {
+      return false;
+    }
+    for (int mapped : evalOutMap) {
+      if (mapped == outId) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Remembers {@code in} as the current tuple and feeds it to every target expression.
+   *
+   * @param evalContexts one context per target, as created by {@link #renew()}
+   * @param in the input tuple
+   */
+  public void eval(EvalContext[] evalContexts, Tuple in) {
+    this.prevTuple = in;
+    if (targetNum == 0) {
+      return;
+    }
+    for (int t = 0; t < evals.length; t++) {
+      evals[t].eval(evalContexts[t], inSchema, in);
+    }
+  }
+
+  /**
+   * Fills {@code out}: copied columns come from the tuple last given to
+   * {@link #eval}, the remaining ones from the terminated target expressions.
+   *
+   * @param evalContexts one context per target, as created by {@link #renew()}
+   * @param out the tuple to fill
+   */
+  public void terminate(EvalContext [] evalContexts, Tuple out) {
+    for (int k = 0; k < inMap.length; k++) {
+      out.put(outMap[k], prevTuple.get(inMap[k]));
+    }
+    if (targetNum == 0) {
+      return;
+    }
+    for (int t = 0; t < evals.length; t++) {
+      out.put(evalOutMap[t], evals[t].terminate(evalContexts[t]));
+    }
+  }
+
+  /**
+   * @return a fresh evaluation context for each target expression (empty when there are no targets)
+   */
+  public EvalContext [] renew() {
+    EvalContext [] fresh = new EvalContext[targetNum];
+    for (int t = 0; t < targetNum; t++) {
+      fresh[t] = evals[t].newContext();
+    }
+    return fresh;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangeOverflowException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangeOverflowException.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangeOverflowException.java
new file mode 100644
index 0000000..c18d403
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangeOverflowException.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleRange;
+
+public class RangeOverflowException extends RuntimeException {
+ public RangeOverflowException(TupleRange range, Tuple overflowValue, long inc) {
+ super("Overflow Error: tried to increase " + inc + " to " + overflowValue + ", but the range " + range);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
new file mode 100644
index 0000000..e25903f
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleRange;
+
+import java.math.BigDecimal;
+
+/**
+ * Base class of algorithms that split a {@link TupleRange} into partitions.
+ * It keeps the schema and range and precomputes the total value cardinality
+ * of the range over all columns.
+ */
+public abstract class RangePartitionAlgorithm {
+  protected Schema schema;
+  protected TupleRange range;
+  protected final BigDecimal totalCard;
+  /** true if the end of the range is inclusive. Otherwise, it should be false. */
+  protected final boolean inclusive;
+
+  /**
+   *
+   * @param schema the schema of the range tuples
+   * @param range the range to be partitioned
+   * @param inclusive true if the end of the range is inclusive. Otherwise, false.
+   */
+  public RangePartitionAlgorithm(Schema schema, TupleRange range, boolean inclusive) {
+    this.schema = schema;
+    this.range = range;
+    this.inclusive = inclusive;
+    this.totalCard = computeCardinalityForAllColumns(schema, range, inclusive);
+  }
+
+  /**
+   * It computes the value cardinality of a single column, i.e. the distance
+   * from {@code start} to {@code end}, plus one if the end is inclusive.
+   *
+   * The differences of 32-bit and 64-bit values are computed in wider
+   * arithmetic, so a range such as [Integer.MIN_VALUE, Integer.MAX_VALUE]
+   * does not silently wrap around.
+   *
+   * @param dataType the data type of the column
+   * @param start the first value of the column range
+   * @param end the last value of the column range
+   * @param inclusive true if {@code end} itself belongs to the range
+   * @return the cardinality of the column range
+   * @throws UnsupportedOperationException if the data type is not supported
+   */
+  public static BigDecimal computeCardinality(DataType dataType, Datum start, Datum end,
+                                              boolean inclusive) {
+    BigDecimal columnCard;
+
+    switch (dataType.getType()) {
+      case CHAR:
+        columnCard = new BigDecimal(end.asChar() - start.asChar());
+        break;
+      case BIT:
+        columnCard = new BigDecimal(end.asByte() - start.asByte());
+        break;
+      case INT2:
+        columnCard = new BigDecimal(end.asInt2() - start.asInt2());
+        break;
+      case INT4:
+      case FLOAT4:
+        // FLOAT4 bounds are read through asInt4(), exactly as before.
+        // NOTE(review): presumably this drops the fractional part - confirm.
+        // Widen to long before subtracting: an int difference can overflow.
+        columnCard = BigDecimal.valueOf((long) end.asInt4() - (long) start.asInt4());
+        break;
+      case INT8:
+      case FLOAT8:
+        // FLOAT8 bounds are read through asInt8(), exactly as before.
+        // Subtract as BigDecimal: a long difference can overflow.
+        columnCard = BigDecimal.valueOf(end.asInt8()).subtract(BigDecimal.valueOf(start.asInt8()));
+        break;
+      case TEXT:
+        // only the first character of each bound is taken into account
+        columnCard = new BigDecimal(end.asChars().charAt(0) - start.asChars().charAt(0));
+        break;
+      default:
+        throw new UnsupportedOperationException(dataType + " is not supported yet");
+    }
+
+    return inclusive ? columnCard.add(BigDecimal.ONE) : columnCard;
+  }
+
+  /**
+   * It computes the value cardinality of a tuple range: the product of the
+   * cardinalities of all columns. Columns whose cardinality is not positive
+   * are left out of the product.
+   *
+   * @param schema the schema of the range tuples
+   * @param range the range whose cardinality is computed
+   * @param inclusive true if the end of the range is inclusive
+   * @return the cardinality of the whole range; 1 if no column contributes
+   */
+  public static BigDecimal computeCardinalityForAllColumns(Schema schema, TupleRange range, boolean inclusive) {
+    Tuple start = range.getStart();
+    Tuple end = range.getEnd();
+
+    BigDecimal cardinality = BigDecimal.ONE;
+    for (int i = 0; i < schema.getColumnNum(); i++) {
+      Column col = schema.getColumn(i);
+      BigDecimal columnCard = computeCardinality(col.getDataType(), start.get(i), end.get(i), inclusive);
+
+      if (columnCard.signum() > 0) {
+        cardinality = cardinality.multiply(columnCard);
+      }
+    }
+
+    return cardinality;
+  }
+
+  /**
+   * @return the cardinality of the whole range, computed at construction time
+   */
+  public BigDecimal getTotalCardinality() {
+    return totalCard;
+  }
+
+  /**
+   *
+   * @param partNum the number of desired partitions, but it may return the less partitions.
+   * @return the end of intermediate ranges are exclusive, and the end of final range is inclusive.
+   */
+  public abstract TupleRange[] partition(int partNum);
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
new file mode 100644
index 0000000..b52be45
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleRange;
+import org.apache.tajo.storage.VTuple;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.List;
+
+/**
+ * Splits a tuple range into consecutive sub-ranges by repeatedly advancing the
+ * start tuple by a fixed number of values. The columns are treated like the
+ * digits of a number: column 0 is the most significant one and a column that
+ * runs past its end value carries over into the column before it.
+ */
+public class UniformRangePartition extends RangePartitionAlgorithm {
+  /** Index of the last column that takes part in the increment; set by {@link #partition(int)}. */
+  private int variableId;
+  /** cardForEachDigit[i] is the product of colCards[0] .. colCards[i]. */
+  private BigDecimal[] cardForEachDigit;
+  /** Value cardinality of each column of the range. */
+  private BigDecimal[] colCards;
+
+  /**
+   *
+   * @param schema the schema of the range tuples
+   * @param range the range to be partitioned
+   * @param inclusive true if the end of the range is inclusive
+   */
+  public UniformRangePartition(Schema schema, TupleRange range, boolean inclusive) {
+    super(schema, range, inclusive);
+    colCards = new BigDecimal[schema.getColumnNum()];
+    for (int i = 0; i < schema.getColumnNum(); i++) {
+      colCards[i] = computeCardinality(schema.getColumn(i).getDataType(), range.getStart().get(i),
+          range.getEnd().get(i), inclusive);
+    }
+
+    cardForEachDigit = new BigDecimal[colCards.length];
+    for (int i = 0; i < colCards.length ; i++) {
+      if (i == 0) {
+        cardForEachDigit[i] = colCards[i];
+      } else {
+        cardForEachDigit[i] = cardForEachDigit[i - 1].multiply(colCards[i]);
+      }
+    }
+  }
+
+  /**
+   * Same as the three-argument constructor with an inclusive range end.
+   */
+  public UniformRangePartition(Schema schema, TupleRange range) {
+    this(schema, range, true);
+  }
+
+  /**
+   * Splits the range into consecutive sub-ranges. Each sub-range starts where
+   * the previous one ended, and the last one ends at the end of the whole range.
+   *
+   * @param partNum the number of desired partitions; must be positive and must
+   *                not exceed the total cardinality
+   * @return the sub-ranges in ascending order
+   * @throws IllegalArgumentException if {@code partNum} violates the conditions above
+   */
+  @Override
+  public TupleRange[] partition(int partNum) {
+    Preconditions.checkArgument(partNum > 0,
+        "The number of partitions must be positive, but the given number: "
+            + partNum);
+    Preconditions.checkArgument(totalCard.compareTo(new BigDecimal(partNum)) >= 0,
+        "the number of partition cannot exceed total cardinality (" + totalCard + ")");
+
+    BigDecimal desired = new BigDecimal(partNum);
+
+    // the first column at which the accumulated cardinality reaches partNum
+    int varId;
+    for (varId = 0; varId < cardForEachDigit.length; varId++) {
+      if (cardForEachDigit[varId].compareTo(desired) >= 0)
+        break;
+    }
+    this.variableId = varId;
+
+    BigDecimal [] reverseCardsForDigit = reverseCardinalities(variableId);
+
+    List<TupleRange> ranges = Lists.newArrayList();
+    // number of values covered by one partition, rounded up
+    BigDecimal term = reverseCardsForDigit[0].divide(desired, RoundingMode.CEILING);
+    BigDecimal reminder = reverseCardsForDigit[0];
+    Tuple last = range.getStart();
+    while (reminder.compareTo(BigDecimal.ZERO) > 0) {
+      if (reminder.compareTo(term) <= 0) { // final one is inclusive
+        ranges.add(new TupleRange(schema, last, range.getEnd()));
+      } else {
+        Tuple next = increment(last, term.longValue(), variableId);
+        ranges.add(new TupleRange(schema, last, next));
+      }
+      last = ranges.get(ranges.size() - 1).getEnd();
+      reminder = reminder.subtract(term);
+    }
+
+    return ranges.toArray(new TupleRange[ranges.size()]);
+  }
+
+  /**
+   * Returns an array r of length baseDigit + 1 in which r[i] is the product
+   * of colCards[i] .. colCards[baseDigit].
+   */
+  private BigDecimal[] reverseCardinalities(int baseDigit) {
+    BigDecimal [] reverseCards = new BigDecimal[baseDigit + 1];
+    for (int i = baseDigit; i >= 0; i--) {
+      if (i == baseDigit) {
+        reverseCards[i] = colCards[i];
+      } else {
+        reverseCards[i] = reverseCards[i + 1].multiply(colCards[i]);
+      }
+    }
+    return reverseCards;
+  }
+
+  /**
+   * Checks whether adding {@code inc} to {@code last} exceeds the end value of the column.
+   *
+   * @param colId the column index
+   * @param last the current value of the column
+   * @param inc the amount to add
+   * @return true if last + inc is greater than the end of the range in this
+   *         column; false otherwise, and always false for unsupported types
+   */
+  public boolean isOverflow(int colId, Datum last, BigDecimal inc) {
+    Column column = schema.getColumn(colId);
+    BigDecimal candidate;
+    switch (column.getDataType().getType()) {
+      case BIT: {
+        candidate = inc.add(new BigDecimal(last.asByte()));
+        return new BigDecimal(range.getEnd().get(colId).asByte()).compareTo(candidate) < 0;
+      }
+      case CHAR: {
+        candidate = inc.add(new BigDecimal((int)last.asChar()));
+        return new BigDecimal((int)range.getEnd().get(colId).asChar()).compareTo(candidate) < 0;
+      }
+      case INT2: {
+        candidate = inc.add(new BigDecimal(last.asInt2()));
+        return new BigDecimal(range.getEnd().get(colId).asInt2()).compareTo(candidate) < 0;
+      }
+      case INT4: {
+        candidate = inc.add(new BigDecimal(last.asInt4()));
+        return new BigDecimal(range.getEnd().get(colId).asInt4()).compareTo(candidate) < 0;
+      }
+      case INT8: {
+        candidate = inc.add(new BigDecimal(last.asInt8()));
+        return new BigDecimal(range.getEnd().get(colId).asInt8()).compareTo(candidate) < 0;
+      }
+      case FLOAT4: {
+        candidate = inc.add(new BigDecimal(last.asFloat4()));
+        return new BigDecimal(range.getEnd().get(colId).asFloat4()).compareTo(candidate) < 0;
+      }
+      case FLOAT8: {
+        candidate = inc.add(new BigDecimal(last.asFloat8()));
+        return new BigDecimal(range.getEnd().get(colId).asFloat8()).compareTo(candidate) < 0;
+      }
+      case TEXT: {
+        // only the first character of the text takes part
+        candidate = inc.add(new BigDecimal((int)(last.asChars().charAt(0))));
+        return new BigDecimal(range.getEnd().get(colId).asChars().charAt(0)).compareTo(candidate) < 0;
+      }
+      default:
+        return false;
+    }
+  }
+
+  /**
+   * Computes by how much last + inc passes the end value of the column, minus one.
+   *
+   * @param colId the column index
+   * @param last the current value of the column
+   * @param inc the amount to add
+   * @return (last + inc - end) - 1; for unsupported types the result is -1
+   */
+  public long incrementAndGetReminder(int colId, Datum last, long inc) {
+    Column column = schema.getColumn(colId);
+    long reminder = 0;
+    switch (column.getDataType().getType()) {
+      case BIT: {
+        long candidate = last.asByte() + inc;
+        byte end = range.getEnd().get(colId).asByte();
+        reminder = candidate - end;
+        break;
+      }
+      case CHAR: {
+        long candidate = last.asChar() + inc;
+        char end = range.getEnd().get(colId).asChar();
+        reminder = candidate - end;
+        break;
+      }
+      case INT2: {
+        // This case was missing although isOverflow() and increment() handle
+        // INT2, so the method always returned -1 for INT2 columns.
+        long candidate = last.asInt2() + inc;
+        short end = range.getEnd().get(colId).asInt2();
+        reminder = candidate - end;
+        break;
+      }
+      case INT4: {
+        int candidate = (int) (last.asInt4() + inc);
+        int end = range.getEnd().get(colId).asInt4();
+        reminder = candidate - end;
+        break;
+      }
+      case INT8: {
+        long candidate = last.asInt8() + inc;
+        long end = range.getEnd().get(colId).asInt8();
+        reminder = candidate - end;
+        break;
+      }
+      case FLOAT4: {
+        float candidate = last.asFloat4() + inc;
+        float end = range.getEnd().get(colId).asFloat4();
+        reminder = (long) (candidate - end);
+        break;
+      }
+      case FLOAT8: {
+        double candidate = last.asFloat8() + inc;
+        double end = range.getEnd().get(colId).asFloat8();
+        reminder = (long) Math.ceil(candidate - end);
+        break;
+      }
+      case TEXT: {
+        char candidate = ((char)(last.asChars().charAt(0) + inc));
+        char end = range.getEnd().get(colId).asChars().charAt(0);
+        reminder = (char) (candidate - end);
+        break;
+      }
+    }
+
+    // including zero
+    return reminder - 1;
+  }
+
+  /**
+   * Computes the tuple that follows {@code last} by {@code inc} values.
+   * Columns 0 .. baseDigit take part in the increment; the columns after
+   * baseDigit are copied from {@code last} unchanged.
+   *
+   * @param last the tuple to start from
+   * @param inc the number of values to advance
+   * @param baseDigit the index of the least significant column
+   * @return the incremented tuple
+   * @throws RangeOverflowException if column 0 would pass its end value
+   * @throws UnsupportedOperationException if a column has an unsupported type
+   */
+  public Tuple increment(final Tuple last, final long inc, final int baseDigit) {
+    BigDecimal [] incs = new BigDecimal[last.size()];
+    boolean [] overflowFlag = new boolean[last.size()];
+    BigDecimal [] result;
+    BigDecimal value = new BigDecimal(inc);
+
+    BigDecimal [] reverseCardsForDigit = reverseCardinalities(baseDigit);
+
+    // split inc into one increment per column, most significant column first
+    for (int i = 0; i < baseDigit; i++) {
+      result = value.divideAndRemainder(reverseCardsForDigit[i + 1]);
+      incs[i] = result[0];
+      value = result[1];
+    }
+    int finalId = baseDigit;
+    incs[finalId] = value;
+    // propagate carries from the least significant column upwards
+    for (int i = finalId; i >= 0; i--) {
+      if (isOverflow(i, last.get(i), incs[i])) {
+        if (i == 0) {
+          throw new RangeOverflowException(range, last, incs[i].longValue());
+        }
+        long rem = incrementAndGetReminder(i, last.get(i), value.longValue());
+        incs[i] = new BigDecimal(rem);
+        incs[i - 1] = incs[i-1].add(new BigDecimal(1));
+        overflowFlag[i] = true;
+      } else {
+        if (i > 0) {
+          incs[i] = value;
+          break;
+        }
+      }
+    }
+
+    // columns after baseDigit do not move
+    for (int i = 0; i < incs.length; i++) {
+      if (incs[i] == null) {
+        incs[i] = new BigDecimal(0);
+      }
+    }
+
+    Tuple end = new VTuple(schema.getColumnNum());
+    Column column;
+    for (int i = 0; i < last.size(); i++) {
+      column = schema.getColumn(i);
+      // An overflowed column restarts from the start of the range; any other
+      // column continues from its current value.
+      Datum base = overflowFlag[i] ? range.getStart().get(i) : last.get(i);
+      long delta = incs[i].longValue();
+      switch (column.getDataType().getType()) {
+        case CHAR:
+          end.put(i, DatumFactory.createChar((char) (base.asChar() + delta)));
+          break;
+        case BIT:
+          end.put(i, DatumFactory.createBit((byte) (base.asByte() + delta)));
+          break;
+        case INT2:
+          end.put(i, DatumFactory.createInt2((short) (base.asInt2() + delta)));
+          break;
+        case INT4:
+          end.put(i, DatumFactory.createInt4((int) (base.asInt4() + delta)));
+          break;
+        case INT8:
+          // The overflow branch used to read the range start with asInt4(),
+          // which truncates 64-bit start values.
+          end.put(i, DatumFactory.createInt8(base.asInt8() + delta));
+          break;
+        case FLOAT4:
+          end.put(i, DatumFactory.createFloat4(base.asFloat4() + delta));
+          break;
+        case FLOAT8:
+          end.put(i, DatumFactory.createFloat8(base.asFloat8() + delta));
+          break;
+        case TEXT:
+          // only the first character is advanced; the result is a one-character text
+          end.put(i, DatumFactory.createText(((char) (base.asChars().charAt(0) + delta)) + ""));
+          break;
+        default:
+          throw new UnsupportedOperationException(column.getDataType() + " is not supported yet");
+      }
+    }
+
+    return end;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalOptimizer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalOptimizer.java
new file mode 100644
index 0000000..c0e4017
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalOptimizer.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.logical.ExprType;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.planner.logical.UnaryNode;
+import org.apache.tajo.master.ExecutionBlock;
+import org.apache.tajo.master.ExecutionBlock.PartitionType;
+
+/**
+ * Reduces the number of execution blocks of a {@link MasterPlan} by merging
+ * child blocks into their parents where the rules below allow it.
+ */
+public class GlobalOptimizer {
+
+  public GlobalOptimizer() {
+  }
+
+  /**
+   * Merges execution blocks starting from the root of the given plan and
+   * wraps the root in a new plan that carries the same output table name.
+   *
+   * @param plan the plan to optimize; its execution blocks are modified in place
+   * @return a new plan over the reduced root block
+   */
+  public MasterPlan optimize(MasterPlan plan) {
+    ExecutionBlock reducedRoot = reduceSchedules(plan.getRoot());
+
+    MasterPlan result = new MasterPlan(reducedRoot);
+    result.setOutputTableName(plan.getOutputTable());
+    return result;
+  }
+
+  @VisibleForTesting
+  private ExecutionBlock reduceSchedules(ExecutionBlock logicalUnit) {
+    reduceBottomUp(logicalUnit);
+    return logicalUnit;
+  }
+
+  /**
+   * Reduces all descendants first, then merges every mergeable direct child into {@code cur}.
+   */
+  private void reduceBottomUp(ExecutionBlock cur) {
+    if (cur.hasChildBlock()) {
+      for (ExecutionBlock descendant : cur.getChildBlocks()) {
+        reduceBottomUp(descendant);
+      }
+    }
+
+    // NOTE(review): merging removes and adds child blocks of cur while its
+    // child blocks are being iterated - confirm that getChildBlocks() tolerates this.
+    for (ExecutionBlock candidate : cur.getChildBlocks()) {
+      if (isMergeable(candidate)) {
+        mergeLogicalUnits(cur, candidate);
+      }
+    }
+  }
+
+  /**
+   * A block is merged into its parent when the node below its store-table node
+   * is not a UNION and its partition type is LIST.
+   */
+  private static boolean isMergeable(ExecutionBlock block) {
+    if (block.getStoreTableNode().getSubNode().getType() == ExprType.UNION) {
+      return false;
+    }
+    return block.getPartitionType() == PartitionType.LIST;
+  }
+
+  /**
+   * Replaces the scan found in the parent's plan with the child's plan (the node
+   * below the child's store-table node) and hands the child's own child blocks
+   * over to the parent. Nothing happens unless the node returned by
+   * findTopParentNode is a {@link UnaryNode}.
+   *
+   * @return the parent block
+   */
+  private ExecutionBlock mergeLogicalUnits(ExecutionBlock parent, ExecutionBlock child) {
+    LogicalNode scanParent = PlannerUtil.findTopParentNode(parent.getPlan(), ExprType.SCAN);
+    if (!(scanParent instanceof UnaryNode)) {
+      return parent;
+    }
+
+    UnaryNode unary = (UnaryNode) scanParent;
+    ScanNode replacedScan = (ScanNode) unary.getSubNode();
+    LogicalNode childPlan = child.getStoreTableNode().getSubNode();
+
+    parent.removeChildBlock(replacedScan);
+    unary.setSubNode(childPlan);
+    // the plan object is unchanged, but it is set again after the subtree swap
+    parent.setPlan(parent.getPlan());
+    parent.addChildBlocks(child.getChildBlockMap());
+    return parent;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
new file mode 100644
index 0000000..9201997
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.tajo.engine.planner.global;
+
+import org.apache.tajo.master.ExecutionBlock;
+
+/**
+ * A global plan: the root {@link ExecutionBlock} plus the name of the output table.
+ */
+public class MasterPlan {
+  private ExecutionBlock root;
+  private String outputTableName;
+
+  /**
+   * @param root the root execution block of this plan
+   */
+  public MasterPlan(ExecutionBlock root) {
+    // Assign the field directly instead of calling the overridable setRoot():
+    // an override would otherwise run before the subclass is initialized.
+    this.root = root;
+  }
+
+  public void setRoot(ExecutionBlock root) {
+    this.root = root;
+  }
+
+  public ExecutionBlock getRoot() {
+    return this.root;
+  }
+
+  public void setOutputTableName(String tableName) {
+    this.outputTableName = tableName;
+  }
+
+  /**
+   * @return the output table name, or null if none has been set
+   */
+  public String getOutputTable() {
+    return outputTableName;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/BinaryNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/BinaryNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/BinaryNode.java
new file mode 100644
index 0000000..061964d
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/BinaryNode.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.engine.json.GsonCreator;
+
+/**
+ * A logical node with two children, called the outer and the inner node.
+ * Both children are null until they are set.
+ */
+public abstract class BinaryNode extends LogicalNode implements Cloneable {
+  @Expose
+  LogicalNode outer = null;
+  @Expose
+  LogicalNode inner = null;
+
+  public BinaryNode() {
+    super();
+  }
+
+  /**
+   * @param opType the expression type of this node
+   */
+  public BinaryNode(ExprType opType) {
+    super(opType);
+  }
+
+  public LogicalNode getOuterNode() {
+    return this.outer;
+  }
+
+  public void setOuter(LogicalNode op) {
+    this.outer = op;
+  }
+
+  public LogicalNode getInnerNode() {
+    return this.inner;
+  }
+
+  public void setInner(LogicalNode op) {
+    this.inner = op;
+  }
+
+  /**
+   * Clones this node together with its children. A child that has not been
+   * set yet stays null in the copy instead of causing a NullPointerException.
+   */
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    BinaryNode binNode = (BinaryNode) super.clone();
+    binNode.outer = (outer == null) ? null : (LogicalNode) outer.clone();
+    binNode.inner = (inner == null) ? null : (LogicalNode) inner.clone();
+
+    return binNode;
+  }
+
+  /**
+   * Visits this node first, then the outer subtree, then the inner subtree.
+   * The children are traversed in pre-order as well; they used to be
+   * traversed in post-order, which mixed the two orders.
+   */
+  public void preOrder(LogicalNodeVisitor visitor) {
+    visitor.visit(this);
+    outer.preOrder(visitor);
+    inner.preOrder(visitor);
+  }
+
+  /**
+   * Visits the outer subtree, then the inner subtree, and this node last.
+   */
+  public void postOrder(LogicalNodeVisitor visitor) {
+    outer.postOrder(visitor);
+    inner.postOrder(visitor);
+    visitor.visit(this);
+  }
+
+  /**
+   * @return the JSON form of this node, serialized as a {@link LogicalNode}
+   */
+  public String toJSON() {
+    // NOTE(review): the children's results are discarded; presumably the calls
+    // are kept for their side effects - confirm before removing them.
+    outer.toJSON();
+    inner.toJSON();
+    return GsonCreator.getInstance().toJson(this, LogicalNode.class);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
new file mode 100644
index 0000000..a02f703
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.gson.annotations.Expose;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Options;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.engine.json.GsonCreator;
+import org.apache.tajo.util.TUtil;
+
+/**
+ * Logical plan node describing a table to be created: its name and schema,
+ * plus an optional storage type, path, options and partition keys.
+ */
+public class CreateTableNode extends LogicalNode implements Cloneable {
+  @Expose private String tableName;
+  @Expose private Column[] partitionKeys;
+  @Expose private StoreType storageType;
+  @Expose private Schema schema;
+  @Expose private Path path;
+  @Expose private Options options;
+
+  /**
+   * Creates a node of type {@link ExprType#CREATE_TABLE}.
+   *
+   * @param tableName the name of the table
+   * @param schema the schema of the table
+   */
+  public CreateTableNode(String tableName, Schema schema) {
+    super(ExprType.CREATE_TABLE);
+    this.tableName = tableName;
+    this.schema = schema;
+  }
+
+  /**
+   * @return the table name given at construction
+   */
+  public final String getTableName() {
+    return this.tableName;
+  }
+
+  /**
+   * @return the schema given at construction
+   */
+  public Schema getSchema() {
+    return this.schema;
+  }
+
+  /**
+   * @param storageType the storage type to record for the table
+   */
+  public void setStorageType(StoreType storageType) {
+    this.storageType = storageType;
+  }
+
+  /**
+   * @return the storage type, or {@code null} if none has been set
+   */
+  public StoreType getStorageType() {
+    return this.storageType;
+  }
+
+  /**
+   * @return {@code true} if a path has been set
+   */
+  public boolean hasPath() {
+    return this.path != null;
+  }
+
+  /**
+   * @param path the path to record for the table
+   */
+  public void setPath(Path path) {
+    this.path = path;
+  }
+
+  /**
+   * @return the path, or {@code null} if none has been set
+   */
+  public Path getPath() {
+    return this.path;
+  }
+
+  /**
+   * @return {@code true} if options have been set
+   */
+  public boolean hasOptions() {
+    return this.options != null;
+  }
+
+  /**
+   * @param opt the options to record for the table
+   */
+  public void setOptions(Options opt) {
+    this.options = opt;
+  }
+
+  /**
+   * @return the options, or {@code null} if none have been set
+   */
+  public Options getOptions() {
+    return this.options;
+  }
+
+  /**
+   * Compares this node with another {@code CreateTableNode} field by field,
+   * in addition to the comparison done by the superclass.
+   *
+   * @param obj the object to compare with
+   * @return {@code true} if {@code obj} is an equal {@code CreateTableNode}
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof CreateTableNode) {
+      CreateTableNode other = (CreateTableNode) obj;
+      return super.equals(other)
+          && this.tableName.equals(other.tableName)
+          && this.schema.equals(other.schema)
+          && this.storageType == other.storageType
+          // path is optional (see hasPath()), so it needs a null-safe comparison
+          && TUtil.checkEquals(path, other.path)
+          && TUtil.checkEquals(options, other.options)
+          && TUtil.checkEquals(partitionKeys, other.partitionKeys);
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Clones this node. The schema, path and options are copied; the partition
+   * key array is copied shallowly. Unset optional fields stay {@code null}.
+   *
+   * @return a copy of this node
+   * @throws CloneNotSupportedException if a part of the node cannot be cloned
+   */
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    CreateTableNode store = (CreateTableNode) super.clone();
+    store.tableName = tableName;
+    store.schema = (Schema) schema.clone();
+    store.storageType = storageType;
+    // path is optional (see hasPath()); do not dereference it when unset
+    store.path = path != null ? new Path(path.toString()) : null;
+    store.partitionKeys = partitionKeys != null ? partitionKeys.clone() : null;
+    store.options = (Options) (options != null ? options.clone() : null);
+    return store;
+  }
+
+  /**
+   * Renders the node in a JSON-like form intended for reading by humans.
+   *
+   * @return a textual description of this node
+   */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("\"Store\": {\"table\": \"").append(tableName).append("\",");
+    if (partitionKeys != null) {
+      sb.append("\"partition keys\": [");
+      for (int i = 0; i < partitionKeys.length; i++) {
+        sb.append(partitionKeys[i]);
+        if (i < partitionKeys.length - 1) {
+          sb.append(",");
+        }
+      }
+      sb.append("],");
+    }
+    sb.append("\"schema\": \"{").append(this.schema).append("}\"");
+    sb.append(",\"storeType\": \"").append(this.storageType).append("\"");
+    sb.append(",\"path\" : \"").append(this.path).append("\",");
+
+    sb.append("\n \"out schema\": ").append(getOutSchema()).append(",")
+        .append("\n \"in schema\": ").append(getInSchema())
+        .append("}");
+
+    return sb.toString();
+  }
+
+  /**
+   * Serializes this node to JSON through the shared Gson instance.
+   *
+   * @return the JSON produced for this node
+   */
+  @Override
+  public String toJSON() {
+    return GsonCreator.getInstance().toJson(this, LogicalNode.class);
+  }
+
+  /**
+   * Visits this node; it has no children to traverse.
+   *
+   * @param visitor the visitor to apply
+   */
+  @Override
+  public void preOrder(LogicalNodeVisitor visitor) {
+    visitor.visit(this);
+  }
+
+  /**
+   * Visits this node; it has no children to traverse.
+   *
+   * @param visitor the visitor to apply
+   */
+  @Override
+  public void postOrder(LogicalNodeVisitor visitor) {
+    visitor.visit(this);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/EvalExprNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/EvalExprNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/EvalExprNode.java
new file mode 100644
index 0000000..c44db13
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/EvalExprNode.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.gson.Gson;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.engine.json.GsonCreator;
+import org.apache.tajo.engine.parser.QueryBlock;
+
+/**
+ * Logical plan node of type {@link ExprType#EXPRS} that carries an array of
+ * target expressions.
+ */
+public class EvalExprNode extends LogicalNode {
+  @Expose private QueryBlock.Target[] exprs;
+
+  /**
+   * @param exprs the target expressions carried by this node
+   */
+  public EvalExprNode(QueryBlock.Target[] exprs) {
+    super(ExprType.EXPRS);
+    this.exprs = exprs;
+  }
+
+  /**
+   * Serializes this node to JSON through the shared Gson instance.
+   *
+   * @return the JSON produced for this node
+   */
+  @Override
+  public String toJSON() {
+    return GsonCreator.getInstance().toJson(this);
+  }
+
+  /**
+   * @return the target expressions given at construction
+   */
+  public QueryBlock.Target[] getExprs() {
+    return this.exprs;
+  }
+
+  /**
+   * Renders the targets and both schemas in a JSON-like form.
+   *
+   * @return a textual description of this node
+   */
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder("\"EvalExpr\": {\"targets\": [");
+
+    // Emit the separator before every element except the first one.
+    String separator = "";
+    for (QueryBlock.Target target : exprs) {
+      text.append(separator).append("\"").append(target).append("\"");
+      separator = ",";
+    }
+
+    text.append("],")
+        .append("\n \"out schema\": ").append(getOutSchema()).append(",")
+        .append("\n \"in schema\": ").append(getInSchema())
+        .append("}");
+    return text.toString();
+  }
+
+  /** Does nothing: the visitor is not applied to this node. */
+  @Override
+  public void preOrder(LogicalNodeVisitor visitor) {
+  }
+
+  /** Does nothing: the visitor is not applied to this node. */
+  @Override
+  public void postOrder(LogicalNodeVisitor visitor) {
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExceptNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExceptNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExceptNode.java
new file mode 100644
index 0000000..41e606f
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExceptNode.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.tajo.engine.planner.logical;
+
+import org.apache.tajo.engine.json.GsonCreator;
+
+public class ExceptNode extends BinaryNode {
+
+ public ExceptNode() {
+ super(ExprType.EXCEPT);
+ }
+
+ public ExceptNode(LogicalNode outer, LogicalNode inner) {
+ this();
+ setOuter(outer);
+ setInner(inner);
+ }
+
+ public String toString() {
+ return getOuterNode().toString() + "\n EXCEPT \n" + getInnerNode().toString();
+ }
+
+ @Override
+ public String toJSON() {
+ return GsonCreator.getInstance().toJson(this, LogicalNode.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExprType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExprType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExprType.java
new file mode 100644
index 0000000..8eb70cf
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExprType.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.tajo.engine.planner.logical;
+
+/**
+ * Type tags for logical plan nodes.
+ *
+ * A node passes its tag to the LogicalNode constructor; for example,
+ * CreateTableNode uses CREATE_TABLE, EvalExprNode uses EXPRS, ExceptNode uses
+ * EXCEPT and GroupbyNode uses GROUP_BY.
+ *
+ * NOTE(review): the constants are mostly, but not strictly, alphabetical
+ * (LIMIT precedes JOIN) - confirm whether anything depends on the declaration
+ * order before reordering.
+ */
+public enum ExprType {
+ BST_INDEX_SCAN,
+ CREATE_INDEX,
+ CREATE_TABLE,
+ DESC_TABLE,
+ EXCEPT,
+ EXPRS,
+ GROUP_BY,
+ INSERT_INTO,
+ INTERSECT,
+ LIMIT,
+ JOIN,
+ PROJECTION,
+ RECEIVE,
+ RENAME,
+ ROOT,
+ SCAN,
+ SELECTION,
+ SEND,
+ SET_DIFF,
+ SET_UNION,
+ SET_INTERSECT,
+ SHOW_TABLE,
+ SHOW_FUNCTION,
+ SORT,
+ STORE,
+ UNION
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
new file mode 100644
index 0000000..d35db32
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.json.GsonCreator;
+import org.apache.tajo.engine.parser.QueryBlock;
+import org.apache.tajo.util.TUtil;
+
+import java.util.Arrays;
+
+/**
+ * Unary logical plan node for a GROUP BY: the grouping columns, an optional
+ * HAVING condition and an optional target list.
+ */
+public class GroupbyNode extends UnaryNode implements Cloneable {
+  @Expose
+  private Column[] columns;
+  @Expose
+  private EvalNode havingCondition = null;
+  @Expose
+  private QueryBlock.Target[] targets;
+
+  /**
+   * Creates an empty node, delegating to the no-arg {@link UnaryNode}
+   * constructor; the grouping columns stay {@code null}.
+   */
+  public GroupbyNode() {
+    super();
+  }
+
+  /**
+   * Creates a node of type {@link ExprType#GROUP_BY}.
+   *
+   * @param columns the grouping columns
+   */
+  public GroupbyNode(final Column [] columns) {
+    super(ExprType.GROUP_BY);
+    this.columns = columns;
+  }
+
+  /**
+   * Creates a node of type {@link ExprType#GROUP_BY} with a HAVING condition.
+   *
+   * @param columns the grouping columns
+   * @param havingCondition the HAVING condition, or {@code null} for none
+   */
+  public GroupbyNode(final Column [] columns,
+      final EvalNode havingCondition) {
+    this(columns);
+    this.havingCondition = havingCondition;
+  }
+
+  /**
+   * @return the grouping columns, or {@code null} if none were given
+   */
+  public final Column [] getGroupingColumns() {
+    return this.columns;
+  }
+
+  /**
+   * @return {@code true} if a HAVING condition has been set
+   */
+  public final boolean hasHavingCondition() {
+    return this.havingCondition != null;
+  }
+
+  /**
+   * @return the HAVING condition, or {@code null} if none has been set
+   */
+  public final EvalNode getHavingCondition() {
+    return this.havingCondition;
+  }
+
+  /**
+   * @param evalTree the HAVING condition to use
+   */
+  public final void setHavingCondition(final EvalNode evalTree) {
+    this.havingCondition = evalTree;
+  }
+
+  /**
+   * @return {@code true} if a target list has been set
+   */
+  public boolean hasTargetList() {
+    return this.targets != null;
+  }
+
+  /**
+   * @return the target list, or {@code null} if none has been set
+   */
+  public QueryBlock.Target[] getTargets() {
+    return this.targets;
+  }
+
+  /**
+   * @param targets the target list to use
+   */
+  public void setTargetList(QueryBlock.Target[] targets) {
+    this.targets = targets;
+  }
+
+  /**
+   * Sets the child of this node by delegating to the superclass.
+   *
+   * @param subNode the node to use as the child
+   */
+  public void setSubNode(LogicalNode subNode) {
+    super.setSubNode(subNode);
+  }
+
+  /**
+   * Renders this node in a JSON-like form, followed on a new line by the
+   * text of the sub node.
+   *
+   * The {@code fields} array is always closed and every entry is separated
+   * by a comma, whether or not a HAVING condition or a target list is set.
+   *
+   * @return a textual description of this node and its sub node
+   */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("\"GroupBy\": {\"fields\":[");
+    // columns is null when the node was built with the no-arg constructor
+    if (columns != null) {
+      for (int i = 0; i < columns.length; i++) {
+        sb.append("\"").append(columns[i]).append("\"");
+        if (i < columns.length - 1) {
+          sb.append(",");
+        }
+      }
+    }
+    // Close the array unconditionally, not only when a HAVING condition exists.
+    sb.append("]");
+
+    if (hasHavingCondition()) {
+      sb.append(", \"having qual\": \"").append(havingCondition).append("\"");
+    }
+    if (hasTargetList()) {
+      sb.append(", \"target\": [");
+      for (int i = 0; i < targets.length; i++) {
+        sb.append("\"").append(targets[i]).append("\"");
+        if (i < targets.length - 1) {
+          sb.append(",");
+        }
+      }
+      sb.append("]");
+    }
+    // Single separator before the schemas, whichever optional parts were emitted.
+    sb.append(",");
+    sb.append("\n \"out schema\": ").append(getOutSchema()).append(",");
+    sb.append("\n \"in schema\": ").append(getInSchema());
+    sb.append("}");
+
+    // append(Object) prints "null" for a missing sub node instead of throwing
+    sb.append("\n").append(getSubNode());
+    return sb.toString();
+  }
+
+  /**
+   * Compares this node with another {@code GroupbyNode}: the superclass
+   * comparison, the grouping columns, the HAVING condition, the target list
+   * and the sub node.
+   *
+   * @param obj the object to compare with
+   * @return {@code true} if {@code obj} is an equal {@code GroupbyNode}
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof GroupbyNode) {
+      GroupbyNode other = (GroupbyNode) obj;
+      return super.equals(other)
+          && Arrays.equals(columns, other.columns)
+          && TUtil.checkEquals(havingCondition, other.havingCondition)
+          // element-wise and null-safe, consistent with the columns comparison
+          && Arrays.equals(targets, other.targets)
+          // the sub node may be unset, so compare it null-safely
+          && TUtil.checkEquals(getSubNode(), other.getSubNode());
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Clones this node, copying every grouping column, the HAVING condition
+   * and every target. Unset parts stay {@code null}.
+   *
+   * @return a copy of this node
+   * @throws CloneNotSupportedException if a part of the node cannot be cloned
+   */
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    GroupbyNode grp = (GroupbyNode) super.clone();
+    if (columns != null) {
+      grp.columns = new Column[columns.length];
+      for (int i = 0; i < columns.length; i++) {
+        grp.columns[i] = (Column) columns[i].clone();
+      }
+    }
+    grp.havingCondition = (EvalNode) (havingCondition != null
+        ? havingCondition.clone() : null);
+    if (targets != null) {
+      grp.targets = new QueryBlock.Target[targets.length];
+      for (int i = 0; i < targets.length; i++) {
+        grp.targets[i] = (QueryBlock.Target) targets[i].clone();
+      }
+    }
+
+    return grp;
+  }
+
+  /**
+   * Serializes this node to JSON through the shared Gson instance.
+   *
+   * @return the JSON produced for this node
+   */
+  @Override
+  public String toJSON() {
+    return GsonCreator.getInstance().toJson(this, LogicalNode.class);
+  }
+}