You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/09/16 13:33:31 UTC
[4/7] TAJO-184: Refactor GlobalPlanner and global plan data
structure. (hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
index 7c374d2..d5e51ef 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
@@ -18,681 +18,432 @@
package org.apache.tajo.master;
+import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryIdFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.DataChannel;
import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.FromTable;
-import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.master.ExecutionBlock.PartitionType;
import org.apache.tajo.storage.AbstractStorageManager;
-import org.apache.tajo.util.TajoIdUtils;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
-import static org.apache.tajo.conf.TajoConf.ConfVars;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.*;
public class GlobalPlanner {
private static Log LOG = LogFactory.getLog(GlobalPlanner.class);
private TajoConf conf;
private AbstractStorageManager sm;
- private QueryId queryId;
- public GlobalPlanner(final TajoConf conf,
- final AbstractStorageManager sm,
- final EventHandler eventHandler)
+ public GlobalPlanner(final TajoConf conf, final AbstractStorageManager sm)
throws IOException {
this.conf = conf;
this.sm = sm;
}
+ public class GlobalPlanContext {
+ MasterPlan plan;
+ Set<String> broadcastTables = new HashSet<String>();
+ LogicalNode topmost;
+ LogicalNode lastRepartionableNode;
+ ExecutionBlock topMostLeftExecBlock;
+ ExecutionBlock topMostRightExecBlock;
+ }
+
/**
* Builds a master plan from the given logical plan.
- * @param queryId
- * @param rootNode
- * @return
- * @throws IOException
*/
- public MasterPlan build(QueryId queryId, LogicalRootNode rootNode)
- throws IOException {
- this.queryId = queryId;
-
- String outputTableName = null;
- if (rootNode.getChild().getType() == NodeType.STORE) {
- // create table queries are executed by the master
- StoreTableNode storeTableNode = (StoreTableNode) rootNode.getChild();
- outputTableName = storeTableNode.getTableName();
+ public void build(MasterPlan masterPlan)
+ throws IOException, PlanningException {
+
+ NewPlanner planner = new NewPlanner();
+ GlobalPlanContext globalPlanContext = new GlobalPlanContext();
+ globalPlanContext.plan = masterPlan;
+ LOG.info(masterPlan.getLogicalPlan());
+
+ LogicalNode rootNode = PlannerUtil.clone(masterPlan.getLogicalPlan().getRootBlock().getRoot());
+ planner.visitChild(masterPlan.getLogicalPlan(), rootNode, new Stack<LogicalNode>(), globalPlanContext);
+
+ ExecutionBlock terminalBlock = masterPlan.createTerminalBlock();
+
+ if (globalPlanContext.lastRepartionableNode != null
+ && globalPlanContext.lastRepartionableNode.getType() == NodeType.UNION) {
+ UnionNode unionNode = (UnionNode) globalPlanContext.lastRepartionableNode;
+ ConsecutiveUnionFinder finder = new ConsecutiveUnionFinder();
+ UnionsFinderContext finderContext = new UnionsFinderContext();
+ finder.visitChild(masterPlan.getLogicalPlan(), unionNode, new Stack<LogicalNode>(), finderContext);
+
+ for (UnionNode union : finderContext.unionList) {
+ TableSubQueryNode leftSubQuery = union.getLeftChild();
+ TableSubQueryNode rightSubQuery = union.getRightChild();
+ if (leftSubQuery.getSubQuery().getType() != NodeType.UNION) {
+ ExecutionBlock execBlock = masterPlan.newExecutionBlock();
+ execBlock.setPlan(leftSubQuery);
+ DataChannel dataChannel = new DataChannel(execBlock, terminalBlock, NONE_PARTITION, 1);
+ masterPlan.addConnect(dataChannel);
+ }
+ if (rightSubQuery.getSubQuery().getType() != NodeType.UNION) {
+ ExecutionBlock execBlock = masterPlan.newExecutionBlock();
+ execBlock.setPlan(rightSubQuery);
+ DataChannel dataChannel = new DataChannel(execBlock, terminalBlock, NONE_PARTITION, 1);
+ masterPlan.addConnect(dataChannel);
+ }
+ }
+ } else {
+ DataChannel dataChannel = new DataChannel(globalPlanContext.topMostLeftExecBlock, terminalBlock, NONE_PARTITION, 1);
+ dataChannel.setSchema(globalPlanContext.topmost.getOutSchema());
+ masterPlan.addConnect(dataChannel);
}
+ masterPlan.setTerminal(terminalBlock);
+ LOG.info(masterPlan);
+ }
- // insert store at the subnode of the root
- UnaryNode root = rootNode;
- if (root.getChild().getType() != NodeType.STORE) {
- ExecutionBlockId executionBlockId = QueryIdFactory.newExecutionBlockId(this.queryId);
- outputTableName = executionBlockId.toString();
- insertStore(executionBlockId.toString(),root).setLocal(false);
+ public static ScanNode buildInputExecutor(DataChannel channel) {
+ Preconditions.checkArgument(channel.getSchema() != null,
+ "Channel schema (" + channel.getSrcId().getId() +" -> "+ channel.getTargetId().getId()+") is not initialized");
+ TableMeta meta = new TableMetaImpl(channel.getSchema(), channel.getStoreType(), new Options());
+ TableDesc desc = new TableDescImpl(channel.getSrcId().toString(), meta, new Path("/"));
+ return new ScanNode(new FromTable(desc));
+ }
+
+ public class NewPlanner extends BasicLogicalPlanVisitor<GlobalPlanContext> {
+
+ @Override
+ public LogicalNode visitRoot(LogicalPlan plan, LogicalRootNode node, Stack<LogicalNode> stack,
+ GlobalPlanContext data) throws PlanningException {
+ super.visitRoot(plan, node, stack, data);
+
+ if (data.lastRepartionableNode != null && data.lastRepartionableNode.getType() != NodeType.UNION) {
+ data.topMostLeftExecBlock = addChannel(data.plan, data.lastRepartionableNode, node, data.topmost, data.topMostLeftExecBlock);
+ } else if (data.lastRepartionableNode != null && data.lastRepartionableNode.getType() == NodeType.UNION) {
+
+ } else {
+ ExecutionBlock execBlock = data.plan.newExecutionBlock();
+ execBlock.setPlan(node);
+ data.topMostLeftExecBlock = execBlock;
+ }
+
+ data.topmost = node;
+ return node;
}
-
- // convert 2-phase plan
- LogicalNode twoPhased = convertTo2Phase(rootNode);
- // make query graph
- MasterPlan globalPlan = convertToGlobalPlan(twoPhased);
- globalPlan.setOutputTableName(outputTableName);
+ @Override
+ public LogicalNode visitProjection(LogicalPlan plan, ProjectionNode node, Stack<LogicalNode> stack,
+ GlobalPlanContext data) throws PlanningException {
+ super.visitProjection(plan, node, stack, data);
+ data.topmost = node;
+ return node;
+ }
- return globalPlan;
- }
-
- private StoreTableNode insertStore(String tableId, LogicalNode parent) {
- StoreTableNode store = new StoreTableNode(tableId);
- store.setLocal(true);
- PlannerUtil.insertNode(parent, store);
- return store;
- }
-
- /**
- * Transforms a logical plan to a two-phase plan.
- * Store nodes are inserted for every logical nodes except store and scan nodes
- */
- private class GlobalPlanBuilder implements LogicalNodeVisitor {
@Override
- public void visit(LogicalNode node) {
- String tableId;
- StoreTableNode store;
- if (node.getType() == NodeType.GROUP_BY) {
- // transform group by to two-phase plan
- GroupbyNode groupby = (GroupbyNode) node;
- // insert a store for the child of first group by
- if (groupby.getChild().getType() != NodeType.UNION &&
- groupby.getChild().getType() != NodeType.STORE &&
- groupby.getChild().getType() != NodeType.SCAN) {
- tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
- insertStore(tableId, groupby);
- }
- tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
- // insert (a store for the first group by) and (a second group by)
- PlannerUtil.transformGroupbyTo2PWithStore((GroupbyNode)node, tableId);
- } else if (node.getType() == NodeType.SORT) {
- // transform sort to two-phase plan
- SortNode sort = (SortNode) node;
- // insert a store for the child of first sort
- if (sort.getChild().getType() != NodeType.UNION &&
- sort.getChild().getType() != NodeType.STORE &&
- sort.getChild().getType() != NodeType.SCAN) {
- tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
- insertStore(tableId, sort);
- }
- tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
- // insert (a store for the first sort) and (a second sort)
- PlannerUtil.transformSortTo2PWithStore((SortNode)node, tableId);
- } else if (node.getType() == NodeType.JOIN) {
- // transform join to two-phase plan
- // the first phase of two-phase join can be any logical nodes
- JoinNode join = (JoinNode) node;
-
- if (join.getRightChild().getType() == NodeType.SCAN &&
- join.getLeftChild().getType() == NodeType.SCAN) {
- ScanNode outerScan = (ScanNode) join.getRightChild();
- ScanNode innerScan = (ScanNode) join.getLeftChild();
-
-
- TableMeta outerMeta = outerScan.getFromTable().getTableDesc().getMeta();
- TableMeta innerMeta = innerScan.getFromTable().getTableDesc().getMeta();
- long threshold = conf.getLongVar(ConfVars.BROADCAST_JOIN_THRESHOLD);
-
-
- // if the broadcast join is available
- boolean outerSmall = false;
- boolean innerSmall = false;
- if (!outerScan.isLocal() && outerMeta.getStat() != null &&
- outerMeta.getStat().getNumBytes() <= threshold) {
- outerSmall = true;
- LOG.info("The relation (" + outerScan.getTableId() +
- ") is less than " + threshold);
+ public LogicalNode visitLimit(LogicalPlan plan, LimitNode node, Stack<LogicalNode> stack, GlobalPlanContext data) throws PlanningException {
+ super.visitLimit(plan, node, stack, data);
+ data.topmost = node;
+ return node;
+ }
+
+ private ExecutionBlock addChannel(MasterPlan masterPlan, LogicalNode lastDistNode, LogicalNode curNode,
+ LogicalNode childNode, ExecutionBlock lastChildBlock) throws PlanningException {
+ ExecutionBlock currentBlock = null;
+ ExecutionBlock childBlock;
+
+ childBlock = lastChildBlock;
+
+ NodeType shuffleRequiredNodeType = lastDistNode.getType();
+ if (shuffleRequiredNodeType == NodeType.GROUP_BY) {
+ GroupbyNode groupByNode = (GroupbyNode) lastDistNode;
+
+ GroupbyNode firstGroupBy = PlannerUtil.transformGroupbyTo2P(groupByNode);
+
+ if (firstGroupBy.getChild().getType() == NodeType.TABLE_SUBQUERY &&
+ ((TableSubQueryNode)firstGroupBy.getChild()).getSubQuery().getType() == NodeType.UNION) {
+
+ UnionNode unionNode = PlannerUtil.findTopNode(groupByNode, NodeType.UNION);
+ ConsecutiveUnionFinder finder = new ConsecutiveUnionFinder();
+ UnionsFinderContext finderContext = new UnionsFinderContext();
+ finder.visitChild(masterPlan.getLogicalPlan(), unionNode, new Stack<LogicalNode>(), finderContext);
+
+ currentBlock = masterPlan.newExecutionBlock();
+ GroupbyNode secondGroupBy = groupByNode;
+ for (UnionNode union : finderContext.unionList) {
+ TableSubQueryNode leftSubQuery = union.getLeftChild();
+ TableSubQueryNode rightSubQuery = union.getRightChild();
+ DataChannel dataChannel;
+ if (leftSubQuery.getSubQuery().getType() != NodeType.UNION) {
+ ExecutionBlock execBlock = masterPlan.newExecutionBlock();
+ GroupbyNode g1 = PlannerUtil.clone(firstGroupBy);
+ g1.setChild(leftSubQuery);
+ execBlock.setPlan(g1);
+ dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
+
+ ScanNode scanNode = buildInputExecutor(dataChannel);
+ secondGroupBy.setChild(scanNode);
+ masterPlan.addConnect(dataChannel);
+ }
+ if (rightSubQuery.getSubQuery().getType() != NodeType.UNION) {
+ ExecutionBlock execBlock = masterPlan.newExecutionBlock();
+ GroupbyNode g1 = PlannerUtil.clone(firstGroupBy);
+ g1.setChild(rightSubQuery);
+ execBlock.setPlan(g1);
+ dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
+
+ ScanNode scanNode = buildInputExecutor(dataChannel);
+ secondGroupBy.setChild(scanNode);
+ masterPlan.addConnect(dataChannel);
+ }
}
- if (!innerScan.isLocal() && innerMeta.getStat() != null &&
- innerMeta.getStat().getNumBytes() <= threshold) {
- innerSmall = true;
- LOG.info("The relation (" + innerScan.getTableId() +
- ") is less than " + threshold);
+ LogicalNode parent = PlannerUtil.findTopParentNode(curNode, lastDistNode.getType());
+ if (parent instanceof UnaryNode && parent != secondGroupBy) {
+ ((UnaryNode)parent).setChild(secondGroupBy);
}
+ currentBlock.setPlan(curNode);
+ } else {
- if (outerSmall && innerSmall) {
- if (outerMeta.getStat().getNumBytes() <=
- innerMeta.getStat().getNumBytes()) {
- outerScan.setBroadcast();
- LOG.info("The relation " + outerScan.getTableId()
- + " is broadcasted");
- } else {
- innerScan.setBroadcast();
- LOG.info("The relation " + innerScan.getTableId()
- + " is broadcasted");
- }
+ if (childBlock == null) { // first repartition node
+ childBlock = masterPlan.newExecutionBlock();
+ }
+ childBlock.setPlan(firstGroupBy);
+
+ currentBlock = masterPlan.newExecutionBlock();
+
+ DataChannel channel;
+ if (firstGroupBy.isEmptyGrouping()) {
+ channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 1);
+ channel.setPartitionKey(firstGroupBy.getGroupingColumns());
} else {
- if (outerSmall) {
- outerScan.setBroadcast();
- LOG.info("The relation (" + outerScan.getTableId()
- + ") is broadcasted");
- } else if (innerSmall) {
- innerScan.setBroadcast();
- LOG.info("The relation (" + innerScan.getTableId()
- + ") is broadcasted");
- }
+ channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
+ channel.setPartitionKey(firstGroupBy.getGroupingColumns());
}
+ channel.setSchema(firstGroupBy.getOutSchema());
- if (outerScan.isBroadcast() || innerScan.isBroadcast()) {
- return;
+ GroupbyNode secondGroupBy = groupByNode;
+ ScanNode scanNode = buildInputExecutor(channel);
+ secondGroupBy.setChild(scanNode);
+
+ LogicalNode parent = PlannerUtil.findTopParentNode(curNode, lastDistNode.getType());
+ if (parent instanceof UnaryNode && parent != secondGroupBy) {
+ ((UnaryNode)parent).setChild(secondGroupBy);
}
- }
- // insert stores for the first phase
- if (join.getLeftChild().getType() != NodeType.UNION &&
- join.getLeftChild().getType() != NodeType.STORE) {
- tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
- store = new StoreTableNode(tableId);
- store.setLocal(true);
- PlannerUtil.insertOuterNode(node, store);
+ masterPlan.addConnect(channel);
+ currentBlock.setPlan(curNode);
}
- if (join.getRightChild().getType() != NodeType.UNION &&
- join.getRightChild().getType() != NodeType.STORE) {
- tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
- store = new StoreTableNode(tableId);
- store.setLocal(true);
- PlannerUtil.insertInnerNode(node, store);
+ } else if (shuffleRequiredNodeType == NodeType.SORT) {
+ SortNode firstSort = (SortNode) lastDistNode;
+ if (childBlock == null) {
+ childBlock = masterPlan.newExecutionBlock();
}
- } else if (node.getType() == NodeType.UNION) {
- // not two-phase transform
- UnionNode union = (UnionNode) node;
- // insert stores
- if (union.getLeftChild().getType() != NodeType.UNION &&
- union.getLeftChild().getType() != NodeType.STORE) {
- tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
- store = new StoreTableNode(tableId);
- if(union.getLeftChild().getType() == NodeType.GROUP_BY) {
- /*This case is for cube by operator
- * TODO : more complicated conidtion*/
- store.setLocal(true);
+ childBlock.setPlan(firstSort);
+
+ currentBlock = masterPlan.newExecutionBlock();
+ DataChannel channel = new DataChannel(childBlock, currentBlock, RANGE_PARTITION, 32);
+ channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(firstSort.getSortKeys()).toArray());
+ channel.setSchema(childNode.getOutSchema());
+
+ SortNode secondSort = PlannerUtil.clone(lastDistNode);
+ ScanNode secondScan = buildInputExecutor(channel);
+ secondSort.setChild(secondScan);
+
+ LimitNode limitAndSort;
+ LimitNode limitOrNull = PlannerUtil.findTopNode(curNode, NodeType.LIMIT);
+ if (limitOrNull != null) {
+ limitAndSort = PlannerUtil.clone(limitOrNull);
+ limitAndSort.setChild(firstSort);
+
+ if (childBlock.getPlan().getType() == NodeType.SORT) {
+ childBlock.setPlan(limitAndSort);
} else {
- /* This case is for union query*/
- store.setLocal(false);
+ LogicalNode sortParent = PlannerUtil.findTopParentNode(childBlock.getPlan(), NodeType.SORT);
+ if (sortParent != null) {
+ if (sortParent instanceof UnaryNode) {
+ ((UnaryNode)sortParent).setChild(limitAndSort);
+ }
+ }
}
- PlannerUtil.insertOuterNode(node, store);
}
- if (union.getRightChild().getType() != NodeType.UNION &&
- union.getRightChild().getType() != NodeType.STORE) {
- tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
- store = new StoreTableNode(tableId);
- if(union.getRightChild().getType() == NodeType.GROUP_BY) {
- /*This case is for cube by operator
- * TODO : more complicated conidtion*/
- store.setLocal(true);
- }else {
- /* This case is for union query*/
- store.setLocal(false);
- }
- PlannerUtil.insertInnerNode(node, store);
+
+ LogicalNode parent = PlannerUtil.findTopParentNode(curNode, lastDistNode.getType());
+ if (parent instanceof UnaryNode && parent != secondSort) {
+ ((UnaryNode)parent).setChild(secondSort);
}
- } else if (node instanceof UnaryNode) {
- UnaryNode unary = (UnaryNode)node;
- if (unary.getType() != NodeType.STORE &&
- unary.getChild().getType() != NodeType.STORE) {
- tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
- insertStore(tableId, unary);
+
+ masterPlan.addConnect(channel);
+ currentBlock.setPlan(curNode);
+ } else if (shuffleRequiredNodeType == NodeType.JOIN) {
+ JoinNode joinNode = (JoinNode) lastDistNode;
+ LogicalNode leftNode = joinNode.getLeftChild();
+ LogicalNode rightNode = joinNode.getRightChild();
+
+ ExecutionBlock leftBlock = null;
+ if (lastChildBlock == null) {
+ leftBlock = masterPlan.newExecutionBlock();
+ leftBlock.setPlan(leftNode);
+ } else {
+ leftBlock = lastChildBlock;
}
+ ExecutionBlock rightBlock = masterPlan.newExecutionBlock();
+ rightBlock.setPlan(rightNode);
+
+ currentBlock = masterPlan.newExecutionBlock();
+
+ DataChannel leftChannel = new DataChannel(leftBlock, currentBlock, HASH_PARTITION, 32);
+ DataChannel rightChannel = new DataChannel(rightBlock, currentBlock, HASH_PARTITION, 32);
+
+ ScanNode leftScan = buildInputExecutor(leftChannel);
+ ScanNode rightScan = buildInputExecutor(rightChannel);
+
+ joinNode.setLeftChild(leftScan);
+ joinNode.setRightChild(rightScan);
+ currentBlock.setPlan(joinNode);
+
+ masterPlan.addConnect(leftChannel);
+ masterPlan.addConnect(rightChannel);
}
+
+ return currentBlock;
}
- }
- /**
- * Convert the logical plan to a two-phase plan by the post-order traverse.
- *
- * @param logicalPlan
- * @return
- */
- private LogicalNode convertTo2Phase(LogicalNode logicalPlan) {
- LogicalRootNode root = (LogicalRootNode) logicalPlan;
- root.postOrder(new GlobalPlanBuilder());
- return logicalPlan;
- }
-
- private Map<StoreTableNode, ExecutionBlock> convertMap =
- new HashMap<StoreTableNode, ExecutionBlock>();
-
- /**
- * Logical plan을 후위 탐색하면서 SubQuery 생성
- *
- * @param node 현재 방문 중인 노드
- * @throws IOException
- */
- private void recursiveBuildSubQuery(LogicalNode node)
- throws IOException {
- ExecutionBlock subQuery;
- StoreTableNode store;
- if (node instanceof UnaryNode) {
- recursiveBuildSubQuery(((UnaryNode) node).getChild());
-
- if (node.getType() == NodeType.STORE) {
- store = (StoreTableNode) node;
- ExecutionBlockId id;
- if (store.getTableName().startsWith(ExecutionBlockId.EB_ID_PREFIX)) {
- id = TajoIdUtils.createExecutionBlockId(store.getTableName());
- } else {
- id = QueryIdFactory.newExecutionBlockId(queryId);
- }
- subQuery = new ExecutionBlock(id);
-
- switch (store.getChild().getType()) {
- case BST_INDEX_SCAN:
- case SCAN: // store - scan
- subQuery = makeScanSubQuery(subQuery);
- subQuery.setPlan(node);
- break;
- case SELECTION:
- case PROJECTION:
- case LIMIT:
- subQuery = makeUnarySubQuery(store, node, subQuery);
- subQuery.setPlan(node);
- break;
- case GROUP_BY:
- subQuery = makeGroupbySubQuery(store, node, subQuery);
- subQuery.setPlan(node);
- break;
- case SORT:
- subQuery = makeSortSubQuery(store, node, subQuery);
- subQuery.setPlan(node);
- break;
- case JOIN: // store - join
- subQuery = makeJoinSubQuery(store, node, subQuery);
- subQuery.setPlan(node);
- break;
- case UNION:
- subQuery = makeUnionSubQuery(store, node, subQuery);
- subQuery.setPlan(node);
- break;
- default:
- subQuery = null;
- break;
- }
+ @Override
+ public LogicalNode visitSort(LogicalPlan plan, SortNode node, Stack<LogicalNode> stack, GlobalPlanContext data)
+ throws PlanningException {
- convertMap.put(store, subQuery);
+ super.visitSort(plan, node, stack, data);
+
+ if (data.lastRepartionableNode != null) {
+ data.topMostLeftExecBlock = addChannel(data.plan, data.lastRepartionableNode, node, data.topmost, data.topMostLeftExecBlock);
}
- } else if (node instanceof BinaryNode) {
- recursiveBuildSubQuery(((BinaryNode) node).getLeftChild());
- recursiveBuildSubQuery(((BinaryNode) node).getRightChild());
- } else if (node instanceof ScanNode) {
- } else {
+ data.topmost = node;
+ data.lastRepartionableNode = node;
- }
- }
-
- private ExecutionBlock makeScanSubQuery(ExecutionBlock block) {
- block.setPartitionType(PartitionType.LIST);
- return block;
- }
-
- /**
- * Unifiable node(selection, projection)을 자식 플랜과 같은 SubQuery로 생성
- *
- * @param rootStore 생성할 SubQuery의 store
- * @param plan logical plan
- * @param unit 생성할 SubQuery
- * @return
- * @throws IOException
- */
- private ExecutionBlock makeUnarySubQuery(StoreTableNode rootStore,
- LogicalNode plan, ExecutionBlock unit) throws IOException {
- ScanNode newScan;
- ExecutionBlock prev;
- UnaryNode unary = (UnaryNode) plan;
- UnaryNode child = (UnaryNode) unary.getChild();
- StoreTableNode prevStore = (StoreTableNode)child.getChild();
-
- // add scan
- newScan = GlobalPlannerUtils.newScanPlan(prevStore.getOutSchema(),
- prevStore.getTableName(), sm.getTablePath(prevStore.getTableName()));
- newScan.setLocal(true);
- child.setChild(newScan);
- prev = convertMap.get(prevStore);
-
- if (prev != null) {
- prev.setParentBlock(unit);
- unit.addChildBlock(newScan, prev);
- prev.setPartitionType(PartitionType.LIST);
+ return node;
}
- unit.setPartitionType(PartitionType.LIST);
+ @Override
+ public LogicalNode visitGroupBy(LogicalPlan plan, GroupbyNode node, Stack<LogicalNode> stack, GlobalPlanContext data) throws PlanningException {
+ super.visitGroupBy(plan, node, stack, data);
- return unit;
- }
-
- /**
- * Two-phase SubQuery 생성.
- *
- * @param rootStore 생성할 SubQuery의 store
- * @param plan logical plan
- * @param unit 생성할 SubQuery
- * @return
- * @throws IOException
- */
- private ExecutionBlock makeGroupbySubQuery(StoreTableNode rootStore,
- LogicalNode plan, ExecutionBlock unit) throws IOException {
- UnaryNode unary = (UnaryNode) plan;
- UnaryNode unaryChild;
- StoreTableNode prevStore;
- ScanNode newScan;
- ExecutionBlock prev;
- unaryChild = (UnaryNode) unary.getChild(); // groupby
- NodeType curType = unaryChild.getType();
- if (unaryChild.getChild().getType() == NodeType.STORE) {
- // store - groupby - store
- unaryChild = (UnaryNode) unaryChild.getChild(); // store
- prevStore = (StoreTableNode) unaryChild;
- newScan = GlobalPlannerUtils.newScanPlan(prevStore.getOutSchema(),
- prevStore.getTableName(),
- sm.getTablePath(prevStore.getTableName()));
- newScan.setLocal(true);
- ((UnaryNode) unary.getChild()).setChild(newScan);
- prev = convertMap.get(prevStore);
- if (prev != null) {
- prev.setParentBlock(unit);
- unit.addChildBlock(newScan, prev);
+ if (data.lastRepartionableNode != null) {
+ data.topMostLeftExecBlock = addChannel(data.plan, data.lastRepartionableNode, node, data.topmost, data.topMostLeftExecBlock);
}
- if (unaryChild.getChild().getType() == curType) {
- // the second phase
- unit.setPartitionType(PartitionType.LIST);
- if (prev != null) {
- prev.setPartitionType(PartitionType.HASH);
- }
- } else {
- // the first phase
- unit.setPartitionType(PartitionType.HASH);
- if (prev != null) {
- prev.setPartitionType(PartitionType.LIST);
- }
- }
- } else if (unaryChild.getChild().getType() == NodeType.SCAN) {
- // the first phase
- // store - groupby - scan
- unit.setPartitionType(PartitionType.HASH);
- } else if (unaryChild.getChild().getType() == NodeType.UNION) {
- _handleUnionNode(rootStore, (UnionNode)unaryChild.getChild(), unit,
- null, PartitionType.LIST);
- } else {
- // error
+ data.topmost = node;
+ data.lastRepartionableNode = node;
+ return node;
}
- return unit;
- }
-
- /**
- *
- *
- * @param rootStore 생성할 SubQuery의 store
- * @param plan logical plan
- * @param unit 생성할 SubQuery
- * @return
- * @throws IOException
- */
- private ExecutionBlock makeUnionSubQuery(StoreTableNode rootStore,
- LogicalNode plan, ExecutionBlock unit) throws IOException {
- UnaryNode unary = (UnaryNode) plan;
- StoreTableNode outerStore, innerStore;
- ExecutionBlock prev;
- UnionNode union = (UnionNode) unary.getChild();
- unit.setPartitionType(PartitionType.LIST);
-
- if (union.getLeftChild().getType() == NodeType.STORE) {
- outerStore = (StoreTableNode) union.getLeftChild();
- TableMeta outerMeta = CatalogUtil.newTableMeta(outerStore.getOutSchema(),
- StoreType.CSV);
- insertOuterScan(union, outerStore.getTableName(), outerMeta);
- prev = convertMap.get(outerStore);
- if (prev != null) {
- prev.getStoreTableNode().setTableName(rootStore.getTableName());
- prev.setPartitionType(PartitionType.LIST);
- prev.setParentBlock(unit);
- unit.addChildBlock((ScanNode) union.getLeftChild(), prev);
- }
- } else if (union.getLeftChild().getType() == NodeType.UNION) {
- _handleUnionNode(rootStore, union, unit, null, PartitionType.LIST);
+
+ @Override
+ public LogicalNode visitFilter(LogicalPlan plan, SelectionNode node, Stack<LogicalNode> stack, GlobalPlanContext data) throws PlanningException {
+ super.visitFilter(plan, node, stack, data);
+ data.topmost = node;
+ return node;
}
-
- if (union.getRightChild().getType() == NodeType.STORE) {
- innerStore = (StoreTableNode) union.getRightChild();
- TableMeta innerMeta = CatalogUtil.newTableMeta(innerStore.getOutSchema(),
- StoreType.CSV);
- insertInnerScan(union, innerStore.getTableName(), innerMeta);
- prev = convertMap.get(innerStore);
- if (prev != null) {
- prev.getStoreTableNode().setTableName(rootStore.getTableName());
- prev.setPartitionType(PartitionType.LIST);
- prev.setParentBlock(unit);
- unit.addChildBlock((ScanNode) union.getRightChild(), prev);
+
+ @Override
+ public LogicalNode visitJoin(LogicalPlan plan, JoinNode node, Stack<LogicalNode> stack, GlobalPlanContext data) throws PlanningException {
+ super.visitJoin(plan, node, stack, data);
+
+ if (data.lastRepartionableNode != null) {
+ data.topMostLeftExecBlock = addChannel(data.plan, data.lastRepartionableNode, node, data.topmost, data.topMostLeftExecBlock);
}
- } else if (union.getRightChild().getType() == NodeType.UNION) {
- _handleUnionNode(rootStore, union, unit, null, PartitionType.LIST);
+
+ data.topmost = node;
+ data.lastRepartionableNode = node;
+
+ return node;
}
- return unit;
- }
+ @Override
+ public LogicalNode visitUnion(LogicalPlan plan, UnionNode node, Stack<LogicalNode> stack, GlobalPlanContext data) throws PlanningException {
+ super.visitUnion(plan, node, stack, data);
- private ExecutionBlock makeSortSubQuery(StoreTableNode rootStore,
- LogicalNode plan, ExecutionBlock unit) throws IOException {
-
- UnaryNode unary = (UnaryNode) plan;
- UnaryNode unaryChild;
- StoreTableNode prevStore;
- ScanNode newScan;
- ExecutionBlock prev;
- unaryChild = (UnaryNode) unary.getChild(); // groupby
- NodeType curType = unaryChild.getType();
- if (unaryChild.getChild().getType() == NodeType.STORE) {
- // store - groupby - store
- unaryChild = (UnaryNode) unaryChild.getChild(); // store
- prevStore = (StoreTableNode) unaryChild;
- newScan = GlobalPlannerUtils.newScanPlan(prevStore.getOutSchema(),
- prevStore.getTableName(), sm.getTablePath(prevStore.getTableName()));
- newScan.setLocal(true);
- ((UnaryNode) unary.getChild()).setChild(newScan);
- prev = convertMap.get(prevStore);
- if (prev != null) {
- prev.setParentBlock(unit);
- unit.addChildBlock(newScan, prev);
- if (unaryChild.getChild().getType() == curType) {
- // TODO - this is duplicated code
- prev.setPartitionType(PartitionType.RANGE);
- } else {
- prev.setPartitionType(PartitionType.LIST);
- }
+ if (data.lastRepartionableNode != null && data.lastRepartionableNode.getType() != NodeType.UNION) {
+ data.topMostLeftExecBlock = addChannel(data.plan, data.lastRepartionableNode, node, data.topmost,
+ data.topMostLeftExecBlock);
}
- if (unaryChild.getChild().getType() == curType) {
- // the second phase
- unit.setPartitionType(PartitionType.LIST);
- } else {
- // the first phase
- unit.setPartitionType(PartitionType.HASH);
- }
- } else if (unaryChild.getChild().getType() == NodeType.SCAN) {
- // the first phase
- // store - sort - scan
- unit.setPartitionType(PartitionType.RANGE);
- } else if (unaryChild.getChild().getType() == NodeType.UNION) {
- _handleUnionNode(rootStore, (UnionNode)unaryChild.getChild(), unit,
- null, PartitionType.LIST);
- } else {
- // error
+
+ data.topmost = node;
+ data.lastRepartionableNode = node;
+ return node;
}
- return unit;
- }
-
- private ExecutionBlock makeJoinSubQuery(StoreTableNode rootStore,
- LogicalNode plan, ExecutionBlock unit) throws IOException {
- UnaryNode unary = (UnaryNode)plan;
- StoreTableNode outerStore, innerStore;
- ExecutionBlock prev;
- JoinNode join = (JoinNode) unary.getChild();
- Schema outerSchema = join.getLeftChild().getOutSchema();
- Schema innerSchema = join.getRightChild().getOutSchema();
- unit.setPartitionType(PartitionType.LIST);
-
- List<Column> outerCollist = new ArrayList<Column>();
- List<Column> innerCollist = new ArrayList<Column>();
-
- // TODO: set partition for store nodes
- if (join.hasJoinQual()) {
- // getting repartition keys
- List<Column[]> cols = PlannerUtil.getJoinKeyPairs(join.getJoinQual(), outerSchema, innerSchema);
- for (Column [] pair : cols) {
- outerCollist.add(pair[0]);
- innerCollist.add(pair[1]);
- }
- } else {
- // broadcast
+
+ @Override
+ public LogicalNode visitExcept(LogicalPlan plan, ExceptNode node, Stack<LogicalNode> stack,
+ GlobalPlanContext data) throws PlanningException {
+ super.visitExcept(plan, node, stack, data);
+ data.topmost = node;
+ return node;
}
-
- Column[] outerCols = new Column[outerCollist.size()];
- Column[] innerCols = new Column[innerCollist.size()];
- outerCols = outerCollist.toArray(outerCols);
- innerCols = innerCollist.toArray(innerCols);
-
- // outer
- if (join.getLeftChild().getType() == NodeType.STORE) {
- outerStore = (StoreTableNode) join.getLeftChild();
- TableMeta outerMeta = CatalogUtil.newTableMeta(outerStore.getOutSchema(),
- StoreType.CSV);
- insertOuterScan(join, outerStore.getTableName(), outerMeta);
- prev = convertMap.get(outerStore);
- if (prev != null) {
- prev.setPartitionType(PartitionType.HASH);
- prev.setParentBlock(unit);
- unit.addChildBlock((ScanNode) join.getLeftChild(), prev);
- }
- outerStore.setPartitions(PartitionType.HASH, outerCols, 32);
- } else if (join.getLeftChild().getType() == NodeType.UNION) {
- _handleUnionNode(rootStore, (UnionNode)join.getLeftChild(), unit,
- outerCols, PartitionType.HASH);
- } else {
+ @Override
+ public LogicalNode visitIntersect(LogicalPlan plan, IntersectNode node, Stack<LogicalNode> stack,
+ GlobalPlanContext data) throws PlanningException {
+ super.visitIntersect(plan, node, stack, data);
+ data.topmost = node;
+ return node;
}
-
- // inner
- if (join.getRightChild().getType() == NodeType.STORE) {
- innerStore = (StoreTableNode) join.getRightChild();
- TableMeta innerMeta = CatalogUtil.newTableMeta(innerStore.getOutSchema(),
- StoreType.CSV);
- insertInnerScan(join, innerStore.getTableName(), innerMeta);
- prev = convertMap.get(innerStore);
- if (prev != null) {
- prev.setPartitionType(PartitionType.HASH);
- prev.setParentBlock(unit);
- unit.addChildBlock((ScanNode) join.getRightChild(), prev);
- }
- innerStore.setPartitions(PartitionType.HASH, innerCols, 32);
- } else if (join.getRightChild().getType() == NodeType.UNION) {
- _handleUnionNode(rootStore, (UnionNode)join.getRightChild(), unit,
- innerCols, PartitionType.HASH);
+
+ @Override
+ public LogicalNode visitTableSubQuery(LogicalPlan plan, TableSubQueryNode node, Stack<LogicalNode> stack,
+ GlobalPlanContext data) throws PlanningException {
+ super.visitTableSubQuery(plan, node, stack, data);
+ data.topmost = node;
+ return node;
}
-
- return unit;
- }
-
- /**
- * Recursive하게 union의 자식 plan들을 설정
- *
- * @param rootStore 생성할 SubQuery의 store
- * @param union union을 root로 하는 logical plan
- * @param cur 생성할 SubQuery
- * @param cols partition 정보를 설정하기 위한 column array
- * @param prevOutputType 자식 SubQuery의 partition type
- * @throws IOException
- */
- private void _handleUnionNode(StoreTableNode rootStore, UnionNode union,
- ExecutionBlock cur, Column[] cols, PartitionType prevOutputType)
- throws IOException {
- StoreTableNode store;
- TableMeta meta;
- ExecutionBlock prev;
-
- if (union.getLeftChild().getType() == NodeType.STORE) {
- store = (StoreTableNode) union.getLeftChild();
- meta = CatalogUtil.newTableMeta(store.getOutSchema(), StoreType.CSV);
- insertOuterScan(union, store.getTableName(), meta);
- prev = convertMap.get(store);
- if (prev != null) {
- prev.getStoreTableNode().setTableName(rootStore.getTableName());
- prev.setPartitionType(prevOutputType);
- prev.setParentBlock(cur);
- cur.addChildBlock((ScanNode) union.getLeftChild(), prev);
- }
- if (cols != null) {
- store.setPartitions(PartitionType.LIST, cols, 32);
- }
- } else if (union.getLeftChild().getType() == NodeType.UNION) {
- _handleUnionNode(rootStore, (UnionNode)union.getLeftChild(), cur, cols,
- prevOutputType);
+
+ @Override
+ public LogicalNode visitScan(LogicalPlan plan, ScanNode node, Stack<LogicalNode> stack, GlobalPlanContext data)
+ throws PlanningException {
+ data.topmost = node;
+ return node;
}
-
- if (union.getRightChild().getType() == NodeType.STORE) {
- store = (StoreTableNode) union.getRightChild();
- meta = CatalogUtil.newTableMeta(store.getOutSchema(), StoreType.CSV);
- insertInnerScan(union, store.getTableName(), meta);
- prev = convertMap.get(store);
- if (prev != null) {
- prev.getStoreTableNode().setTableName(rootStore.getTableName());
- prev.setPartitionType(prevOutputType);
- prev.setParentBlock(cur);
- cur.addChildBlock((ScanNode) union.getRightChild(), prev);
- }
- if (cols != null) {
- store.setPartitions(PartitionType.LIST, cols, 32);
- }
- } else if (union.getRightChild().getType() == NodeType.UNION) {
- _handleUnionNode(rootStore, (UnionNode)union.getRightChild(), cur, cols,
- prevOutputType);
+
+ @Override
+ public LogicalNode visitStoreTable(LogicalPlan plan, StoreTableNode node, Stack<LogicalNode> stack,
+ GlobalPlanContext data) throws PlanningException {
+ super.visitStoreTable(plan, node, stack, data);
+ data.topmost = node;
+ return node;
+ }
+
+ @Override
+ public LogicalNode visitInsert(LogicalPlan plan, InsertNode node, Stack<LogicalNode> stack, GlobalPlanContext data)
+ throws PlanningException {
+ super.visitInsert(plan, node, stack, data);
+ data.topmost = node;
+ return node;
}
}
-
- private LogicalNode insertOuterScan(BinaryNode parent, String tableId,
- TableMeta meta) throws IOException {
- TableDesc desc = CatalogUtil.newTableDesc(tableId, meta, sm.getTablePath(tableId));
- ScanNode scan = new ScanNode(new FromTable(desc));
- scan.setLocal(true);
- scan.setInSchema(meta.getSchema());
- scan.setOutSchema(meta.getSchema());
- parent.setLeftChild(scan);
- return parent;
- }
-
- private LogicalNode insertInnerScan(BinaryNode parent, String tableId,
- TableMeta meta) throws IOException {
- TableDesc desc = CatalogUtil.newTableDesc(tableId, meta, sm.getTablePath(tableId));
- ScanNode scan = new ScanNode(new FromTable(desc));
- scan.setLocal(true);
- scan.setInSchema(meta.getSchema());
- scan.setOutSchema(meta.getSchema());
- parent.setRightChild(scan);
- return parent;
+
+ private class UnionsFinderContext {
+ List<UnionNode> unionList = new ArrayList<UnionNode>();
}
-
- private MasterPlan convertToGlobalPlan(LogicalNode logicalPlan) throws IOException {
- recursiveBuildSubQuery(logicalPlan);
- ExecutionBlock root;
- root = convertMap.get(((LogicalRootNode)logicalPlan).getChild());
- root.getStoreTableNode().setLocal(false);
+ private class ConsecutiveUnionFinder extends BasicLogicalPlanVisitor<UnionsFinderContext> {
+ public LogicalNode visitUnion(LogicalPlan plan, UnionNode node, Stack<LogicalNode> stack, UnionsFinderContext data)
+ throws PlanningException {
+ if (node.getType() == NodeType.UNION) {
+ data.unionList.add(node);
+ }
+
+ stack.push(node);
+ TableSubQueryNode leftSubQuery = node.getLeftChild();
+ TableSubQueryNode rightSubQuery = node.getRightChild();
+ if (leftSubQuery.getSubQuery().getType() == NodeType.UNION) {
+ visitChild(plan, leftSubQuery, stack, data);
+ }
+ if (rightSubQuery.getSubQuery().getType() == NodeType.UNION) {
+ visitChild(plan, rightSubQuery, stack, data);
+ }
+ stack.pop();
- return new MasterPlan(root);
+ return node;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryContext.java
index 8e89938..f921a15 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryContext.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryContext.java
@@ -168,4 +168,12 @@ public class QueryContext extends Options {
public boolean isInsert() {
return getCommandType() == NodeType.INSERT;
}
+
+ public void setHiveQueryMode() { // makes QueryMasterTask parse the query text with HiveConverter
+ setBool("hive.query.mode", true);
+ }
+
+ public boolean isHiveQueryMode() { // true: HiveConverter is used instead of SQLAnalyzer
+ return getBool("hive.query.mode");
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
index 33a1e53..5875a6c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
@@ -129,8 +129,6 @@ public class TajoAsyncDispatcher extends AbstractService implements Dispatcher
LOG.debug("Dispatching the event " + event.getClass().getName() + "."
+ event.toString());
}
-// LOG.info("====> Dispatching the event " + event.getClass().getName() + "."
-// + event.toString() );
Class<? extends Enum> type = event.getType().getDeclaringClass();
try{
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
index 574122b..24eea42 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
@@ -40,6 +40,7 @@ import org.apache.tajo.master.event.TaskSchedulerEvent;
import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
import org.apache.tajo.master.querymaster.QueryMasterTask;
import org.apache.tajo.master.querymaster.QueryUnit;
+import org.apache.tajo.master.querymaster.SubQuery;
import org.apache.tajo.storage.Fragment;
import org.apache.tajo.util.NetUtils;
@@ -454,17 +455,19 @@ public class TaskSchedulerImpl extends AbstractService
}
}
+
if (attemptId != null) {
- QueryUnit task = context.getQuery()
- .getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId()).getQueryUnit(attemptId.getQueryUnitId());
+ SubQuery subQuery = context.getQuery().getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId()); // resolved only after the null check on attemptId
+ QueryUnit task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
attemptId,
new ArrayList<Fragment>(task.getAllFragments()),
- task.getOutputName(),
+ "",
false,
task.getLogicalPlan().toJson(),
- context.getQueryContext());
- if (task.getStoreTableNode().isLocal()) {
+ context.getQueryContext(),
+ subQuery.getDataChannel());
+ if (!subQuery.getBlock().isRoot()) {
taskAssign.setInterQuery();
}
@@ -500,23 +503,24 @@ public class TaskSchedulerImpl extends AbstractService
LOG.debug("Assigned based on * match");
QueryUnit task;
- task = context.getSubQuery(
- attemptId.getQueryUnitId().getExecutionBlockId()).getQueryUnit(attemptId.getQueryUnitId());
+ SubQuery subQuery = context.getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId());
+ task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
attemptId,
Lists.newArrayList(task.getAllFragments()),
- task.getOutputName(),
+ "",
false,
task.getLogicalPlan().toJson(),
- context.getQueryContext());
- if (task.getStoreTableNode().isLocal()) {
+ context.getQueryContext(),
+ subQuery.getDataChannel());
+ if (!subQuery.getBlock().isRoot()) {
taskAssign.setInterQuery();
}
for (ScanNode scan : task.getScanNodes()) {
Collection<URI> fetches = task.getFetch(scan);
if (fetches != null) {
for (URI fetch : fetches) {
- taskAssign.addFetch(scan.getTableId(), fetch);
+ taskAssign.addFetch(scan.getTableName(), fetch);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
index 53c08c0..4618da6 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
@@ -32,12 +32,14 @@ public class QueryStartEvent extends AbstractEvent {
private QueryId queryId;
private QueryContext queryContext;
+ private String sql; // raw query text; QueryMasterTask re-parses it (SQLAnalyzer or HiveConverter)
private String logicalPlanJson;
- public QueryStartEvent(QueryId queryId, QueryContext queryContext, String logicalPlanJson) {
+ public QueryStartEvent(QueryId queryId, QueryContext queryContext, String sql, String logicalPlanJson) {
super(EventType.QUERY_START);
this.queryId = queryId;
this.queryContext = queryContext;
+ this.sql = sql;
this.logicalPlanJson = logicalPlanJson;
}
@@ -49,6 +51,10 @@ public class QueryStartEvent extends AbstractEvent {
return this.queryContext;
}
+ public String getSql() {
+ return this.sql;
+ }
+
public String getLogicalPlanJson() {
return logicalPlanJson;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 4ba95b0..4199e17 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.*;
+import org.apache.tajo.DataChannel;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoConstants;
@@ -36,11 +37,11 @@ import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableDescImpl;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.master.ExecutionBlock;
import org.apache.tajo.master.ExecutionBlockCursor;
import org.apache.tajo.master.QueryContext;
import org.apache.tajo.master.event.*;
import org.apache.tajo.storage.AbstractStorageManager;
-import org.apache.tajo.util.TUtil;
import java.io.IOException;
import java.util.*;
@@ -159,7 +160,7 @@ public class Query implements EventHandler<QueryEvent> {
}
float totalProgress = 0;
- float proportion = 1.0f / (float)getExecutionBlockCursor().size();
+ float proportion = 1.0f / (float)(getExecutionBlockCursor().size() - 1); // minus one: presumably the terminal root block, which is never launched as a SubQuery (see the isTerminal/isRoot check below) -- TODO confirm
for (int i = 0; i < subProgresses.length; i++) {
totalProgress += subProgresses[i] * proportion;
@@ -240,7 +241,7 @@ public class Query implements EventHandler<QueryEvent> {
}
public Collection<SubQuery> getSubQueries() {
- return Collections.unmodifiableCollection(this.subqueries.values());
+ return this.subqueries.values();
}
public QueryState getState() {
@@ -272,8 +273,8 @@ public class Query implements EventHandler<QueryEvent> {
@Override
public void transition(Query query, QueryEvent queryEvent) {
- SubQuery subQuery = new SubQuery(query.context, query.getExecutionBlockCursor().nextBlock(),
- query.sm);
+ SubQuery subQuery = new SubQuery(query.context, query.getPlan(),
+ query.getExecutionBlockCursor().nextBlock(), query.sm);
subQuery.setPriority(query.priority--);
query.addSubQuery(subQuery);
LOG.debug("Schedule unit plan: \n" + subQuery.getBlock().getPlan());
@@ -292,11 +293,12 @@ public class Query implements EventHandler<QueryEvent> {
query.completedSubQueryCount++;
SubQueryCompletedEvent castEvent = (SubQueryCompletedEvent) event;
ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
-
+ MasterPlan masterPlan = query.getPlan();
// if the subquery is succeeded
if (castEvent.getFinalState() == SubQueryState.SUCCEEDED) {
- if (cursor.hasNext()) {
- SubQuery nextSubQuery = new SubQuery(query.context, cursor.nextBlock(), query.sm);
+ ExecutionBlock nextBlock = cursor.nextBlock();
+ if (!query.getPlan().isTerminal(nextBlock) || !query.getPlan().isRoot(nextBlock)) {
+ SubQuery nextSubQuery = new SubQuery(query.context, query.getPlan(), nextBlock, query.sm);
nextSubQuery.setPriority(query.priority--);
query.addSubQuery(nextSubQuery);
nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(),
@@ -310,7 +312,7 @@ public class Query implements EventHandler<QueryEvent> {
} else { // Finish a query
if (query.checkQueryForCompleted() == QueryState.QUERY_SUCCEEDED) {
-
+ DataChannel finalChannel = masterPlan.getChannel(castEvent.getExecutionBlockId(), nextBlock.getId());
Path finalOutputDir = commitOutputData(query);
TableDesc finalTableDesc = buildOrUpdateResultTableDesc(query, castEvent.getExecutionBlockId(), finalOutputDir);
@@ -361,7 +363,8 @@ public class Query implements EventHandler<QueryEvent> {
/**
* It builds a table desc and update the table desc if necessary.
*/
- public TableDesc buildOrUpdateResultTableDesc(Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) {
+ public TableDesc buildOrUpdateResultTableDesc(Query query, ExecutionBlockId finalExecBlockId,
+ Path finalOutputDir) {
// Determine the output table name
SubQuery subQuery = query.getSubQuery(finalExecBlockId);
QueryContext queryContext = query.context.getQueryContext();
@@ -450,33 +453,4 @@ public class Query implements EventHandler<QueryEvent> {
writeLock.unlock();
}
}
-
- public static interface QueryHook {
- QueryState getTargetState();
- void onEvent(Query query);
- }
-
- public static class QueryHookManager {
- Map<QueryState, List<QueryHook>> hookList = TUtil.newHashMap();
-
- public void addHook(QueryHook hook) {
- if (hookList.containsKey(hook.getTargetState())) {
- hookList.get(hook.getTargetState()).add(hook);
- } else {
- hookList.put(hook.getTargetState(), TUtil.newList(hook));
- }
- }
-
- public void doHooks(Query query) {
- QueryState finalState = query.checkQueryForCompleted();
- List<QueryHook> list = hookList.get(finalState);
- if (list != null) {
- for (QueryHook hook : list) {
- hook.onEvent(query);
- }
- } else {
- LOG.error("QueryHookManager cannot deal with " + finalState + " event");
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index c54f8da..53dfb6a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -207,6 +207,7 @@ public class QueryInProgress extends CompositeService {
TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder()
.setQueryId(queryId.getProto())
.setQueryContext(queryContext.getProto())
+ .setSql(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getSql()).build())
.setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build())
.build(), NullCallback.get());
querySubmitted.set(true);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index 6611102..29dcb1d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.yarn.service.Service;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.global.GlobalOptimizer;
+import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.GlobalPlanner;
import org.apache.tajo.master.TajoAsyncDispatcher;
@@ -55,8 +55,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
private GlobalPlanner globalPlanner;
- private GlobalOptimizer globalOptimizer;
-
private AbstractStorageManager storageManager;
private TajoConf systemConf;
@@ -93,8 +91,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
this.storageManager = StorageManagerFactory.getStorageManager(systemConf);
- globalPlanner = new GlobalPlanner(systemConf, storageManager, dispatcher.getEventHandler());
- globalOptimizer = new GlobalOptimizer();
+ globalPlanner = new GlobalPlanner(systemConf, storageManager);
dispatcher.register(QueryStartEvent.EventType.class, new QueryStartEventHandler());
@@ -217,9 +214,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
public GlobalPlanner getGlobalPlanner() {
return globalPlanner;
}
- public GlobalOptimizer getGlobalOptimizer() {
- return globalOptimizer;
- }
public TajoWorker.WorkerContext getWorkerContext() {
return workerContext;
@@ -253,9 +247,8 @@ public class QueryMaster extends CompositeService implements EventHandler {
@Override
public void handle(QueryStartEvent event) {
LOG.info("Start QueryStartEventHandler:" + event.getQueryId());
- //To change body of implemented methods use File | Settings | File Templates.
QueryMasterTask queryMasterTask = new QueryMasterTask(queryMasterContext,
- event.getQueryId(), event.getQueryContext(), event.getLogicalPlanJson());
+ event.getQueryId(), event.getQueryContext(), event.getSql(), event.getLogicalPlanJson());
queryMasterTask.init(systemConf);
queryMasterTask.start();
@@ -280,17 +273,21 @@ public class QueryMaster extends CompositeService implements EventHandler {
}
synchronized(queryMasterTasks) {
for(QueryMasterTask eachTask: tempTasks) {
- TajoMasterProtocol.TajoHeartbeat queryHeartbeat = TajoMasterProtocol.TajoHeartbeat.newBuilder()
- .setTajoWorkerHost(workerContext.getTajoWorkerManagerService().getBindAddr().getHostName())
- .setTajoWorkerPort(workerContext.getTajoWorkerManagerService().getBindAddr().getPort())
- .setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort())
- .setState(eachTask.getState())
- .setQueryId(eachTask.getQueryId().getProto())
- .setQueryProgress(eachTask.getQuery().getProgress())
- .setQueryFinishTime(eachTask.getQuery().getFinishTime())
- .build();
-
- workerContext.getTajoMasterRpcClient().heartbeat(null, queryHeartbeat, NullCallback.get());
+ try {
+ TajoMasterProtocol.TajoHeartbeat queryHeartbeat = TajoMasterProtocol.TajoHeartbeat.newBuilder()
+ .setTajoWorkerHost(workerContext.getTajoWorkerManagerService().getBindAddr().getHostName())
+ .setTajoWorkerPort(workerContext.getTajoWorkerManagerService().getBindAddr().getPort())
+ .setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort())
+ .setState(eachTask.getState())
+ .setQueryId(eachTask.getQueryId().getProto())
+ .setQueryProgress(eachTask.getQuery().getProgress())
+ .setQueryFinishTime(eachTask.getQuery().getFinishTime())
+ .build();
+
+ workerContext.getTajoMasterRpcClient().heartbeat(null, queryHeartbeat, NullCallback.get());
+ } catch (Throwable t) {
+ LOG.error("Failed to send a heartbeat for query " + eachTask.getQueryId(), t); // best-effort: keep reporting the remaining tasks
+ }
}
}
synchronized(queryMasterStop) {
@@ -309,7 +306,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
class ClientSessionTimeoutCheckThread extends Thread {
public void run() {
LOG.info("ClientSessionTimeoutCheckThread started");
- while(true) {
+ while(!queryMasterStop.get()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
@@ -337,5 +334,4 @@ public class QueryMaster extends CompositeService implements EventHandler {
}
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index a4fabcf..e760626 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -30,16 +30,20 @@ import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.tajo.*;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.json.CoreGsonHelper;
-import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.parser.HiveConverter;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.logical.LogicalNode;
-import org.apache.tajo.engine.planner.logical.LogicalRootNode;
import org.apache.tajo.engine.planner.logical.NodeType;
import org.apache.tajo.engine.planner.logical.ScanNode;
import org.apache.tajo.master.QueryContext;
+import org.apache.tajo.master.GlobalEngine;
+import org.apache.tajo.master.QueryContext;
import org.apache.tajo.master.TajoAsyncDispatcher;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
@@ -74,6 +78,8 @@ public class QueryMasterTask extends CompositeService {
private MasterPlan masterPlan;
+ private String sql;
+
private String logicalPlanJson;
private TajoAsyncDispatcher dispatcher;
@@ -91,11 +97,12 @@ public class QueryMasterTask extends CompositeService {
private AtomicBoolean stopped = new AtomicBoolean(false);
public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
- QueryId queryId, QueryContext queryContext, String logicalPlanJson) {
+ QueryId queryId, QueryContext queryContext, String sql, String logicalPlanJson) {
super(QueryMasterTask.class.getName());
this.queryMasterContext = queryMasterContext;
this.queryId = queryId;
this.queryContext = queryContext;
+ this.sql = sql;
this.logicalPlanJson = logicalPlanJson;
this.querySubmitTime = System.currentTimeMillis();
}
@@ -227,17 +234,44 @@ public class QueryMasterTask extends CompositeService {
return;
}
+ CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog();
+ LogicalPlanner planner = new LogicalPlanner(catalog);
+ LogicalOptimizer optimizer = new LogicalOptimizer();
+ Expr expr;
+ if (queryContext.isHiveQueryMode()) {
+ HiveConverter hiveConverter = new HiveConverter();
+ expr = hiveConverter.parse(sql);
+ } else {
+ SQLAnalyzer analyzer = new SQLAnalyzer();
+ expr = analyzer.parse(sql);
+ }
+ LogicalPlan plan = null;
try {
- LogicalRootNode logicalNodeRoot = (LogicalRootNode) CoreGsonHelper.fromJson(logicalPlanJson, LogicalNode.class);
- LogicalNode[] scanNodes = PlannerUtil.findAllNodes(logicalNodeRoot, NodeType.SCAN);
- if(scanNodes != null) {
- for(LogicalNode eachScanNode: scanNodes) {
- ScanNode scanNode = (ScanNode)eachScanNode;
- tableDescMap.put(scanNode.getFromTable().getTableName(), scanNode.getFromTable().getTableDesc());
+ plan = planner.createPlan(expr);
+ optimizer.optimize(plan);
+ } catch (PlanningException e) {
+ throw new RuntimeException("Failed to create a logical plan for " + queryId, e); // plan would stay null and NPE below
+ }
+
+ GlobalEngine.DistributedQueryHookManager hookManager = new GlobalEngine.DistributedQueryHookManager();
+ hookManager.addHook(new GlobalEngine.InsertHook());
+ hookManager.doHooks(queryContext, plan);
+
+ try {
+
+ for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
+ LogicalNode[] scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.SCAN);
+ if(scanNodes != null) {
+ for(LogicalNode eachScanNode: scanNodes) {
+ ScanNode scanNode = (ScanNode)eachScanNode;
+ tableDescMap.put(scanNode.getFromTable().getTableName(), scanNode.getFromTable().getTableDesc());
+ }
}
}
- MasterPlan globalPlan = queryMasterContext.getGlobalPlanner().build(queryId, logicalNodeRoot);
- this.masterPlan = queryMasterContext.getGlobalOptimizer().optimize(globalPlan);
+
+ this.masterPlan = new MasterPlan(queryId, queryContext, plan); // assign the field; a local here would shadow it and leave it null
+ queryMasterContext.getGlobalPlanner().build(this.masterPlan);
+ // GlobalOptimizer was removed by this refactoring; the plan is built by GlobalPlanner only.
query = new Query(queryTaskContext, queryId, querySubmitTime,
"", queryTaskContext.getEventHandler(), masterPlan);
@@ -306,9 +340,9 @@ public class QueryMasterTask extends CompositeService {
LOG.info("The staging dir '" + outputDir + "' is created.");
queryContext.setStagingDir(stagingDir);
- ////////////////////////////////////////////////////
- // Check and Create An Output Directory If Necessary
- ////////////////////////////////////////////////////
+ /////////////////////////////////////////////////
+ // Check and Create Output Directory If Necessary
+ /////////////////////////////////////////////////
if (queryContext.hasOutputPath()) {
outputDir = queryContext.getOutputPath();
if (queryContext.isOutputOverwrite()) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index 00dcc0b..5a30c04 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -156,10 +156,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
}
public void setLogicalPlan(LogicalNode plan) {
- Preconditions.checkArgument(plan.getType() == NodeType.STORE);
-
this.plan = plan;
- store = (StoreTableNode) plan;
LogicalNode node = plan;
ArrayList<LogicalNode> s = new ArrayList<LogicalNode>();
@@ -250,7 +247,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
}
public Collection<URI> getFetch(ScanNode scan) {
- return this.fetchMap.get(scan.getTableId());
+ return this.fetchMap.get(scan.getTableName());
}
public String getOutputName() {