You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was lost in this plain-text export).
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/04/27 03:30:57 UTC
[1/2] TAJO-42: Divide SubQuery into FSM and execution block parts. (hyunsik)
Updated Branches:
refs/heads/master 6cf96bd4e -> fef3dd509
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/SchedulerUtils.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SchedulerUtils.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SchedulerUtils.java
deleted file mode 100644
index 4abeefb..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SchedulerUtils.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import com.google.common.collect.Lists;
-
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-
-public class SchedulerUtils {
-
- public static class MapComparatorBySize implements Comparator<Map>{
-
- @Override
- public int compare(Map m1, Map m2) {
- return m1.size() - m2.size();
- }
- }
-
- public static List<Map> sortListOfMapsBySize(
- final List<Map> maplist) {
- Map[] arr = new Map[maplist.size()];
- arr = maplist.toArray(arr);
- Arrays.sort(arr, new MapComparatorBySize());
-
- List<Map> newlist = Lists.newArrayList(arr);
- return newlist;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
index 754f267..ff73334 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
@@ -22,14 +22,15 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.state.*;
+import tajo.QueryIdFactory;
import tajo.QueryUnitId;
import tajo.SubQueryId;
import tajo.catalog.*;
@@ -43,6 +44,7 @@ import tajo.engine.planner.PlannerUtil;
import tajo.engine.planner.logical.*;
import tajo.master.QueryMaster.QueryContext;
import tajo.master.event.*;
+import tajo.storage.Fragment;
import tajo.storage.StorageManager;
import tajo.util.IndexUtil;
@@ -57,38 +59,20 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import static tajo.conf.TajoConf.ConfVars;
+/**
+ * SubQuery is an instance of an ExecutionBlock.
+ */
public class SubQuery implements EventHandler<SubQueryEvent> {
private static final Log LOG = LogFactory.getLog(SubQuery.class);
- public enum PARTITION_TYPE {
- /** for hash partitioning */
- HASH,
- LIST,
- /** for map-side join */
- BROADCAST,
- /** for range partitioning */
- RANGE
- }
-
- private SubQueryId id;
- private LogicalNode plan = null;
- private StoreTableNode store = null;
- private List<ScanNode> scanlist = null;
- private SubQuery next;
- private Map<ScanNode, SubQuery> childSubQueries;
- private PARTITION_TYPE outputType;
- private boolean hasJoinPlan;
- private boolean hasUnionPlan;
+ private ExecutionBlock block;
private Priority priority;
private TableStat stats;
EventHandler eventHandler;
final StorageManager sm;
- private final GlobalPlanner planner;
- private boolean isLeafQuery = false;
TaskSchedulerImpl taskScheduler;
QueryContext queryContext;
- private Clock clock;
private long startTime;
private long finishTime;
@@ -106,7 +90,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
new StateMachineFactory <SubQuery, SubQueryState,
SubQueryEventType, SubQueryEvent> (SubQueryState.NEW)
- .addTransition(SubQueryState.NEW, EnumSet.of(SubQueryState.INIT, SubQueryState.FAILED, SubQueryState.SUCCEEDED),
+ .addTransition(SubQueryState.NEW,
+ EnumSet.of(SubQueryState.INIT, SubQueryState.FAILED, SubQueryState.SUCCEEDED),
SubQueryEventType.SQ_INIT, new InitAndRequestContainer())
.addTransition(SubQueryState.INIT, SubQueryState.CONTAINER_ALLOCATED,
@@ -148,20 +133,15 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
private int completedTaskCount = 0;
- public SubQuery(SubQueryId id, StorageManager sm, GlobalPlanner planner) {
- this.id = id;
- childSubQueries = new HashMap<ScanNode, SubQuery>();
- scanlist = new ArrayList<ScanNode>();
- hasJoinPlan = false;
- hasUnionPlan = false;
+ public SubQuery(QueryContext context, ExecutionBlock block, StorageManager sm) {
+ this.queryContext = context;
+ this.block = block;
this.sm = sm;
- this.planner = planner;
+ this.eventHandler = context.getEventHandler();
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
this.readLock = readWriteLock.readLock();
this.writeLock = readWriteLock.writeLock();
-
-
stateMachine = stateMachineFactory.make(this);
}
@@ -190,72 +170,13 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
}
}
- public void setQueryContext(QueryContext context) {
- this.queryContext = context;
- }
-
- public void setClock(Clock clock) {
- this.clock = clock;
- }
-
- public void setEventHandler(EventHandler eventHandler) {
- this.eventHandler = eventHandler;
- }
-
- public boolean isLeafQuery() {
- return this.isLeafQuery;
- }
-
- public void setLeafQuery() {
- this.isLeafQuery = true;
+ public ExecutionBlock getBlock() {
+ return block;
}
public void addTask(QueryUnit task) {
tasks.put(task.getId(), task);
}
-
- public void setOutputType(PARTITION_TYPE type) {
- this.outputType = type;
- }
-
- public GlobalPlanner getPlanner() {
- return planner;
- }
-
- public void setLogicalPlan(LogicalNode plan) {
- hasJoinPlan = false;
- Preconditions.checkArgument(plan.getType() == ExprType.STORE
- || plan.getType() == ExprType.CREATE_INDEX);
-
- this.plan = plan;
- if (plan instanceof StoreTableNode) {
- store = (StoreTableNode) plan;
- } else {
- store = (StoreTableNode) ((IndexWriteNode)plan).getSubNode();
- }
-
- LogicalNode node = plan;
- ArrayList<LogicalNode> s = new ArrayList<LogicalNode>();
- s.add(node);
- while (!s.isEmpty()) {
- node = s.remove(s.size()-1);
- if (node instanceof UnaryNode) {
- UnaryNode unary = (UnaryNode) node;
- s.add(s.size(), unary.getSubNode());
- } else if (node instanceof BinaryNode) {
- BinaryNode binary = (BinaryNode) node;
- if (binary.getType() == ExprType.JOIN) {
- hasJoinPlan = true;
- } else if (binary.getType() == ExprType.UNION) {
- hasUnionPlan = true;
- }
- s.add(s.size(), binary.getOuterNode());
- s.add(s.size(), binary.getInnerNode());
- } else if (node instanceof ScanNode) {
- scanlist.add((ScanNode)node);
- }
- }
- }
public void abortSubQuery(SubQueryState finalState) {
// TODO -
@@ -271,37 +192,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
return this.stateMachine;
}
- public boolean hasJoinPlan() {
- return this.hasJoinPlan;
- }
-
- public boolean hasUnionPlan() {
- return this.hasUnionPlan;
- }
-
- public void setParentQuery(SubQuery next) {
- this.next = next;
- }
-
- public void addChildQuery(ScanNode prevscan, SubQuery prev) {
- childSubQueries.put(prevscan, prev);
- }
-
- public void addChildQueries(Map<ScanNode, SubQuery> prevs) {
- this.childSubQueries.putAll(prevs);
- }
-
- public void setQueryUnits(List<QueryUnit> queryUnits) {
- for (QueryUnit task: queryUnits) {
- tasks.put(task.getId(), task);
- }
- }
-
- public void removeChildQuery(ScanNode scan) {
- scanlist.remove(scan);
- this.childSubQueries.remove(scan);
- }
-
public void setPriority(int priority) {
if (this.priority == null) {
this.priority = new Priority(priority);
@@ -316,56 +206,12 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
this.stats = stat;
}
- public SubQuery getParentQuery() {
- return this.next;
- }
-
- public boolean hasChildQuery() {
- return !this.childSubQueries.isEmpty();
- }
-
- public Iterator<SubQuery> getChildIterator() {
- return this.childSubQueries.values().iterator();
- }
-
- public Collection<SubQuery> getChildQueries() {
- return this.childSubQueries.values();
- }
-
- public Map<ScanNode, SubQuery> getChildMaps() {
- return this.childSubQueries;
- }
-
public SubQuery getChildQuery(ScanNode scanForChild) {
- return this.childSubQueries.get(scanForChild);
- }
-
- public String getOutputName() {
- return this.store.getTableName();
- }
-
- public PARTITION_TYPE getOutputType() {
- return this.outputType;
- }
-
- public Schema getOutputSchema() {
- return this.store.getOutSchema();
- }
-
- public StoreTableNode getStoreTableNode() {
- return this.store;
- }
-
- public ScanNode[] getScanNodes() {
- return this.scanlist.toArray(new ScanNode[scanlist.size()]);
- }
-
- public LogicalNode getLogicalPlan() {
- return this.plan;
+ return queryContext.getSubQuery(block.getChildBlock(scanForChild).getId());
}
public SubQueryId getId() {
- return this.id;
+ return block.getId();
}
public QueryUnit[] getQueryUnits() {
@@ -387,13 +233,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
public String toString() {
StringBuilder sb = new StringBuilder();
- sb.append(this.id);
-/* sb.append(" plan: " + plan.toString());
- sb.append("next: " + next + " childSubQueries:");
- Iterator<SubQuery> it = getChildIterator();
- while (it.hasNext()) {
- sb.append(" " + it.next());
- }*/
+ sb.append(this.getId());
return sb.toString();
}
@@ -401,18 +241,18 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
public boolean equals(Object o) {
if (o instanceof SubQuery) {
SubQuery other = (SubQuery)o;
- return this.id.equals(other.getId());
+ return getId().equals(other.getId());
}
return false;
}
@Override
public int hashCode() {
- return this.id.hashCode();
+ return getId().hashCode();
}
public int compareTo(SubQuery other) {
- return this.id.compareTo(other.id);
+ return getId().compareTo(other.getId());
}
public SubQueryState getState() {
@@ -439,8 +279,11 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
int numBlocks = 0, numPartitions = 0;
List<ColumnStat> columnStats = Lists.newArrayList();
- for (SubQuery child : unit.getChildQueries()) {
- childStat = child.getStats();
+ Iterator<ExecutionBlock> it = unit.getBlock().getChildBlocks().iterator();
+ while (it.hasNext()) {
+ ExecutionBlock block = it.next();
+ SubQuery childSubQuery = unit.queryContext.getSubQuery(block.getId());
+ childStat = childSubQuery.getStats();
avgRows += childStat.getAvgRows();
columnStats.addAll(childStat.getColumnStats());
numBlocks += childStat.getNumBlocks();
@@ -448,6 +291,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
numPartitions += childStat.getNumPartitions();
numRows += childStat.getNumRows();
}
+
stat.setColumnStats(columnStats);
stat.setNumBlocks(numBlocks);
stat.setNumBytes(numBytes);
@@ -458,7 +302,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
}
public void cleanUp() {
- if (hasUnionPlan()) {
+ if (block.hasUnion()) {
try {
// write meta and continue
TableStat stat = generateUnionStat(this);
@@ -469,35 +313,38 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
e.printStackTrace();
}
} else {
+ LOG.info("SubQuery: " + getId() + " sets TableStat");
TableStat stat = generateStat();
- setStats(stat);
try {
writeStat(this, stat);
} catch (IOException e) {
}
}
- finishTime = clock.getTime();
+ finishTime = queryContext.getClock().getTime();
}
- private static class InitAndRequestContainer implements MultipleArcTransition<SubQuery, SubQueryEvent, SubQueryState> {
+ private static class InitAndRequestContainer implements MultipleArcTransition<SubQuery,
+ SubQueryEvent, SubQueryState> {
@Override
public SubQueryState transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
- subQuery.startTime = subQuery.clock.getTime();
+ subQuery.startTime = subQuery.queryContext.getClock().getTime();
subQuery.taskScheduler = new TaskSchedulerImpl(subQuery.queryContext);
subQuery.taskScheduler.init(subQuery.queryContext.getConf());
subQuery.taskScheduler.start();
+ ExecutionBlock execBlock = subQuery.getBlock();
+
try {
// if subquery is dummy, which means it requires only a logical step
// instead of actual query. An 'union all' is an example of
// a dummy subquery.
- if (subQuery.hasUnionPlan()) {
+ if (execBlock.hasUnion()) {
subQuery.finishUnionUnit();
subQuery.cleanUp();
- TableMeta meta = new TableMetaImpl(subQuery.getOutputSchema(),
+ TableMeta meta = new TableMetaImpl(execBlock.getOutputSchema(),
StoreType.CSV, new Options(), subQuery.getStats());
subQuery.eventHandler.handle(new SubQuerySucceeEvent(subQuery.getId(),
meta));
@@ -505,35 +352,39 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
} else {
QueryUnit [] tasks;
// TODO - should be improved
- if (subQuery.isLeafQuery() && subQuery.getScanNodes().length == 1) {
- SubQuery parent = subQuery.getParentQuery();
+ if (execBlock.isLeafBlock() && execBlock.getScanNodes().length == 1) {
+
// if parent is join, this subquery is for partitioning data.
- if (parent != null) {
+ if (execBlock.hasParentBlock()) {
int numTasks = calculatePartitionNum(subQuery);
- subQuery.getPlanner().setPartitionNumberForTwoPhase(subQuery, numTasks);
+ Repartitioner.setPartitionNumberForTwoPhase(subQuery, numTasks);
}
- tasks = subQuery.getPlanner().createLeafTasks(subQuery);
- } else if (subQuery.getScanNodes().length > 1) {
- SubQuery parent = subQuery.getParentQuery();
+ tasks = createLeafTasks(subQuery);
+ } else if (execBlock.getScanNodes().length > 1) {
// if parent is join, this subquery is for partitioning data.
- if (parent != null) {
+ if (execBlock.hasParentBlock()) {
int numTasks = calculatePartitionNum(subQuery);
- subQuery.getPlanner().setPartitionNumberForTwoPhase(subQuery, numTasks);
+ Repartitioner.setPartitionNumberForTwoPhase(subQuery, numTasks);
+ }
+
+ if (subQuery.getId().getId() == 15) {
+ System.out.println("error point!");
}
+
tasks = Repartitioner.createJoinTasks(subQuery);
} else {
- SubQuery parent = subQuery.getParentQuery();
// if parent is join, this subquery is for partitioning data.
- if (parent != null) {
+ if (execBlock.hasParentBlock()) {
int partitionNum = calculatePartitionNum(subQuery);
- subQuery.getPlanner().setPartitionNumberForTwoPhase(subQuery, partitionNum);
+ Repartitioner.setPartitionNumberForTwoPhase(subQuery, partitionNum);
}
int numTasks = getNonLeafTaskNum(subQuery);
- tasks = Repartitioner.createNonLeafTask(subQuery,
- subQuery.getChildIterator().next(), numTasks);
+ SubQueryId childId = subQuery.getBlock().getChildBlocks().iterator().next().getId();
+ SubQuery child = subQuery.queryContext.getSubQuery(childId);
+ tasks = Repartitioner.createNonLeafTask(subQuery, child, numTasks);
}
for (QueryUnit task : tasks) {
subQuery.addTask(task);
@@ -543,7 +394,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
// if there is no tasks
if (subQuery.tasks.size() == 0) {
subQuery.cleanUp();
- TableMeta meta = toTableMeta(subQuery.getStoreTableNode());
+ TableMeta meta = toTableMeta(execBlock.getStoreTableNode());
meta.setStat(subQuery.getStats());
subQuery.eventHandler.handle(new SubQuerySucceeEvent(subQuery.getId(),
meta));
@@ -565,10 +416,10 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
org.apache.hadoop.yarn.api.records.Priority priority =
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
org.apache.hadoop.yarn.api.records.Priority.class);
- priority.setPriority(100 - subQuery.getPriority().get());
+ priority.setPriority(subQuery.getPriority().get());
ContainerAllocationEvent event =
new ContainerAllocationEvent(ContainerAllocatorEventType.CONTAINER_REQ,
- subQuery.getId(), priority, resource, numRequest, subQuery.isLeafQuery(), 0.0f);
+ subQuery.getId(), priority, resource, numRequest, execBlock.isLeafBlock(), 0.0f);
subQuery.eventHandler.handle(event);
}
}
@@ -582,6 +433,48 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
return SubQueryState.FAILED;
}
}
+
+ public QueryUnit [] createLeafTasks(SubQuery subQuery) throws IOException {
+ ExecutionBlock execBlock = subQuery.getBlock();
+ ScanNode[] scans = execBlock.getScanNodes();
+ Preconditions.checkArgument(scans.length == 1, "Must be Scan Query");
+ TableMeta meta;
+ Path inputPath;
+
+ ScanNode scan = scans[0];
+ TableDesc desc = subQuery.queryContext.getCatalog().getTableDesc(scan.getTableId());
+ inputPath = desc.getPath();
+ meta = desc.getMeta();
+
+ // TODO - should be change the inner directory
+ Path oldPath = new Path(inputPath, "data");
+ FileSystem fs = inputPath.getFileSystem(subQuery.queryContext.getConf());
+ if (fs.exists(oldPath)) {
+ inputPath = oldPath;
+ }
+ List<Fragment> fragments = subQuery.getStorageManager().getSplits(scan.getTableId(), meta, inputPath);
+
+ QueryUnit queryUnit;
+ List<QueryUnit> queryUnits = new ArrayList<QueryUnit>();
+
+ int i = 0;
+ for (Fragment fragment : fragments) {
+ queryUnit = newQueryUnit(subQuery, i++);
+ queryUnit.setFragment(scan.getTableId(), fragment);
+ queryUnits.add(queryUnit);
+ }
+
+ return queryUnits.toArray(new QueryUnit[queryUnits.size()]);
+ }
+
+ private QueryUnit newQueryUnit(SubQuery subQuery, int taskId) {
+ ExecutionBlock execBlock = subQuery.getBlock();
+ QueryUnit unit = new QueryUnit(
+ QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), execBlock.isLeafBlock(),
+ subQuery.eventHandler);
+ unit.setLogicalPlan(execBlock.getPlan());
+ return unit;
+ }
}
/**
@@ -593,25 +486,25 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
*/
public static int calculatePartitionNum(SubQuery subQuery) {
TajoConf conf = subQuery.queryContext.getConf();
- SubQuery parent = subQuery.getParentQuery();
+ ExecutionBlock parent = subQuery.getBlock().getParentBlock();
GroupbyNode grpNode = null;
if (parent != null) {
grpNode = (GroupbyNode) PlannerUtil.findTopNode(
- parent.getLogicalPlan(), ExprType.GROUP_BY);
+ parent.getPlan(), ExprType.GROUP_BY);
}
// Is this subquery the first step of join?
if (parent != null && parent.getScanNodes().length == 2) {
- Iterator<SubQuery> child = parent.getChildQueries().iterator();
+ Iterator<ExecutionBlock> child = parent.getChildBlocks().iterator();
// for inner
- SubQuery outer = child.next();
- long outerVolume = getInputVolume(outer);
+ ExecutionBlock outer = child.next();
+ long outerVolume = getInputVolume(subQuery.queryContext, outer);
// for inner
- SubQuery inner = child.next();
- long innerVolume = getInputVolume(inner);
+ ExecutionBlock inner = child.next();
+ long innerVolume = getInputVolume(subQuery.queryContext, inner);
LOG.info("Outer volume: " + Math.ceil((double)outerVolume / 1048576));
LOG.info("Inner volume: " + Math.ceil((double)innerVolume / 1048576));
@@ -631,7 +524,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
if (grpNode.getGroupingColumns().length == 0) {
return 1;
} else {
- long volume = getInputVolume(subQuery);
+ long volume = getInputVolume(subQuery.queryContext, subQuery.block);
int mb = (int) Math.ceil((double)volume / 1048576);
LOG.info("Table's volume is approximately " + mb + " MB");
@@ -643,7 +536,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
}
} else {
LOG.info("============>>>>> Unexpected Case! <<<<<================");
- long volume = getInputVolume(subQuery);
+ long volume = getInputVolume(subQuery.queryContext, subQuery.block);
int mb = (int) Math.ceil((double)volume / 1048576);
LOG.info("Table's volume is approximately " + mb + " MB");
@@ -654,20 +547,20 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
}
}
- public static long getInputVolume(SubQuery subQuery) {
- CatalogService catalog = subQuery.queryContext.getCatalog();
- if (subQuery.hasChildQuery()) {
- Iterator<SubQuery> it = subQuery.getChildQueries().iterator();
+ public static long getInputVolume(QueryContext context, ExecutionBlock execBlock) {
+ CatalogService catalog = context.getCatalog();
+ if (execBlock.isLeafBlock()) {
+ ScanNode outerScan = execBlock.getScanNodes()[0];
+ TableStat stat = catalog.getTableDesc(outerScan.getTableId()).getMeta().getStat();
+ return stat.getNumBytes();
+ } else {
long aggregatedVolume = 0;
- while(it.hasNext()) {
- aggregatedVolume += it.next().getStats().getNumBytes();
+ for (ExecutionBlock childBlock : execBlock.getChildBlocks()) {
+ SubQuery subquery = context.getSubQuery(childBlock.getId());
+ aggregatedVolume += subquery.getStats().getNumBytes();
}
return aggregatedVolume;
- } else {
- ScanNode outerScan = subQuery.getScanNodes()[0];
- TableStat stat = catalog.getTableDesc(outerScan.getTableId()).getMeta().getStat();
- return stat.getNumBytes();
}
}
@@ -679,7 +572,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
*/
public static int getNonLeafTaskNum(SubQuery subQuery) {
// Getting intermediate data size
- long volume = getInputVolume(subQuery);
+ long volume = getInputVolume(subQuery.queryContext, subQuery.getBlock());
int mb = (int) Math.ceil((double)volume / 1048576);
LOG.info("Table's volume is approximately " + mb + " MB");
@@ -766,6 +659,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
// TODO - records succeeded, failed, killed completed task
// TODO - records metrics
+ ExecutionBlock execBlock = subQuery.getBlock();
+
for (Entry<ContainerId, Container> entry : subQuery.containers.entrySet()) {
subQuery.eventHandler.handle(new TaskRunnerStopEvent(subQuery.getId(),
entry.getValue()));
@@ -773,13 +668,13 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
subQuery.cleanUp();
subQuery.taskScheduler.stop();
- StoreTableNode storeTableNode = subQuery.getStoreTableNode();
+ StoreTableNode storeTableNode = execBlock.getStoreTableNode();
TableMeta meta = toTableMeta(storeTableNode);
meta.setStat(subQuery.getStats());
subQuery.eventHandler.handle(new SubQuerySucceeEvent(subQuery.getId(),
meta));
- subQuery.finishTime = subQuery.clock.getTime();
+ subQuery.finishTime = subQuery.queryContext.getClock().getTime();
}
}
@@ -809,15 +704,16 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
private void writeStat(SubQuery subQuery, TableStat stat)
throws IOException {
+ ExecutionBlock execBlock = subQuery.getBlock();
- if (subQuery.getLogicalPlan().getType() == ExprType.CREATE_INDEX) {
- IndexWriteNode index = (IndexWriteNode) subQuery.getLogicalPlan();
+ if (execBlock.getPlan().getType() == ExprType.CREATE_INDEX) {
+ IndexWriteNode index = (IndexWriteNode) execBlock.getPlan();
Path indexPath = new Path(sm.getTablePath(index.getTableName()), "index");
TableMeta meta;
if (sm.getFileSystem().exists(new Path(indexPath, ".meta"))) {
meta = sm.getTableMeta(indexPath);
} else {
- StoreTableNode storeTableNode = subQuery.getStoreTableNode();
+ StoreTableNode storeTableNode = execBlock.getStoreTableNode();
meta = toTableMeta(storeTableNode);
}
String indexName = IndexUtil.getIndexName(index.getTableName(),
@@ -828,10 +724,10 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
sm.writeTableMeta(indexPath, meta);
} else {
- StoreTableNode storeTableNode = subQuery.getStoreTableNode();
+ StoreTableNode storeTableNode = execBlock.getStoreTableNode();
TableMeta meta = toTableMeta(storeTableNode);
meta.setStat(stat);
- sm.writeTableMeta(sm.getTablePath(subQuery.getOutputName()), meta);
+ sm.writeTableMeta(sm.getTablePath(execBlock.getOutputName()), meta);
}
}
@@ -845,19 +741,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
}
}
- private void finalizePrevSubQuery(SubQuery subQuery)
- throws Exception {
- SubQuery prevSubQuery;
- for (ScanNode scan : subQuery.getScanNodes()) {
- prevSubQuery = subQuery.getChildQuery(scan);
- if (prevSubQuery.getStoreTableNode().getSubNode().getType() != ExprType.UNION) {
- for (QueryUnit unit : prevSubQuery.getQueryUnits()) {
- //sendCommand(unit.getLastAttempt(), CommandType.FINALIZE);
- }
- }
- }
- }
-
@Override
public void handle(SubQueryEvent event) {
//if (LOG.isDebugEnabled()) {
@@ -871,14 +754,14 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
getStateMachine().doTransition(event.getType(), event);
} catch (InvalidStateTransitonException e) {
LOG.error("Can't handle this event at current state", e);
- eventHandler.handle(new SubQueryEvent(this.id,
+ eventHandler.handle(new SubQueryEvent(getId(),
SubQueryEventType.SQ_INTERNAL_ERROR));
}
//notify the eventhandler of state change
if (LOG.isDebugEnabled()) {
if (oldState != getState()) {
- LOG.debug(id + " SubQuery Transitioned from " + oldState + " to "
+ LOG.debug(getId() + " SubQuery Transitioned from " + oldState + " to "
+ getState());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskContainerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskContainerManager.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskContainerManager.java
deleted file mode 100644
index 8756dc4..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskContainerManager.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import org.apache.hadoop.yarn.service.AbstractService;
-
-public class TaskContainerManager extends AbstractService {
-
- public TaskContainerManager() {
- super(TaskContainerManager.class.getName());
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskSchedulerImpl.java
index 3f92608..816e285 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskSchedulerImpl.java
@@ -389,8 +389,7 @@ public class TaskSchedulerImpl extends AbstractService
LOG.debug("Assigned based on * match");
QueryUnit task;
- task = context.getQuery()
- .getSubQuery(attemptId.getSubQueryId()).getQueryUnit(attemptId.getQueryUnitId());
+ task = context.getSubQuery(attemptId.getSubQueryId()).getQueryUnit(attemptId.getQueryUnitId());
QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
attemptId,
Lists.newArrayList(task.getAllFragments()),
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/cluster/WorkerListener.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/cluster/WorkerListener.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/cluster/WorkerListener.java
index 93b786d..43fc1d0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/cluster/WorkerListener.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/cluster/WorkerListener.java
@@ -126,7 +126,7 @@ public class WorkerListener extends AbstractService
QueryUnitAttemptIdProto attemptIdProto,
RpcCallback<BoolProto> done) {
QueryUnitAttemptId attemptId = new QueryUnitAttemptId(attemptIdProto);
- context.getQuery(attemptId.getQueryId()).getContext().getQuery().getSubQuery(attemptId.getSubQueryId()).
+ context.getQuery(attemptId.getQueryId()).getContext().getSubQuery(attemptId.getSubQueryId()).
getQueryUnit(attemptId.getQueryUnitId()).getAttempt(attemptId).
resetExpireTime();
done.run(TRUE_PROTO);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/Task.java
index 28ccbfe..d0fae9f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/Task.java
@@ -47,7 +47,7 @@ import tajo.engine.planner.logical.StoreTableNode;
import tajo.engine.planner.physical.PhysicalExec;
import tajo.ipc.MasterWorkerProtocol.MasterWorkerProtocolService.Interface;
import tajo.ipc.protocolrecords.QueryUnitRequest;
-import tajo.master.SubQuery.PARTITION_TYPE;
+import tajo.master.ExecutionBlock.PartitionType;
import tajo.rpc.NullCallback;
import tajo.storage.Fragment;
import tajo.storage.StorageUtil;
@@ -101,7 +101,7 @@ public class Task {
private AtomicBoolean progressFlag = new AtomicBoolean(false);
// TODO - to be refactored
- private PARTITION_TYPE partitionType = null;
+ private PartitionType partitionType = null;
private Schema finalSchema = null;
private TupleComparator sortComp = null;
@@ -153,7 +153,7 @@ public class Task {
context.setInterQuery();
StoreTableNode store = (StoreTableNode) plan;
this.partitionType = store.getPartitionType();
- if (partitionType == PARTITION_TYPE.RANGE) {
+ if (partitionType == PartitionType.RANGE) {
SortNode sortNode = (SortNode) store.getSubNode();
this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys());
this.sortComp = new TupleComparator(finalSchema, sortNode.getSortKeys());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java b/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java
index 2af74bb..be8a7f1 100644
--- a/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/tajo/engine/plan/global/TestGlobalQueryPlanner.java
@@ -46,9 +46,9 @@ import tajo.engine.planner.LogicalPlanner;
import tajo.engine.planner.PlanningContext;
import tajo.engine.planner.global.MasterPlan;
import tajo.engine.planner.logical.*;
+import tajo.master.ExecutionBlock;
+import tajo.master.ExecutionBlock.PartitionType;
import tajo.master.GlobalPlanner;
-import tajo.master.SubQuery;
-import tajo.master.SubQuery.PARTITION_TYPE;
import tajo.master.TajoMaster;
import tajo.storage.*;
@@ -159,11 +159,11 @@ public class TestGlobalQueryPlanner {
MasterPlan globalPlan = planner.build(queryId,
(LogicalRootNode) plan1);
- SubQuery unit = globalPlan.getRoot();
- assertFalse(unit.hasChildQuery());
- assertEquals(PARTITION_TYPE.LIST, unit.getOutputType());
+ ExecutionBlock unit = globalPlan.getRoot();
+ assertFalse(unit.hasChildBlock());
+ assertEquals(PartitionType.LIST, unit.getPartitionType());
- LogicalNode plan2 = unit.getLogicalPlan();
+ LogicalNode plan2 = unit.getPlan();
assertEquals(ExprType.STORE, plan2.getType());
assertEquals(ExprType.SCAN, ((StoreTableNode)plan2).getSubNode().getType());
}
@@ -178,20 +178,20 @@ public class TestGlobalQueryPlanner {
MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) plan);
- SubQuery next, prev;
+ ExecutionBlock next, prev;
next = globalPlan.getRoot();
- assertTrue(next.hasChildQuery());
- assertEquals(PARTITION_TYPE.LIST, next.getOutputType());
+ assertTrue(next.hasChildBlock());
+ assertEquals(PartitionType.LIST, next.getPartitionType());
for (ScanNode scan : next.getScanNodes()) {
assertTrue(scan.isLocal());
}
assertFalse(next.getStoreTableNode().isLocal());
- Iterator<SubQuery> it= next.getChildIterator();
+ Iterator<ExecutionBlock> it= next.getChildBlocks().iterator();
prev = it.next();
- assertFalse(prev.hasChildQuery());
- assertEquals(PARTITION_TYPE.HASH, prev.getOutputType());
+ assertFalse(prev.hasChildBlock());
+ assertEquals(PartitionType.HASH, prev.getPartitionType());
assertTrue(prev.getStoreTableNode().isLocal());
assertFalse(it.hasNext());
@@ -216,26 +216,26 @@ public class TestGlobalQueryPlanner {
MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) plan);
- SubQuery next, prev;
+ ExecutionBlock next, prev;
next = globalPlan.getRoot();
assertEquals(ExprType.PROJECTION,
next.getStoreTableNode().getSubNode().getType());
- assertTrue(next.hasChildQuery());
- assertEquals(PARTITION_TYPE.LIST, next.getOutputType());
- Iterator<SubQuery> it= next.getChildIterator();
+ assertTrue(next.hasChildBlock());
+ assertEquals(PartitionType.LIST, next.getPartitionType());
+ Iterator<ExecutionBlock> it= next.getChildBlocks().iterator();
prev = it.next();
assertEquals(ExprType.SORT,
prev.getStoreTableNode().getSubNode().getType());
- assertTrue(prev.hasChildQuery());
- assertEquals(PARTITION_TYPE.LIST, prev.getOutputType());
- it= prev.getChildIterator();
+ assertTrue(prev.hasChildBlock());
+ assertEquals(PartitionType.LIST, prev.getPartitionType());
+ it= prev.getChildBlocks().iterator();
next = prev;
prev = it.next();
- assertFalse(prev.hasChildQuery());
- assertEquals(PARTITION_TYPE.RANGE, prev.getOutputType());
+ assertFalse(prev.hasChildBlock());
+ assertEquals(PartitionType.RANGE, prev.getPartitionType());
assertFalse(it.hasNext());
ScanNode []scans = prev.getScanNodes();
@@ -259,59 +259,59 @@ public class TestGlobalQueryPlanner {
MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) plan);
- SubQuery next, prev;
+ ExecutionBlock next, prev;
// the second phase of the sort
next = globalPlan.getRoot();
- assertTrue(next.hasChildQuery());
- assertEquals(PARTITION_TYPE.LIST, next.getOutputType());
+ assertTrue(next.hasChildBlock());
+ assertEquals(PartitionType.LIST, next.getPartitionType());
assertEquals(ExprType.PROJECTION, next.getStoreTableNode().getSubNode().getType());
ScanNode []scans = next.getScanNodes();
assertEquals(1, scans.length);
- Iterator<SubQuery> it= next.getChildIterator();
+ Iterator<ExecutionBlock> it= next.getChildBlocks().iterator();
prev = it.next();
assertEquals(ExprType.SORT, prev.getStoreTableNode().getSubNode().getType());
- assertEquals(PARTITION_TYPE.LIST, prev.getOutputType());
+ assertEquals(PartitionType.LIST, prev.getPartitionType());
scans = prev.getScanNodes();
assertEquals(1, scans.length);
- it= prev.getChildIterator();
+ it= prev.getChildBlocks().iterator();
// the first phase of the sort
prev = it.next();
assertEquals(ExprType.SORT, prev.getStoreTableNode().getSubNode().getType());
assertEquals(scans[0].getInSchema(), prev.getOutputSchema());
- assertTrue(prev.hasChildQuery());
- assertEquals(PARTITION_TYPE.RANGE, prev.getOutputType());
+ assertTrue(prev.hasChildBlock());
+ assertEquals(PartitionType.RANGE, prev.getPartitionType());
assertFalse(it.hasNext());
scans = prev.getScanNodes();
assertEquals(1, scans.length);
next = prev;
- it= next.getChildIterator();
+ it= next.getChildBlocks().iterator();
// the second phase of the join
prev = it.next();
assertEquals(ExprType.JOIN, prev.getStoreTableNode().getSubNode().getType());
assertEquals(scans[0].getInSchema(), prev.getOutputSchema());
- assertTrue(prev.hasChildQuery());
- assertEquals(PARTITION_TYPE.LIST, prev.getOutputType());
+ assertTrue(prev.hasChildBlock());
+ assertEquals(PartitionType.LIST, prev.getPartitionType());
assertFalse(it.hasNext());
scans = prev.getScanNodes();
assertEquals(2, scans.length);
next = prev;
- it= next.getChildIterator();
+ it= next.getChildBlocks().iterator();
// the first phase of the join
prev = it.next();
assertEquals(ExprType.SCAN, prev.getStoreTableNode().getSubNode().getType());
- assertFalse(prev.hasChildQuery());
- assertEquals(PARTITION_TYPE.HASH, prev.getOutputType());
+ assertFalse(prev.hasChildBlock());
+ assertEquals(PartitionType.HASH, prev.getPartitionType());
assertEquals(1, prev.getScanNodes().length);
prev = it.next();
assertEquals(ExprType.SCAN, prev.getStoreTableNode().getSubNode().getType());
- assertFalse(prev.hasChildQuery());
- assertEquals(PARTITION_TYPE.HASH, prev.getOutputType());
+ assertFalse(prev.hasChildBlock());
+ assertEquals(PartitionType.HASH, prev.getPartitionType());
assertEquals(1, prev.getScanNodes().length);
assertFalse(it.hasNext());
}
@@ -325,15 +325,15 @@ public class TestGlobalQueryPlanner {
MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) plan);
- SubQuery unit = globalPlan.getRoot();
+ ExecutionBlock unit = globalPlan.getRoot();
StoreTableNode store = unit.getStoreTableNode();
assertEquals(ExprType.JOIN, store.getSubNode().getType());
- assertTrue(unit.hasChildQuery());
+ assertTrue(unit.hasChildBlock());
ScanNode [] scans = unit.getScanNodes();
assertEquals(2, scans.length);
- SubQuery prev;
+ ExecutionBlock prev;
for (ScanNode scan : scans) {
- prev = unit.getChildQuery(scan);
+ prev = unit.getChildBlock(scan);
store = prev.getStoreTableNode();
assertEquals(ExprType.SCAN, store.getSubNode().getType());
}
@@ -348,14 +348,14 @@ public class TestGlobalQueryPlanner {
MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) plan);
- SubQuery unit = globalPlan.getRoot();
+ ExecutionBlock unit = globalPlan.getRoot();
StoreTableNode store = unit.getStoreTableNode();
assertEquals(ExprType.PROJECTION, store.getSubNode().getType());
ScanNode[] scans = unit.getScanNodes();
assertEquals(1, scans.length);
- unit = unit.getChildQuery(scans[0]);
+ unit = unit.getChildBlock(scans[0]);
store = unit.getStoreTableNode();
assertEquals(ExprType.UNION, store.getSubNode().getType());
UnionNode union = (UnionNode) store.getSubNode();
@@ -367,11 +367,11 @@ public class TestGlobalQueryPlanner {
union = (UnionNode) union.getInnerNode();
assertEquals(ExprType.SCAN, union.getOuterNode().getType());
assertEquals(ExprType.SCAN, union.getInnerNode().getType());
- assertTrue(unit.hasChildQuery());
+ assertTrue(unit.hasChildBlock());
String tableId = "";
for (ScanNode scan : unit.getScanNodes()) {
- SubQuery prev = unit.getChildQuery(scan);
+ ExecutionBlock prev = unit.getChildBlock(scan);
store = prev.getStoreTableNode();
assertEquals(ExprType.GROUP_BY, store.getSubNode().getType());
GroupbyNode groupby = (GroupbyNode) store.getSubNode();
@@ -382,7 +382,7 @@ public class TestGlobalQueryPlanner {
assertEquals(tableId, store.getTableName());
}
assertEquals(1, prev.getScanNodes().length);
- prev = prev.getChildQuery(prev.getScanNodes()[0]);
+ prev = prev.getChildBlock(prev.getScanNodes()[0]);
store = prev.getStoreTableNode();
assertEquals(ExprType.GROUP_BY, store.getSubNode().getType());
groupby = (GroupbyNode) store.getSubNode();
@@ -486,13 +486,13 @@ public class TestGlobalQueryPlanner {
MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) plan);
- SubQuery second, first, mid;
+ ExecutionBlock second, first, mid;
ScanNode secondScan, firstScan, midScan;
second = globalPlan.getRoot();
assertTrue(second.getScanNodes().length == 1);
- first = second.getChildQuery(second.getScanNodes()[0]);
+ first = second.getChildBlock(second.getScanNodes()[0]);
GroupbyNode firstGroupby, secondGroupby, midGroupby;
secondGroupby = (GroupbyNode) second.getStoreTableNode().getSubNode();
@@ -510,10 +510,10 @@ public class TestGlobalQueryPlanner {
midScan = mid.getScanNodes()[0];
firstScan = first.getScanNodes()[0];
- assertTrue(first.getParentQuery().equals(mid));
- assertTrue(mid.getParentQuery().equals(second));
- assertTrue(second.getChildQuery(secondScan).equals(mid));
- assertTrue(mid.getChildQuery(midScan).equals(first));
+ assertTrue(first.getParentBlock().equals(mid));
+ assertTrue(mid.getParentBlock().equals(second));
+ assertTrue(second.getChildBlock(secondScan).equals(mid));
+ assertTrue(mid.getChildBlock(midScan).equals(first));
assertEquals(first.getOutputName(), midScan.getTableId());
assertEquals(first.getOutputSchema(), midScan.getInSchema());
assertEquals(mid.getOutputName(), secondScan.getTableId());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/global/TestGlobalQueryOptimizer.java b/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
index fae28f7..c0d0dd2 100644
--- a/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
+++ b/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
@@ -43,8 +43,8 @@ import tajo.engine.planner.LogicalOptimizer;
import tajo.engine.planner.LogicalPlanner;
import tajo.engine.planner.PlanningContext;
import tajo.engine.planner.logical.*;
+import tajo.master.ExecutionBlock;
import tajo.master.GlobalPlanner;
-import tajo.master.SubQuery;
import tajo.storage.*;
import java.io.IOException;
@@ -144,7 +144,7 @@ public class TestGlobalQueryOptimizer {
(LogicalRootNode) plan);
globalPlan = optimizer.optimize(globalPlan);
- SubQuery unit = globalPlan.getRoot();
+ ExecutionBlock unit = globalPlan.getRoot();
StoreTableNode store = unit.getStoreTableNode();
assertEquals(ExprType.PROJECTION, store.getSubNode().getType());
ProjectionNode proj = (ProjectionNode) store.getSubNode();
@@ -153,16 +153,16 @@ public class TestGlobalQueryOptimizer {
assertEquals(ExprType.SCAN, sort.getSubNode().getType());
ScanNode scan = (ScanNode) sort.getSubNode();
- assertTrue(unit.hasChildQuery());
- unit = unit.getChildQuery(scan);
+ assertTrue(unit.hasChildBlock());
+ unit = unit.getChildBlock(scan);
store = unit.getStoreTableNode();
assertEquals(ExprType.SORT, store.getSubNode().getType());
sort = (SortNode) store.getSubNode();
assertEquals(ExprType.JOIN, sort.getSubNode().getType());
- assertTrue(unit.hasChildQuery());
+ assertTrue(unit.hasChildBlock());
for (ScanNode prevscan : unit.getScanNodes()) {
- SubQuery prev = unit.getChildQuery(prevscan);
+ ExecutionBlock prev = unit.getChildBlock(prevscan);
store = prev.getStoreTableNode();
assertEquals(ExprType.SCAN, store.getSubNode().getType());
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 6b8496a..8b5dee5 100644
--- a/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -46,7 +46,7 @@ import tajo.engine.planner.logical.LogicalNode;
import tajo.engine.planner.logical.LogicalRootNode;
import tajo.engine.planner.logical.StoreTableNode;
import tajo.engine.planner.logical.UnionNode;
-import tajo.master.SubQuery;
+import tajo.master.ExecutionBlock.PartitionType;
import tajo.master.TajoMaster;
import tajo.storage.*;
import tajo.storage.index.bst.BSTIndex;
@@ -424,7 +424,7 @@ public class TestPhysicalPlanner {
Column key1 = new Column("score.deptName", DataType.STRING);
Column key2 = new Column("score.class", DataType.STRING);
StoreTableNode storeNode = new StoreTableNode("partition");
- storeNode.setPartitions(SubQuery.PARTITION_TYPE.HASH, new Column[]{key1, key2}, numPartitions);
+ storeNode.setPartitions(PartitionType.HASH, new Column[]{key1, key2}, numPartitions);
PlannerUtil.insertNode(plan, storeNode);
plan = LogicalOptimizer.optimize(context, plan);
@@ -482,7 +482,7 @@ public class TestPhysicalPlanner {
int numPartitions = 1;
StoreTableNode storeNode = new StoreTableNode("emptyset");
- storeNode.setPartitions(SubQuery.PARTITION_TYPE.HASH, new Column[] {}, numPartitions);
+ storeNode.setPartitions(PartitionType.HASH, new Column[] {}, numPartitions);
PlannerUtil.insertNode(plan, storeNode);
plan = LogicalOptimizer.optimize(context, plan);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestExecutionBlockCursor.java b/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestExecutionBlockCursor.java
new file mode 100644
index 0000000..c6a5d43
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestExecutionBlockCursor.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package tajo.master;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import tajo.QueryIdFactory;
+import tajo.TajoTestingCluster;
+import tajo.benchmark.TPCH;
+import tajo.catalog.CatalogService;
+import tajo.catalog.TCatUtil;
+import tajo.catalog.TableDesc;
+import tajo.catalog.TableMeta;
+import tajo.catalog.proto.CatalogProtos;
+import tajo.conf.TajoConf;
+import tajo.engine.parser.QueryAnalyzer;
+import tajo.engine.planner.LogicalPlanner;
+import tajo.engine.planner.PlanningContext;
+import tajo.engine.planner.global.MasterPlan;
+import tajo.engine.planner.logical.LogicalNode;
+import tajo.engine.planner.logical.LogicalRootNode;
+import tajo.storage.StorageManager;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests that {@link ExecutionBlockCursor} iterates over the expected number of
+ * execution blocks in a master plan built by {@link GlobalPlanner}.
+ */
+public class TestExecutionBlockCursor {
+  private static TajoTestingCluster util;
+  private static TajoConf conf;
+  private static CatalogService catalog;
+  private static GlobalPlanner planner;
+  private static QueryAnalyzer analyzer;
+  private static LogicalPlanner logicalPlanner;
+  // Kept as a field (not a local) so that tearDown() can stop it.
+  private static AsyncDispatcher dispatcher;
+
+  /**
+   * Starts a mini catalog cluster, registers the TPC-H tables in it, and creates
+   * the analyzer and planners shared by the tests in this class.
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.startCatalogCluster();
+
+    conf = util.getConfiguration();
+    catalog = util.getMiniCatalogCluster().getCatalog();
+    TPCH tpch = new TPCH();
+    tpch.loadSchemas();
+    tpch.loadOutSchema();
+    for (String table : tpch.getTableNames()) {
+      TableMeta m = TCatUtil.newTableMeta(tpch.getSchema(table), CatalogProtos.StoreType.CSV);
+      TableDesc d = TCatUtil.newTableDesc(table, m, new Path("file:///"));
+      catalog.addTable(d);
+    }
+
+    analyzer = new QueryAnalyzer(catalog);
+    logicalPlanner = new LogicalPlanner(catalog);
+
+    StorageManager sm = new StorageManager(conf);
+    dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    planner = new GlobalPlanner(conf, catalog, sm, dispatcher.getEventHandler());
+  }
+
+  /**
+   * Stops the dispatcher and shuts down the catalog cluster started in setUp(),
+   * in reverse order of creation. JUnit only calls this because of @AfterClass;
+   * the null checks keep a failed setUp() from being masked by an NPE here.
+   */
+  @AfterClass
+  public static void tearDown() {
+    if (dispatcher != null) {
+      dispatcher.stop();
+    }
+    if (util != null) {
+      util.shutdownCatalogCluster();
+    }
+  }
+
+  /**
+   * Plans a five-way join over TPC-H tables and checks that the cursor walks
+   * over exactly the expected number of execution blocks.
+   */
+  @Test
+  public void testNextBlock() throws Exception {
+    PlanningContext context = analyzer.parse(
+        "select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment, ps_supplycost, " +
+            "r_name, p_type, p_size " +
+            "from region join nation on n_regionkey = r_regionkey and r_name = 'AMERICA' " +
+            "join supplier on s_nationkey = n_nationkey " +
+            "join partsupp on s_suppkey = ps_suppkey " +
+            "join part on p_partkey = ps_partkey and p_type like '%BRASS' and p_size = 15");
+    LogicalNode logicalPlan = logicalPlanner.createPlan(context);
+    MasterPlan plan = planner.build(QueryIdFactory.newQueryId(), (LogicalRootNode) logicalPlan);
+
+    ExecutionBlockCursor cursor = new ExecutionBlockCursor(plan);
+
+    int count = 0;
+    while (cursor.hasNext()) {
+      cursor.nextBlock();
+      count++;
+    }
+
+    // 5 input relations (region, nation, supplier, partsupp, part), 4 joins,
+    // and 1 projection = 10 execution blocks
+    assertEquals(10, count);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestRepartitioner.java b/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestRepartitioner.java
index ca3e870..2de54b3 100644
--- a/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestRepartitioner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/tajo/master/TestRepartitioner.java
@@ -23,6 +23,7 @@ import org.junit.Test;
import tajo.QueryId;
import tajo.SubQueryId;
import tajo.TestQueryUnitId;
+import tajo.master.ExecutionBlock.PartitionType;
import tajo.util.TUtil;
import tajo.util.TajoIdUtils;
@@ -47,7 +48,7 @@ public class TestRepartitioner {
Collection<URI> uris = Repartitioner.
createHashFetchURL(hostName + ":" + port, sid, partitionId,
- SubQuery.PARTITION_TYPE.HASH, intermediateEntries);
+ PartitionType.HASH, intermediateEntries);
List<String> taList = TUtil.newList();
for (URI uri : uris) {
[2/2] git commit: TAJO-42: Divide SubQuery into FSM and execution
block parts. (hyunsik)
Posted by hy...@apache.org.
TAJO-42: Divide SubQuery into FSM and execution block parts. (hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/fef3dd50
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/fef3dd50
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/fef3dd50
Branch: refs/heads/master
Commit: fef3dd509cad795d5de034253ea19e23219b9c1b
Parents: 6cf96bd
Author: Hyunsik Choi <hy...@apache.org>
Authored: Sat Apr 27 10:27:29 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Sat Apr 27 10:27:29 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../engine/planner/global/GlobalOptimizer.java | 63 +-
.../tajo/engine/planner/global/MasterPlan.java | 16 +-
.../engine/planner/logical/StoreTableNode.java | 10 +-
.../src/main/java/tajo/master/ExecutionBlock.java | 181 +++++
.../java/tajo/master/ExecutionBlockCursor.java | 80 ++
.../src/main/java/tajo/master/GlobalPlanner.java | 631 +++------------
.../src/main/java/tajo/master/Query.java | 299 ++------
.../src/main/java/tajo/master/QueryMaster.java | 12 +-
.../src/main/java/tajo/master/Repartitioner.java | 135 ++--
.../src/main/java/tajo/master/SchedulerUtils.java | 47 --
.../src/main/java/tajo/master/SubQuery.java | 381 +++------
.../java/tajo/master/TaskContainerManager.java | 30 -
.../main/java/tajo/master/TaskSchedulerImpl.java | 3 +-
.../java/tajo/master/cluster/WorkerListener.java | 2 +-
.../src/main/java/tajo/worker/Task.java | 6 +-
.../engine/plan/global/TestGlobalQueryPlanner.java | 104 ++--
.../planner/global/TestGlobalQueryOptimizer.java | 12 +-
.../planner/physical/TestPhysicalPlanner.java | 6 +-
.../java/tajo/master/TestExecutionBlockCursor.java | 101 +++
.../test/java/tajo/master/TestRepartitioner.java | 3 +-
21 files changed, 836 insertions(+), 1288 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 787cf14..33b422c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,8 @@ Release 0.2.0 - unreleased
IMPROVEMENTS
+ TAJO-42: Divide SubQuery into FSM and execution block parts. (hyunsik)
+
TAJO-32: Cleanup TaskRunner. (hyunsik)
TAJO-27: Modify the document links to point the wiki's ones. (hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/global/GlobalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/global/GlobalOptimizer.java b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/global/GlobalOptimizer.java
index 6ebcaef..4c50669 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/global/GlobalOptimizer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/global/GlobalOptimizer.java
@@ -24,11 +24,8 @@ import tajo.engine.planner.logical.ExprType;
import tajo.engine.planner.logical.LogicalNode;
import tajo.engine.planner.logical.ScanNode;
import tajo.engine.planner.logical.UnaryNode;
-import tajo.master.SubQuery;
-import tajo.master.SubQuery.PARTITION_TYPE;
-
-import java.util.Collection;
-import java.util.Iterator;
+import tajo.master.ExecutionBlock;
+import tajo.master.ExecutionBlock.PartitionType;
public class GlobalOptimizer {
@@ -37,62 +34,46 @@ public class GlobalOptimizer {
}
public MasterPlan optimize(MasterPlan plan) {
- SubQuery reducedStep = reduceSchedules(plan.getRoot());
- SubQuery joinChosen = chooseJoinAlgorithm(reducedStep);
+ ExecutionBlock reducedStep = reduceSchedules(plan.getRoot());
- MasterPlan optimized = new MasterPlan(joinChosen);
+ MasterPlan optimized = new MasterPlan(reducedStep);
optimized.setOutputTableName(plan.getOutputTable());
return optimized;
}
@VisibleForTesting
- private SubQuery chooseJoinAlgorithm(SubQuery logicalUnit) {
-
- return logicalUnit;
- }
-
- @VisibleForTesting
- private SubQuery reduceSchedules(SubQuery logicalUnit) {
+ private ExecutionBlock reduceSchedules(ExecutionBlock logicalUnit) {
reduceLogicalQueryUnitStep_(logicalUnit);
return logicalUnit;
}
-
- private void reduceLogicalQueryUnitStep_(SubQuery cur) {
- if (cur.hasChildQuery()) {
- Iterator<SubQuery> it = cur.getChildIterator();
- SubQuery prev;
- while (it.hasNext()) {
- prev = it.next();
- reduceLogicalQueryUnitStep_(prev);
- }
-
- Collection<SubQuery> prevs = cur.getChildQueries();
- it = prevs.iterator();
- while (it.hasNext()) {
- prev = it.next();
- if (prev.getStoreTableNode().getSubNode().getType() != ExprType.UNION &&
- prev.getOutputType() == PARTITION_TYPE.LIST) {
- mergeLogicalUnits(cur, prev);
- }
+
+ private void reduceLogicalQueryUnitStep_(ExecutionBlock cur) {
+ if (cur.hasChildBlock()) {
+ for (ExecutionBlock childBlock: cur.getChildBlocks())
+ reduceLogicalQueryUnitStep_(childBlock);
+ }
+
+ for (ExecutionBlock childBlock: cur.getChildBlocks()) {
+ if (childBlock.getStoreTableNode().getSubNode().getType() != ExprType.UNION &&
+ childBlock.getPartitionType() == PartitionType.LIST) {
+ mergeLogicalUnits(cur, childBlock);
}
}
}
- private SubQuery mergeLogicalUnits(SubQuery parent,
- SubQuery child) {
- LogicalNode p = PlannerUtil.findTopParentNode(parent.getLogicalPlan(),
- ExprType.SCAN);
-// Preconditions.checkArgument(p instanceof UnaryNode);
+ private ExecutionBlock mergeLogicalUnits(ExecutionBlock parent, ExecutionBlock child) {
+ LogicalNode p = PlannerUtil.findTopParentNode(parent.getPlan(), ExprType.SCAN);
+
if (p instanceof UnaryNode) {
UnaryNode u = (UnaryNode) p;
ScanNode scan = (ScanNode) u.getSubNode();
LogicalNode c = child.getStoreTableNode().getSubNode();
- parent.removeChildQuery(scan);
+ parent.removeChildBlock(scan);
u.setSubNode(c);
- parent.setLogicalPlan(parent.getLogicalPlan());
- parent.addChildQueries(child.getChildMaps());
+ parent.setPlan(parent.getPlan());
+ parent.addChildBlocks(child.getChildBlockMap());
}
return parent;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/global/MasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/global/MasterPlan.java b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/global/MasterPlan.java
index 3f78e5b..5ae4faf 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/global/MasterPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/global/MasterPlan.java
@@ -21,25 +21,21 @@
*/
package tajo.engine.planner.global;
-import tajo.master.SubQuery;
+import tajo.master.ExecutionBlock;
public class MasterPlan {
- private SubQuery root;
+ private ExecutionBlock root;
private String outputTableName;
-
- public MasterPlan() {
- root = null;
- }
-
- public MasterPlan(SubQuery root) {
+
+ public MasterPlan(ExecutionBlock root) {
setRoot(root);
}
- public void setRoot(SubQuery root) {
+ public void setRoot(ExecutionBlock root) {
this.root = root;
}
- public SubQuery getRoot() {
+ public ExecutionBlock getRoot() {
return this.root;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/logical/StoreTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/logical/StoreTableNode.java b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/logical/StoreTableNode.java
index e185f93..8c4d592 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/logical/StoreTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/planner/logical/StoreTableNode.java
@@ -22,7 +22,7 @@ import com.google.common.base.Preconditions;
import com.google.gson.annotations.Expose;
import tajo.catalog.Column;
import tajo.catalog.Options;
-import tajo.master.SubQuery;
+import tajo.master.ExecutionBlock.PartitionType;
import tajo.util.TUtil;
import static tajo.catalog.proto.CatalogProtos.StoreType;
@@ -30,7 +30,7 @@ import static tajo.catalog.proto.CatalogProtos.StoreType;
public class StoreTableNode extends UnaryNode implements Cloneable {
@Expose private String tableName;
@Expose private StoreType storageType = StoreType.CSV;
- @Expose private SubQuery.PARTITION_TYPE partitionType;
+ @Expose private PartitionType partitionType;
@Expose private int numPartitions;
@Expose private Column [] partitionKeys;
@Expose private boolean local;
@@ -79,12 +79,12 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
}
public final void setListPartition() {
- this.partitionType = SubQuery.PARTITION_TYPE.LIST;
+ this.partitionType = PartitionType.LIST;
this.partitionKeys = null;
this.numPartitions = 0;
}
- public final void setPartitions(SubQuery.PARTITION_TYPE type, Column [] keys, int numPartitions) {
+ public final void setPartitions(PartitionType type, Column [] keys, int numPartitions) {
Preconditions.checkArgument(keys.length >= 0,
"At least one partition key must be specified.");
Preconditions.checkArgument(numPartitions > 0,
@@ -95,7 +95,7 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
this.numPartitions = numPartitions;
}
- public SubQuery.PARTITION_TYPE getPartitionType() {
+ public PartitionType getPartitionType() {
return this.partitionType;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/ExecutionBlock.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/ExecutionBlock.java
new file mode 100644
index 0000000..04d3bc7
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/ExecutionBlock.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package tajo.master;
+
+import com.google.common.base.Preconditions;
+import tajo.SubQueryId;
+import tajo.catalog.Schema;
+import tajo.engine.planner.logical.*;
+
+import java.util.*;
+
+/**
+ * A distributed execution plan (DEP) is a directed acyclic graph (DAG) of ExecutionBlocks.
+ * An ExecutionBlock is a basic execution unit that could be distributed across a number of nodes.
+ * An ExecutionBlock class contains input information (e.g., child execution blocks or input
+ * tables), and output information (e.g., partition type, partition key, and partition number).
+ * In addition, it includes a logical plan to be executed in each node.
+ */
+public class ExecutionBlock {
+
+  /** Describes how the output of an execution block is partitioned. */
+  public static enum PartitionType {
+    /** for hash partitioning */
+    HASH,
+    /** for list output without partition keys */
+    LIST,
+    /** for map-side join */
+    BROADCAST,
+    /** for range partitioning */
+    RANGE
+  }
+
+  private final SubQueryId subQueryId;
+  private LogicalNode plan = null;
+  private StoreTableNode store = null;
+  // Scan nodes found in the current plan; rebuilt by setPlan().
+  private final List<ScanNode> scanlist = new ArrayList<ScanNode>();
+  private ExecutionBlock parent;
+  // Maps each scan node of this block to the child block that produces its input.
+  private final Map<ScanNode, ExecutionBlock> childSubQueries =
+      new HashMap<ScanNode, ExecutionBlock>();
+  private PartitionType outputType;
+  private boolean hasJoinPlan;
+  private boolean hasUnionPlan;
+
+  public ExecutionBlock(SubQueryId subQueryId) {
+    this.subQueryId = subQueryId;
+  }
+
+  /** @return the sub query id identifying this block */
+  public SubQueryId getId() {
+    return subQueryId;
+  }
+
+  /** @return the name of the table written by this block's store node */
+  public String getOutputName() {
+    return store.getTableName();
+  }
+
+  public void setPartitionType(PartitionType partitionType) {
+    this.outputType = partitionType;
+  }
+
+  public PartitionType getPartitionType() {
+    return outputType;
+  }
+
+  /**
+   * Sets the logical plan of this block and derives the store node, the scan nodes,
+   * and the join/union flags from it.
+   *
+   * <p>All derived state is rebuilt from scratch, so calling this again on a modified
+   * plan (e.g., after a scan node was replaced) leaves no stale or duplicated scan nodes.
+   *
+   * @param plan the root of the plan; must be a STORE or CREATE_INDEX node
+   * @throws IllegalArgumentException if the root is neither STORE nor CREATE_INDEX
+   */
+  public void setPlan(LogicalNode plan) {
+    Preconditions.checkArgument(plan.getType() == ExprType.STORE
+        || plan.getType() == ExprType.CREATE_INDEX,
+        "The root of an execution block must be STORE or CREATE_INDEX, but was %s",
+        plan.getType());
+
+    this.plan = plan;
+    if (plan instanceof StoreTableNode) {
+      store = (StoreTableNode) plan;
+    } else {
+      store = (StoreTableNode) ((IndexWriteNode) plan).getSubNode();
+    }
+
+    // Reset every derived field before re-traversing, so repeated calls do not accumulate.
+    hasJoinPlan = false;
+    hasUnionPlan = false;
+    scanlist.clear();
+
+    // Iterative depth-first traversal using a list as a LIFO stack.
+    List<LogicalNode> stack = new ArrayList<LogicalNode>();
+    stack.add(plan);
+    while (!stack.isEmpty()) {
+      LogicalNode node = stack.remove(stack.size() - 1);
+      if (node instanceof UnaryNode) {
+        stack.add(((UnaryNode) node).getSubNode());
+      } else if (node instanceof BinaryNode) {
+        BinaryNode binary = (BinaryNode) node;
+        if (binary.getType() == ExprType.JOIN) {
+          hasJoinPlan = true;
+        } else if (binary.getType() == ExprType.UNION) {
+          hasUnionPlan = true;
+        }
+        // The inner child is pushed last, so its subtree is visited before the outer one.
+        stack.add(binary.getOuterNode());
+        stack.add(binary.getInnerNode());
+      } else if (node instanceof ScanNode) {
+        scanlist.add((ScanNode) node);
+      }
+    }
+  }
+
+  /** @return the root of this block's logical plan (STORE or CREATE_INDEX) */
+  public LogicalNode getPlan() {
+    return plan;
+  }
+
+  public boolean hasParentBlock() {
+    return parent != null;
+  }
+
+  public ExecutionBlock getParentBlock() {
+    return parent;
+  }
+
+  public void setParentBlock(ExecutionBlock parent) {
+    this.parent = parent;
+  }
+
+  public boolean hasChildBlock() {
+    return !childSubQueries.isEmpty();
+  }
+
+  /** @return the child block feeding the given scan node, or null if there is none */
+  public ExecutionBlock getChildBlock(ScanNode scanNode) {
+    return childSubQueries.get(scanNode);
+  }
+
+  /** @return a read-only view of all child blocks */
+  public Collection<ExecutionBlock> getChildBlocks() {
+    return Collections.unmodifiableCollection(childSubQueries.values());
+  }
+
+  /** @return the live, modifiable map from scan nodes to child blocks */
+  public Map<ScanNode, ExecutionBlock> getChildBlockMap() {
+    return childSubQueries;
+  }
+
+  public void addChildBlock(ScanNode scanNode, ExecutionBlock child) {
+    childSubQueries.put(scanNode, child);
+  }
+
+  public int getChildNum() {
+    return childSubQueries.size();
+  }
+
+  /** Unbinds the child block of the given scan node and drops the scan node from this block. */
+  public void removeChildBlock(ScanNode scanNode) {
+    scanlist.remove(scanNode);
+    this.childSubQueries.remove(scanNode);
+  }
+
+  public void addChildBlocks(Map<ScanNode, ExecutionBlock> childBlocks) {
+    childSubQueries.putAll(childBlocks);
+  }
+
+  public boolean isLeafBlock() {
+    return childSubQueries.isEmpty();
+  }
+
+  public StoreTableNode getStoreTableNode() {
+    return store;
+  }
+
+  /** @return a new array holding the scan nodes found in the plan */
+  public ScanNode [] getScanNodes() {
+    return this.scanlist.toArray(new ScanNode[scanlist.size()]);
+  }
+
+  public Schema getOutputSchema() {
+    return store.getOutSchema();
+  }
+
+  /** @return true if the plan contains a JOIN node */
+  public boolean hasJoin() {
+    return hasJoinPlan;
+  }
+
+  /** @return true if the plan contains a UNION node */
+  public boolean hasUnion() {
+    return hasUnionPlan;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/ExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/ExecutionBlockCursor.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/ExecutionBlockCursor.java
new file mode 100644
index 0000000..89e2a5b
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/ExecutionBlockCursor.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package tajo.master;
+
+import tajo.engine.planner.global.MasterPlan;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * A distributed execution plan (DEP) is a directed acyclic graph (DAG) of ExecutionBlocks.
+ * This class is a pointer to an ExecutionBlock that the query engine should execute.
+ * For each call of nextBlock(), it retrieves the next ExecutionBlock in post-order,
+ * i.e., a block is returned only after all of its child blocks.
+ */
+public class ExecutionBlockCursor {
+  private final ArrayList<ExecutionBlock> orderedBlocks = new ArrayList<ExecutionBlock>();
+  private int cursor = 0;
+
+  /**
+   * Creates a cursor positioned before the first block of the given plan.
+   *
+   * @param plan the master plan whose blocks are visited, starting from its root
+   */
+  public ExecutionBlockCursor(MasterPlan plan) {
+    buildOrder(plan.getRoot());
+  }
+
+  /**
+   * Appends the blocks of the subtree rooted at {@code current} to the visiting order,
+   * children first and {@code current} last.
+   *
+   * <p>Children that have child blocks of their own are visited before leaf children
+   * (left-deep-first search). Every child is visited, so a block with more than two
+   * children (e.g., one fed by several union inputs) is handled as well.
+   */
+  private void buildOrder(ExecutionBlock current) {
+    if (current.hasChildBlock()) {
+      // Stable partition of the children: non-leaf blocks first, then leaf blocks.
+      ArrayList<ExecutionBlock> nonLeaves = new ArrayList<ExecutionBlock>();
+      ArrayList<ExecutionBlock> leaves = new ArrayList<ExecutionBlock>();
+      for (ExecutionBlock child : current.getChildBlocks()) {
+        if (child.hasChildBlock()) {
+          nonLeaves.add(child);
+        } else {
+          leaves.add(child);
+        }
+      }
+      for (ExecutionBlock child : nonLeaves) {
+        buildOrder(child);
+      }
+      for (ExecutionBlock child : leaves) {
+        buildOrder(child);
+      }
+    }
+    orderedBlocks.add(current);
+  }
+
+  /** @return true if some block has not been returned by nextBlock() yet */
+  public boolean hasNext() {
+    return cursor < orderedBlocks.size();
+  }
+
+  /**
+   * Returns the next block and advances the cursor.
+   *
+   * @throws IndexOutOfBoundsException if there is no next block
+   */
+  public ExecutionBlock nextBlock() {
+    ExecutionBlock next = orderedBlocks.get(cursor);
+    cursor++;  // only advance once the lookup has succeeded
+    return next;
+  }
+
+  /**
+   * Returns the next block without advancing the cursor.
+   *
+   * @throws IndexOutOfBoundsException if there is no next block
+   */
+  public ExecutionBlock peek() {
+    return orderedBlocks.get(cursor);
+  }
+
+  /**
+   * Returns the block {@code skip} positions after the next one, without advancing
+   * the cursor; {@code peek(0)} is equivalent to {@link #peek()}.
+   *
+   * @throws IndexOutOfBoundsException if that position is out of range
+   */
+  public ExecutionBlock peek(int skip) {
+    return orderedBlocks.get(cursor + skip);
+  }
+
+  /** Moves the cursor back before the first block. */
+  public void reset() {
+    cursor = 0;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java
index 8d3b400..2709c98 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java
@@ -25,9 +25,6 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.event.EventHandler;
import org.jboss.netty.handler.codec.http.QueryStringDecoder;
import tajo.QueryId;
@@ -36,18 +33,14 @@ import tajo.QueryUnitAttemptId;
import tajo.SubQueryId;
import tajo.catalog.*;
import tajo.catalog.proto.CatalogProtos.StoreType;
-import tajo.catalog.statistics.TableStat;
import tajo.common.exception.NotImplementedException;
import tajo.conf.TajoConf;
-import tajo.engine.MasterWorkerProtos.Partition;
import tajo.engine.parser.QueryBlock.FromTable;
import tajo.engine.planner.PlannerUtil;
-import tajo.engine.planner.RangePartitionAlgorithm;
-import tajo.engine.planner.UniformRangePartition;
import tajo.engine.planner.global.MasterPlan;
import tajo.engine.planner.logical.*;
import tajo.engine.utils.TupleUtil;
-import tajo.master.SubQuery.PARTITION_TYPE;
+import tajo.master.ExecutionBlock.PartitionType;
import tajo.storage.Fragment;
import tajo.storage.StorageManager;
import tajo.storage.TupleRange;
@@ -55,9 +48,7 @@ import tajo.util.TajoIdUtils;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
-import java.math.BigDecimal;
import java.net.URI;
-import java.net.URISyntaxException;
import java.util.*;
import java.util.Map.Entry;
@@ -133,20 +124,6 @@ public class GlobalPlanner {
PlannerUtil.insertNode(parent, store);
return store;
}
-
- public int getTaskNum(SubQuery subQuery) {
- int numTasks;
- GroupbyNode grpNode = (GroupbyNode) PlannerUtil.findTopNode(
- subQuery.getLogicalPlan(), ExprType.GROUP_BY);
- if (subQuery.getParentQuery() == null && grpNode != null
- && grpNode.getGroupingColumns().length == 0) {
- numTasks = 1;
- } else {
- // TODO - to be improved
- numTasks = 32;
- }
- return numTasks;
- }
/**
* Transforms a logical plan to a two-phase plan.
@@ -316,8 +293,8 @@ public class GlobalPlanner {
return logicalPlan;
}
- private Map<StoreTableNode, SubQuery> convertMap =
- new HashMap<StoreTableNode, SubQuery>();
+ private Map<StoreTableNode, ExecutionBlock> convertMap =
+ new HashMap<StoreTableNode, ExecutionBlock>();
/**
* Logical plan을 후위 탐색하면서 SubQuery 생성
@@ -327,7 +304,7 @@ public class GlobalPlanner {
*/
private void recursiveBuildSubQuery(LogicalNode node)
throws IOException {
- SubQuery subQuery;
+ ExecutionBlock subQuery;
StoreTableNode store;
if (node instanceof UnaryNode) {
recursiveBuildSubQuery(((UnaryNode) node).getSubNode());
@@ -340,44 +317,41 @@ public class GlobalPlanner {
} else {
id = QueryIdFactory.newSubQueryId(queryId);
}
- subQuery = new SubQuery(id, sm, this);
+ subQuery = new ExecutionBlock(id);
switch (store.getSubNode().getType()) {
case BST_INDEX_SCAN:
case SCAN: // store - scan
subQuery = makeScanSubQuery(subQuery);
- subQuery.setLogicalPlan(node);
+ subQuery.setPlan(node);
break;
case SELECTION:
case PROJECTION:
case LIMIT:
subQuery = makeUnarySubQuery(store, node, subQuery);
- subQuery.setLogicalPlan(node);
+ subQuery.setPlan(node);
break;
case GROUP_BY:
subQuery = makeGroupbySubQuery(store, node, subQuery);
- subQuery.setLogicalPlan(node);
+ subQuery.setPlan(node);
break;
case SORT:
subQuery = makeSortSubQuery(store, node, subQuery);
- subQuery.setLogicalPlan(node);
+ subQuery.setPlan(node);
break;
case JOIN: // store - join
subQuery = makeJoinSubQuery(store, node, subQuery);
- subQuery.setLogicalPlan(node);
+ subQuery.setPlan(node);
break;
case UNION:
subQuery = makeUnionSubQuery(store, node, subQuery);
- subQuery.setLogicalPlan(node);
+ subQuery.setPlan(node);
break;
default:
subQuery = null;
break;
}
- if (!subQuery.hasChildQuery()) {
- subQuery.setLeafQuery();
- }
convertMap.put(store, subQuery);
}
} else if (node instanceof BinaryNode) {
@@ -390,18 +364,9 @@ public class GlobalPlanner {
}
}
- private SubQuery makeScanSubQuery(SubQuery unit) {
- unit.setOutputType(PARTITION_TYPE.LIST);
- return unit;
- }
-
- private SubQuery makeBSTIndexUnit(LogicalNode plan, SubQuery unit) {
- switch(((IndexWriteNode)plan).getSubNode().getType()){
- case SCAN:
- unit = makeScanSubQuery(unit);
- unit.setLogicalPlan(((IndexWriteNode)plan).getSubNode());
- }
- return unit;
+ private ExecutionBlock makeScanSubQuery(ExecutionBlock block) {
+ block.setPartitionType(PartitionType.LIST);
+ return block;
}
/**
@@ -413,10 +378,10 @@ public class GlobalPlanner {
* @return
* @throws IOException
*/
- private SubQuery makeUnarySubQuery(StoreTableNode rootStore,
- LogicalNode plan, SubQuery unit) throws IOException {
+ private ExecutionBlock makeUnarySubQuery(StoreTableNode rootStore,
+ LogicalNode plan, ExecutionBlock unit) throws IOException {
ScanNode newScan;
- SubQuery prev;
+ ExecutionBlock prev;
UnaryNode unary = (UnaryNode) plan;
UnaryNode child = (UnaryNode) unary.getSubNode();
StoreTableNode prevStore = (StoreTableNode)child.getSubNode();
@@ -429,12 +394,12 @@ public class GlobalPlanner {
prev = convertMap.get(prevStore);
if (prev != null) {
- prev.setParentQuery(unit);
- unit.addChildQuery(newScan, prev);
- prev.setOutputType(PARTITION_TYPE.LIST);
+ prev.setParentBlock(unit);
+ unit.addChildBlock(newScan, prev);
+ prev.setPartitionType(PartitionType.LIST);
}
- unit.setOutputType(PARTITION_TYPE.LIST);
+ unit.setPartitionType(PartitionType.LIST);
return unit;
}
@@ -448,13 +413,13 @@ public class GlobalPlanner {
* @return
* @throws IOException
*/
- private SubQuery makeGroupbySubQuery(StoreTableNode rootStore,
- LogicalNode plan, SubQuery unit) throws IOException {
+ private ExecutionBlock makeGroupbySubQuery(StoreTableNode rootStore,
+ LogicalNode plan, ExecutionBlock unit) throws IOException {
UnaryNode unary = (UnaryNode) plan;
UnaryNode unaryChild;
StoreTableNode prevStore;
ScanNode newScan;
- SubQuery prev;
+ ExecutionBlock prev;
unaryChild = (UnaryNode) unary.getSubNode(); // groupby
ExprType curType = unaryChild.getType();
if (unaryChild.getSubNode().getType() == ExprType.STORE) {
@@ -468,30 +433,30 @@ public class GlobalPlanner {
((UnaryNode) unary.getSubNode()).setSubNode(newScan);
prev = convertMap.get(prevStore);
if (prev != null) {
- prev.setParentQuery(unit);
- unit.addChildQuery(newScan, prev);
+ prev.setParentBlock(unit);
+ unit.addChildBlock(newScan, prev);
}
if (unaryChild.getSubNode().getType() == curType) {
// the second phase
- unit.setOutputType(PARTITION_TYPE.LIST);
+ unit.setPartitionType(PartitionType.LIST);
if (prev != null) {
- prev.setOutputType(PARTITION_TYPE.HASH);
+ prev.setPartitionType(PartitionType.HASH);
}
} else {
// the first phase
- unit.setOutputType(PARTITION_TYPE.HASH);
+ unit.setPartitionType(PartitionType.HASH);
if (prev != null) {
- prev.setOutputType(PARTITION_TYPE.LIST);
+ prev.setPartitionType(PartitionType.LIST);
}
}
} else if (unaryChild.getSubNode().getType() == ExprType.SCAN) {
// the first phase
// store - groupby - scan
- unit.setOutputType(PARTITION_TYPE.HASH);
+ unit.setPartitionType(PartitionType.HASH);
} else if (unaryChild.getSubNode().getType() == ExprType.UNION) {
_handleUnionNode(rootStore, (UnionNode)unaryChild.getSubNode(), unit,
- null, PARTITION_TYPE.LIST);
+ null, PartitionType.LIST);
} else {
// error
}
@@ -499,21 +464,21 @@ public class GlobalPlanner {
}
/**
- *
- *
+ *
+ *
* @param rootStore 생성할 SubQuery의 store
* @param plan logical plan
* @param unit 생성할 SubQuery
* @return
* @throws IOException
*/
- private SubQuery makeUnionSubQuery(StoreTableNode rootStore,
- LogicalNode plan, SubQuery unit) throws IOException {
+ private ExecutionBlock makeUnionSubQuery(StoreTableNode rootStore,
+ LogicalNode plan, ExecutionBlock unit) throws IOException {
UnaryNode unary = (UnaryNode) plan;
StoreTableNode outerStore, innerStore;
- SubQuery prev;
+ ExecutionBlock prev;
UnionNode union = (UnionNode) unary.getSubNode();
- unit.setOutputType(PARTITION_TYPE.LIST);
+ unit.setPartitionType(PartitionType.LIST);
if (union.getOuterNode().getType() == ExprType.STORE) {
outerStore = (StoreTableNode) union.getOuterNode();
@@ -523,13 +488,12 @@ public class GlobalPlanner {
prev = convertMap.get(outerStore);
if (prev != null) {
prev.getStoreTableNode().setTableName(rootStore.getTableName());
- prev.setOutputType(PARTITION_TYPE.LIST);
- prev.setParentQuery(unit);
- prev.setLeafQuery();
- unit.addChildQuery((ScanNode)union.getOuterNode(), prev);
+ prev.setPartitionType(PartitionType.LIST);
+ prev.setParentBlock(unit);
+ unit.addChildBlock((ScanNode) union.getOuterNode(), prev);
}
} else if (union.getOuterNode().getType() == ExprType.UNION) {
- _handleUnionNode(rootStore, union, unit, null, PARTITION_TYPE.LIST);
+ _handleUnionNode(rootStore, union, unit, null, PartitionType.LIST);
}
if (union.getInnerNode().getType() == ExprType.STORE) {
@@ -540,26 +504,25 @@ public class GlobalPlanner {
prev = convertMap.get(innerStore);
if (prev != null) {
prev.getStoreTableNode().setTableName(rootStore.getTableName());
- prev.setOutputType(PARTITION_TYPE.LIST);
- prev.setParentQuery(unit);
- prev.setLeafQuery();
- unit.addChildQuery((ScanNode)union.getInnerNode(), prev);
+ prev.setPartitionType(PartitionType.LIST);
+ prev.setParentBlock(unit);
+ unit.addChildBlock((ScanNode) union.getInnerNode(), prev);
}
} else if (union.getInnerNode().getType() == ExprType.UNION) {
- _handleUnionNode(rootStore, union, unit, null, PARTITION_TYPE.LIST);
+ _handleUnionNode(rootStore, union, unit, null, PartitionType.LIST);
}
return unit;
}
- private SubQuery makeSortSubQuery(StoreTableNode rootStore,
- LogicalNode plan, SubQuery unit) throws IOException {
+ private ExecutionBlock makeSortSubQuery(StoreTableNode rootStore,
+ LogicalNode plan, ExecutionBlock unit) throws IOException {
UnaryNode unary = (UnaryNode) plan;
UnaryNode unaryChild;
StoreTableNode prevStore;
ScanNode newScan;
- SubQuery prev;
+ ExecutionBlock prev;
unaryChild = (UnaryNode) unary.getSubNode(); // groupby
ExprType curType = unaryChild.getType();
if (unaryChild.getSubNode().getType() == ExprType.STORE) {
@@ -572,44 +535,44 @@ public class GlobalPlanner {
((UnaryNode) unary.getSubNode()).setSubNode(newScan);
prev = convertMap.get(prevStore);
if (prev != null) {
- prev.setParentQuery(unit);
- unit.addChildQuery(newScan, prev);
+ prev.setParentBlock(unit);
+ unit.addChildBlock(newScan, prev);
if (unaryChild.getSubNode().getType() == curType) {
// TODO - this is duplicated code
- prev.setOutputType(PARTITION_TYPE.RANGE);
+ prev.setPartitionType(PartitionType.RANGE);
} else {
- prev.setOutputType(PARTITION_TYPE.LIST);
+ prev.setPartitionType(PartitionType.LIST);
}
}
if (unaryChild.getSubNode().getType() == curType) {
// the second phase
- unit.setOutputType(PARTITION_TYPE.LIST);
+ unit.setPartitionType(PartitionType.LIST);
} else {
// the first phase
- unit.setOutputType(PARTITION_TYPE.HASH);
+ unit.setPartitionType(PartitionType.HASH);
}
} else if (unaryChild.getSubNode().getType() == ExprType.SCAN) {
// the first phase
// store - sort - scan
- unit.setOutputType(PARTITION_TYPE.RANGE);
+ unit.setPartitionType(PartitionType.RANGE);
} else if (unaryChild.getSubNode().getType() == ExprType.UNION) {
_handleUnionNode(rootStore, (UnionNode)unaryChild.getSubNode(), unit,
- null, PARTITION_TYPE.LIST);
+ null, PartitionType.LIST);
} else {
// error
}
return unit;
}
- private SubQuery makeJoinSubQuery(StoreTableNode rootStore,
- LogicalNode plan, SubQuery unit) throws IOException {
+ private ExecutionBlock makeJoinSubQuery(StoreTableNode rootStore,
+ LogicalNode plan, ExecutionBlock unit) throws IOException {
UnaryNode unary = (UnaryNode)plan;
StoreTableNode outerStore, innerStore;
- SubQuery prev;
+ ExecutionBlock prev;
JoinNode join = (JoinNode) unary.getSubNode();
Schema outerSchema = join.getOuterNode().getOutSchema();
Schema innerSchema = join.getInnerNode().getOutSchema();
- unit.setOutputType(PARTITION_TYPE.LIST);
+ unit.setPartitionType(PartitionType.LIST);
List<Column> outerCollist = new ArrayList<Column>();
List<Column> innerCollist = new ArrayList<Column>();
@@ -639,14 +602,14 @@ public class GlobalPlanner {
insertOuterScan(join, outerStore.getTableName(), outerMeta);
prev = convertMap.get(outerStore);
if (prev != null) {
- prev.setOutputType(PARTITION_TYPE.HASH);
- prev.setParentQuery(unit);
- unit.addChildQuery((ScanNode)join.getOuterNode(), prev);
+ prev.setPartitionType(PartitionType.HASH);
+ prev.setParentBlock(unit);
+ unit.addChildBlock((ScanNode) join.getOuterNode(), prev);
}
- outerStore.setPartitions(PARTITION_TYPE.HASH, outerCols, 32);
+ outerStore.setPartitions(PartitionType.HASH, outerCols, 32);
} else if (join.getOuterNode().getType() == ExprType.UNION) {
_handleUnionNode(rootStore, (UnionNode)join.getOuterNode(), unit,
- outerCols, PARTITION_TYPE.HASH);
+ outerCols, PartitionType.HASH);
} else {
}
@@ -659,14 +622,14 @@ public class GlobalPlanner {
insertInnerScan(join, innerStore.getTableName(), innerMeta);
prev = convertMap.get(innerStore);
if (prev != null) {
- prev.setOutputType(PARTITION_TYPE.HASH);
- prev.setParentQuery(unit);
- unit.addChildQuery((ScanNode)join.getInnerNode(), prev);
+ prev.setPartitionType(PartitionType.HASH);
+ prev.setParentBlock(unit);
+ unit.addChildBlock((ScanNode) join.getInnerNode(), prev);
}
- innerStore.setPartitions(PARTITION_TYPE.HASH, innerCols, 32);
+ innerStore.setPartitions(PartitionType.HASH, innerCols, 32);
} else if (join.getInnerNode().getType() == ExprType.UNION) {
_handleUnionNode(rootStore, (UnionNode)join.getInnerNode(), unit,
- innerCols, PARTITION_TYPE.HASH);
+ innerCols, PartitionType.HASH);
}
return unit;
@@ -683,11 +646,11 @@ public class GlobalPlanner {
* @throws IOException
*/
private void _handleUnionNode(StoreTableNode rootStore, UnionNode union,
- SubQuery cur, Column[] cols, PARTITION_TYPE prevOutputType)
+ ExecutionBlock cur, Column[] cols, PartitionType prevOutputType)
throws IOException {
StoreTableNode store;
TableMeta meta;
- SubQuery prev;
+ ExecutionBlock prev;
if (union.getOuterNode().getType() == ExprType.STORE) {
store = (StoreTableNode) union.getOuterNode();
@@ -696,12 +659,12 @@ public class GlobalPlanner {
prev = convertMap.get(store);
if (prev != null) {
prev.getStoreTableNode().setTableName(rootStore.getTableName());
- prev.setOutputType(prevOutputType);
- prev.setParentQuery(cur);
- cur.addChildQuery((ScanNode)union.getOuterNode(), prev);
+ prev.setPartitionType(prevOutputType);
+ prev.setParentBlock(cur);
+ cur.addChildBlock((ScanNode) union.getOuterNode(), prev);
}
if (cols != null) {
- store.setPartitions(PARTITION_TYPE.LIST, cols, 32);
+ store.setPartitions(PartitionType.LIST, cols, 32);
}
} else if (union.getOuterNode().getType() == ExprType.UNION) {
_handleUnionNode(rootStore, (UnionNode)union.getOuterNode(), cur, cols,
@@ -715,12 +678,12 @@ public class GlobalPlanner {
prev = convertMap.get(store);
if (prev != null) {
prev.getStoreTableNode().setTableName(rootStore.getTableName());
- prev.setOutputType(prevOutputType);
- prev.setParentQuery(cur);
- cur.addChildQuery((ScanNode)union.getInnerNode(), prev);
+ prev.setPartitionType(prevOutputType);
+ prev.setParentBlock(cur);
+ cur.addChildBlock((ScanNode) union.getInnerNode(), prev);
}
if (cols != null) {
- store.setPartitions(PARTITION_TYPE.LIST, cols, 32);
+ store.setPartitions(PartitionType.LIST, cols, 32);
}
} else if (union.getInnerNode().getType() == ExprType.UNION) {
_handleUnionNode(rootStore, (UnionNode)union.getInnerNode(), cur, cols,
@@ -729,20 +692,19 @@ public class GlobalPlanner {
}
@VisibleForTesting
- public SubQuery createMultilevelGroupby(
- SubQuery firstPhaseGroupby, Column[] keys)
+ public ExecutionBlock createMultilevelGroupby(
+ ExecutionBlock firstPhaseGroupby, Column[] keys)
throws CloneNotSupportedException, IOException {
- SubQuery secondPhaseGroupby = firstPhaseGroupby.getParentQuery();
+ ExecutionBlock secondPhaseGroupby = firstPhaseGroupby.getParentBlock();
Preconditions.checkState(secondPhaseGroupby.getScanNodes().length == 1);
ScanNode secondScan = secondPhaseGroupby.getScanNodes()[0];
GroupbyNode secondGroupby = (GroupbyNode) secondPhaseGroupby.
getStoreTableNode().getSubNode();
- SubQuery newPhaseGroupby = new SubQuery(
- QueryIdFactory.newSubQueryId(
- firstPhaseGroupby.getId().getQueryId()), sm, this);
+ ExecutionBlock newPhaseGroupby = new ExecutionBlock(
+ QueryIdFactory.newSubQueryId(firstPhaseGroupby.getId().getQueryId()));
LogicalNode tmp = PlannerUtil.findTopParentNode(
- firstPhaseGroupby.getLogicalPlan(), ExprType.GROUP_BY);
+ firstPhaseGroupby.getPlan(), ExprType.GROUP_BY);
GroupbyNode firstGroupby;
if (tmp instanceof UnaryNode) {
firstGroupby = (GroupbyNode) ((UnaryNode)tmp).getSubNode();
@@ -775,9 +737,9 @@ public class GlobalPlanner {
secondGroupby.getTargets());
newGroupby.setSubNode(newScan);
newStore.setSubNode(newGroupby);
- newPhaseGroupby.setLogicalPlan(newStore);
+ newPhaseGroupby.setPlan(newStore);
- secondPhaseGroupby.removeChildQuery(secondScan);
+ secondPhaseGroupby.removeChildBlock(secondScan);
// update the scan node of last phase
secondScan = GlobalPlannerUtils.newScanPlan(secondScan.getInSchema(),
@@ -785,15 +747,15 @@ public class GlobalPlanner {
sm.getTablePath(newPhaseGroupby.getOutputName()));
secondScan.setLocal(true);
secondGroupby.setSubNode(secondScan);
- secondPhaseGroupby.setLogicalPlan(secondPhaseGroupby.getLogicalPlan());
+ secondPhaseGroupby.setPlan(secondPhaseGroupby.getPlan());
// insert the new SubQuery
// between the first phase and the second phase
- secondPhaseGroupby.addChildQuery(secondScan, newPhaseGroupby);
- newPhaseGroupby.addChildQuery(newPhaseGroupby.getScanNodes()[0],
+ secondPhaseGroupby.addChildBlock(secondScan, newPhaseGroupby);
+ newPhaseGroupby.addChildBlock(newPhaseGroupby.getScanNodes()[0],
firstPhaseGroupby);
- newPhaseGroupby.setParentQuery(secondPhaseGroupby);
- firstPhaseGroupby.setParentQuery(newPhaseGroupby);
+ newPhaseGroupby.setParentBlock(secondPhaseGroupby);
+ firstPhaseGroupby.setParentBlock(newPhaseGroupby);
return newPhaseGroupby;
}
@@ -823,13 +785,13 @@ public class GlobalPlanner {
private MasterPlan convertToGlobalPlan(IndexWriteNode index,
LogicalNode logicalPlan) throws IOException {
recursiveBuildSubQuery(logicalPlan);
- SubQuery root;
+ ExecutionBlock root;
if (index != null) {
SubQueryId id = QueryIdFactory.newSubQueryId(queryId);
- SubQuery unit = new SubQuery(id, sm, this);
+ ExecutionBlock unit = new ExecutionBlock(id);
root = makeScanSubQuery(unit);
- root.setLogicalPlan(index);
+ root.setPlan(index);
} else {
root = convertMap.get(((LogicalRootNode)logicalPlan).getSubNode());
root.getStoreTableNode().setLocal(false);
@@ -837,211 +799,6 @@ public class GlobalPlanner {
return new MasterPlan(root);
}
- public SubQuery setPartitionNumberForTwoPhase(SubQuery subQuery, final int n) {
- Column[] keys = null;
- // if the next query is join,
- // set the partition number for the current logicalUnit
- // TODO: the union handling is required when a join has unions as its child
- SubQuery parentQueryUnit = subQuery.getParentQuery();
- if (parentQueryUnit != null) {
- if (parentQueryUnit.getStoreTableNode().getSubNode().getType() == ExprType.JOIN) {
- subQuery.getStoreTableNode().setPartitions(subQuery.getOutputType(),
- subQuery.getStoreTableNode().getPartitionKeys(), n);
- keys = subQuery.getStoreTableNode().getPartitionKeys();
- }
- }
-
- StoreTableNode store = subQuery.getStoreTableNode();
- // set the partition number for group by and sort
- if (subQuery.getOutputType() == PARTITION_TYPE.HASH) {
- if (store.getSubNode().getType() == ExprType.GROUP_BY) {
- GroupbyNode groupby = (GroupbyNode)store.getSubNode();
- keys = groupby.getGroupingColumns();
- }
- } else if (subQuery.getOutputType() == PARTITION_TYPE.RANGE) {
- if (store.getSubNode().getType() == ExprType.SORT) {
- SortNode sort = (SortNode)store.getSubNode();
- keys = new Column[sort.getSortKeys().length];
- for (int i = 0; i < keys.length; i++) {
- keys[i] = sort.getSortKeys()[i].getSortKey();
- }
- }
- }
- if (keys != null) {
- if (keys.length == 0) {
- store.setPartitions(subQuery.getOutputType(), new Column[]{}, 1);
- } else {
- store.setPartitions(subQuery.getOutputType(), keys, n);
- }
- } else {
- store.setListPartition();
- }
- return subQuery;
- }
-
- /**
- * 입력 받은 SubQuery를 QueryUnit들로 localize
- *
- * @param subQuery localize할 SubQuery
- * @param maxTaskNum localize된 QueryUnit의 최대 개수
- * @return
- * @throws IOException
- * @throws URISyntaxException
- */
- public QueryUnit[] localize(SubQuery subQuery, int maxTaskNum)
- throws IOException, URISyntaxException {
- FileStatus[] files;
- Fragment[] frags;
- List<Fragment> fragList;
- List<URI> uriList;
- // fragments and fetches are maintained for each scan of the SubQuery
- Map<ScanNode, List<Fragment>> fragMap = new HashMap<ScanNode, List<Fragment>>();
- Map<ScanNode, List<URI>> fetchMap = new HashMap<ScanNode, List<URI>>();
-
- // set partition numbers for two phase algorithms
- // TODO: the following line might occur a bug. see the line 623
- subQuery = setPartitionNumberForTwoPhase(subQuery, maxTaskNum);
-
- SortSpec [] sortSpecs = null;
- Schema sortSchema;
-
- // make fetches and fragments for each scan
- Path tablepath;
- ScanNode[] scans = subQuery.getScanNodes();
- for (ScanNode scan : scans) {
- if (scan.getTableId().startsWith(QueryId.PREFIX)) {
- tablepath = sm.getTablePath(scan.getTableId());
- } else {
- tablepath = catalog.getTableDesc(scan.getTableId()).getPath();
- }
- if (scan.isLocal()) {
- SubQuery prev = subQuery.getChildIterator().next();
- TableStat stat = prev.getStats();
- if (stat.getNumRows() == 0) {
- return new QueryUnit[0];
- }
- // make fetches from the previous query units
- uriList = new ArrayList<URI>();
- fragList = new ArrayList<Fragment>();
-
- if (prev.getOutputType() == PARTITION_TYPE.RANGE) {
- StoreTableNode store = (StoreTableNode) prev.getLogicalPlan();
- SortNode sort = (SortNode) store.getSubNode();
- sortSpecs = sort.getSortKeys();
- sortSchema = PlannerUtil.sortSpecsToSchema(sort.getSortKeys());
-
- // calculate the number of tasks based on the data size
- int mb = (int) Math.ceil((double)stat.getNumBytes() / 1048576);
- LOG.info("Total size of intermediate data is approximately " + mb + " MB");
-
- maxTaskNum = (int) Math.ceil((double)mb / 64); // determine the number of task by considering 1 task per 64MB
- LOG.info("The desired number of tasks is set to " + maxTaskNum);
-
- // calculate the number of maximum query ranges
- TupleRange mergedRange =
- TupleUtil.columnStatToRange(sort.getOutSchema(),
- sortSchema, stat.getColumnStats());
- RangePartitionAlgorithm partitioner =
- new UniformRangePartition(sortSchema, mergedRange);
- BigDecimal card = partitioner.getTotalCardinality();
-
- // if the number of the range cardinality is less than the desired number of tasks,
- // we set the the number of tasks to the number of range cardinality.
- if (card.compareTo(new BigDecimal(maxTaskNum)) < 0) {
- LOG.info("The range cardinality (" + card
- + ") is less then the desired number of tasks (" + maxTaskNum + ")");
- maxTaskNum = card.intValue();
- }
-
- LOG.info("Try to divide " + mergedRange + " into " + maxTaskNum +
- " sub ranges (total units: " + maxTaskNum + ")");
- TupleRange [] ranges = partitioner.partition(maxTaskNum);
-
- String [] queries = TupleUtil.rangesToQueries(sortSpecs, ranges);
- for (QueryUnit qu : subQuery.getChildQuery(scan).getQueryUnits()) {
- for (Partition p : qu.getPartitions()) {
- for (String query : queries) {
- uriList.add(new URI(p.getFileName() + "&" + query));
- }
- }
- }
- } else {
- SubQuery child = subQuery.getChildQuery(scan);
- QueryUnit[] units;
- if (child.getStoreTableNode().getSubNode().getType() ==
- ExprType.UNION) {
- List<QueryUnit> list = Lists.newArrayList();
- for (ScanNode s : child.getScanNodes()) {
- for (QueryUnit qu : child.getChildQuery(s).getQueryUnits()) {
- list.add(qu);
- }
- }
- units = new QueryUnit[list.size()];
- units = list.toArray(units);
- } else {
- units = child.getQueryUnits();
- }
- for (QueryUnit qu : units) {
- for (Partition p : qu.getPartitions()) {
- uriList.add(new URI(p.getFileName()));
-// System.out.println("Partition: " + uriList.get(uriList.size() - 1));
- }
- }
- }
-
- fetchMap.put(scan, uriList);
- // one fragment is shared by query units
- Fragment frag = new Fragment(scan.getTableId(), tablepath,
- TCatUtil.newTableMeta(scan.getInSchema(),StoreType.CSV),
- 0, 0, null);
- fragList.add(frag);
- fragMap.put(scan, fragList);
- } else {
- fragList = new ArrayList<Fragment>();
- // set fragments for each scan
- if (subQuery.hasChildQuery() &&
- (subQuery.getChildQuery(scan).getOutputType() == PARTITION_TYPE.HASH ||
- subQuery.getChildQuery(scan).getOutputType() == PARTITION_TYPE.RANGE)
- ) {
- files = sm.getFileSystem().listStatus(tablepath);
- } else {
- files = new FileStatus[1];
- files[0] = sm.getFileSystem().getFileStatus(tablepath);
- }
- for (FileStatus file : files) {
- // make fragments
- if (scan.isBroadcast()) {
- frags = sm.splitBroadcastTable(file.getPath());
- } else {
- frags = sm.split(file.getPath());
- }
- for (Fragment f : frags) {
- // TODO: the fragment ID should be set before
- f.setId(scan.getTableId());
- fragList.add(f);
- }
- }
- fragMap.put(scan, fragList);
- }
- }
-
- List<QueryUnit> unitList = null;
- if (scans.length == 1) {
- unitList = makeUnaryQueryUnit(subQuery, maxTaskNum, fragMap, fetchMap,
- sortSpecs);
- } else if (scans.length == 2) {
- unitList = makeBinaryQueryUnit(subQuery, maxTaskNum, fragMap, fetchMap);
- }
- // TODO: The partition number should be set here,
- // because the number of query units is decided above.
-
- QueryUnit[] units = new QueryUnit[unitList.size()];
- units = unitList.toArray(units);
- subQuery.setQueryUnits(unitList);
-
- return units;
- }
-
/**
* 2개의 scan을 가진 QueryUnit들에 fragment와 fetch를 할당
*
@@ -1054,13 +811,14 @@ public class GlobalPlanner {
private List<QueryUnit> makeBinaryQueryUnit(SubQuery subQuery, final int n,
Map<ScanNode, List<Fragment>> fragMap,
Map<ScanNode, List<URI>> fetchMap) {
+ ExecutionBlock execBlock = subQuery.getBlock();
+ ScanNode[] scans = execBlock.getScanNodes();
List<QueryUnit> queryUnits = new ArrayList<QueryUnit>();
final int maxQueryUnitNum = n;
- ScanNode[] scans = subQuery.getScanNodes();
- if (subQuery.hasChildQuery()) {
- SubQuery prev = subQuery.getChildQuery(scans[0]);
- switch (prev.getOutputType()) {
+ if (execBlock.hasChildBlock()) {
+ ExecutionBlock prev = execBlock.getChildBlock(scans[0]);
+ switch (prev.getPartitionType()) {
case BROADCAST:
throw new NotImplementedException();
case HASH:
@@ -1076,8 +834,6 @@ public class GlobalPlanner {
throw new NotImplementedException();
}
} else {
-// unitList = assignFragmentsByRoundRobin(unit, unitList, fragMap,
-// maxQueryUnitNum);
queryUnits = makeQueryUnitsForBinaryPlan(subQuery,
queryUnits, fragMap);
}
@@ -1088,12 +844,13 @@ public class GlobalPlanner {
public List<QueryUnit> makeQueryUnitsForBinaryPlan(
SubQuery subQuery, List<QueryUnit> queryUnits,
Map<ScanNode, List<Fragment>> fragmentMap) {
+ ExecutionBlock execBlock = subQuery.getBlock();
QueryUnit queryUnit;
- if (subQuery.hasJoinPlan()) {
+ if (execBlock.hasJoin()) {
// make query units for every composition of fragments of each scan
Preconditions.checkArgument(fragmentMap.size()==2);
- ScanNode [] scanNodes = subQuery.getScanNodes();
+ ScanNode [] scanNodes = execBlock.getScanNodes();
String innerId = null, outerId = null;
// If one relation is set to broadcast, it meaning that the relation
@@ -1148,79 +905,6 @@ public class GlobalPlanner {
return queryUnits;
}
-
- /**
- * 1개의 scan을 가진 QueryUnit들에 대해 fragment와 fetch를 할당
- *
- * @param subQuery
- * @param n
- * @param fragMap
- * @param fetchMap
- * @return
- */
- private List<QueryUnit> makeUnaryQueryUnit(SubQuery subQuery, int n,
- Map<ScanNode, List<Fragment>> fragMap,
- Map<ScanNode, List<URI>> fetchMap, SortSpec[] sortSpecs) throws UnsupportedEncodingException {
- List<QueryUnit> queryUnits = new ArrayList<QueryUnit>();
- int maxQueryUnitNum;
- ScanNode scan = subQuery.getScanNodes()[0];
- maxQueryUnitNum = n;
- if (subQuery.hasChildQuery()) {
- SubQuery child = subQuery.getChildQuery(scan);
- if (child.getStoreTableNode().getSubNode().getType() ==
- ExprType.GROUP_BY) {
- GroupbyNode groupby = (GroupbyNode) child.getStoreTableNode().
- getSubNode();
- if (groupby.getGroupingColumns().length == 0) {
- maxQueryUnitNum = 1;
- }
- }
- switch (child.getOutputType()) {
- case BROADCAST:
- throw new NotImplementedException();
-
- case HASH:
- if (scan.isLocal()) {
- queryUnits = assignFetchesToUnaryByHash(subQuery,
- queryUnits, scan, fetchMap.get(scan), maxQueryUnitNum);
- queryUnits = assignEqualFragment(queryUnits, scan,
- fragMap.get(scan).get(0));
- } else {
- throw new NotImplementedException();
- }
- break;
- case RANGE:
- if (scan.isLocal()) {
- Schema rangeSchema = PlannerUtil.sortSpecsToSchema(sortSpecs);
- queryUnits = assignFetchesByRange(subQuery,
- queryUnits, scan, fetchMap.get(scan),
- maxQueryUnitNum, rangeSchema, sortSpecs[0].isAscending());
- queryUnits = assignEqualFragment(queryUnits, scan,
- fragMap.get(scan).get(0));
- } else {
- throw new NotImplementedException();
- }
- break;
-
- case LIST:
- if (scan.isLocal()) {
- queryUnits = assignFetchesByRoundRobin(subQuery,
- queryUnits, scan, fetchMap.get(scan), maxQueryUnitNum);
- queryUnits = assignEqualFragment(queryUnits, scan,
- fragMap.get(scan).get(0));
- } else {
- throw new NotImplementedException();
- }
- break;
- }
- } else {
-// queryUnits = assignFragmentsByRoundRobin(subQuery, queryUnits, scan,
-// fragMap.get(scan), maxQueryUnitNum);
- queryUnits = makeQueryUnitsForEachFragment(subQuery,
- queryUnits, scan, fragMap.get(scan));
- }
- return queryUnits;
- }
private List<QueryUnit> makeQueryUnitsForEachFragment(
SubQuery subQuery, List<QueryUnit> queryUnits,
@@ -1235,18 +919,11 @@ public class GlobalPlanner {
}
private QueryUnit newQueryUnit(SubQuery subQuery) {
+ ExecutionBlock execBlock = subQuery.getBlock();
QueryUnit unit = new QueryUnit(
- QueryIdFactory.newQueryUnitId(subQuery.getId()), subQuery.isLeafQuery(),
- subQuery.eventHandler);
- unit.setLogicalPlan(subQuery.getLogicalPlan());
- return unit;
- }
-
- private QueryUnit newQueryUnit(SubQuery subQuery, int taskId) {
- QueryUnit unit = new QueryUnit(
- QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), subQuery.isLeafQuery(),
+ QueryIdFactory.newQueryUnitId(subQuery.getId()), execBlock.isLeafBlock(),
subQuery.eventHandler);
- unit.setLogicalPlan(subQuery.getLogicalPlan());
+ unit.setLogicalPlan(execBlock.getPlan());
return unit;
}
@@ -1539,22 +1216,6 @@ public class GlobalPlanner {
}
/**
- * Unary QueryUnit들에 broadcast partition된 fetch를 할당
- *
- * @param units
- * @param scan
- * @param uriList
- */
- private void assignFetchesByBroadcast(QueryUnit[] units, ScanNode scan, List<URI> uriList) {
- for (URI uri : uriList) {
- for (QueryUnit unit : units) {
- // TODO: add each uri to every units
- unit.addFetch(scan.getTableId(), uri);
- }
- }
- }
-
- /**
* Unary QueryUnit들에 대하여 동일한 fragment를 할당
*
* @param unitList
@@ -1587,80 +1248,4 @@ public class GlobalPlanner {
}
return unitList;
}
-
- private Map<String, Map<ScanNode, List<Fragment>>> hashFragments(Map<ScanNode,
- List<Fragment>> fragMap) {
- SortedMap<String, Map<ScanNode, List<Fragment>>> hashed =
- new TreeMap<String, Map<ScanNode,List<Fragment>>>();
- String key;
- Map<ScanNode, List<Fragment>> m;
- List<Fragment> fragList;
- for (Entry<ScanNode, List<Fragment>> e : fragMap.entrySet()) {
- for (Fragment f : e.getValue()) {
- key = f.getPath().getName();
- if (hashed.containsKey(key)) {
- m = hashed.get(key);
- } else {
- m = new HashMap<ScanNode, List<Fragment>>();
- }
- if (m.containsKey(e.getKey())) {
- fragList = m.get(e.getKey());
- } else {
- fragList = new ArrayList<Fragment>();
- }
- fragList.add(f);
- m.put(e.getKey(), fragList);
- hashed.put(key, m);
- }
- }
-
- return hashed;
- }
-
- private Collection<List<Fragment>> hashFragments(List<Fragment> frags) {
- SortedMap<String, List<Fragment>> hashed = new TreeMap<String, List<Fragment>>();
- for (Fragment f : frags) {
- if (hashed.containsKey(f.getPath().getName())) {
- hashed.get(f.getPath().getName()).add(f);
- } else {
- List<Fragment> list = new ArrayList<Fragment>();
- list.add(f);
- hashed.put(f.getPath().getName(), list);
- }
- }
-
- return hashed.values();
- }
-
- public QueryUnit [] createLeafTasks(SubQuery subQuery) throws IOException {
- ScanNode[] scans = subQuery.getScanNodes();
- Preconditions.checkArgument(scans.length == 1, "Must be Scan Query");
- TableMeta meta;
- Path inputPath;
-
- ScanNode scan = scans[0];
- TableDesc desc = catalog.getTableDesc(scan.getTableId());
- inputPath = desc.getPath();
- meta = desc.getMeta();
-
- // TODO - should be change the inner directory
- Path oldPath = new Path(inputPath, "data");
- FileSystem fs = inputPath.getFileSystem(conf);
- if (fs.exists(oldPath)) {
- inputPath = oldPath;
- }
- List<Fragment> fragments = sm.getSplits(scan.getTableId(), meta, inputPath);
-
- QueryUnit queryUnit;
- List<QueryUnit> queryUnits = new ArrayList<QueryUnit>();
-
- int i = 0;
- for (Fragment fragment : fragments) {
- queryUnit = newQueryUnit(subQuery, i++);
- queryUnit.setFragment(scan.getTableId(), fragment);
- queryUnits.add(queryUnit);
- }
-
- return queryUnits.toArray(new QueryUnit[queryUnits.size()]);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/Query.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/Query.java
index cdd018f..292b9fd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/Query.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.*;
import tajo.QueryConf;
import tajo.QueryId;
-import tajo.QueryUnitId;
import tajo.SubQueryId;
import tajo.TajoProtos.QueryState;
import tajo.catalog.TCatUtil;
@@ -65,8 +64,8 @@ public class Query implements EventHandler<QueryEvent> {
private final EventHandler eventHandler;
private final MasterPlan plan;
private final StorageManager sm;
- private PriorityQueue<SubQuery> scheduleQueue;
private QueryContext context;
+ private ExecutionBlockCursor cursor;
// Query Status
private final QueryId id;
@@ -81,6 +80,7 @@ public class Query implements EventHandler<QueryEvent> {
// Internal Variables
private final Lock readLock;
private final Lock writeLock;
+ private int priority = 100;
// State Machine
private final StateMachine<QueryState, QueryEventType, QueryEvent> stateMachine;
@@ -115,7 +115,6 @@ public class Query implements EventHandler<QueryEvent> {
final long appSubmitTime,
final String queryStr,
final EventHandler eventHandler,
- final GlobalPlanner planner,
final MasterPlan plan, final StorageManager sm) {
this.context = context;
this.conf = context.getConf();
@@ -127,13 +126,12 @@ public class Query implements EventHandler<QueryEvent> {
this.eventHandler = eventHandler;
this.plan = plan;
this.sm = sm;
+ cursor = new ExecutionBlockCursor(plan);
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
this.readLock = readWriteLock.readLock();
this.writeLock = readWriteLock.writeLock();
- this.scheduleQueue = new PriorityQueue<SubQuery>(1,new PriorityComparator());
-
stateMachine = stateMachineFactory.make(this);
}
@@ -145,10 +143,6 @@ public class Query implements EventHandler<QueryEvent> {
return FileSystem.get(conf);
}
- protected StorageManager getStorageManager() {
- return this.sm;
- }
-
public float getProgress() {
QueryState state = getStateMachine().getCurrentState();
if (state == QueryState.QUERY_SUCCEEDED) {
@@ -234,17 +228,6 @@ public class Query implements EventHandler<QueryEvent> {
resultDesc = desc;
}
- class PriorityComparator implements Comparator<SubQuery> {
- public PriorityComparator() {
-
- }
-
- @Override
- public int compare(SubQuery s1, SubQuery s2) {
- return s1.getPriority().get() - s2.getPriority().get();
- }
- }
-
public MasterPlan getPlan() {
return plan;
}
@@ -253,36 +236,17 @@ public class Query implements EventHandler<QueryEvent> {
return stateMachine;
}
- public void addSubQuery(SubQuery q) {
- q.setQueryContext(context);
- q.setEventHandler(eventHandler);
- q.setClock(clock);
- subqueries.put(q.getId(), q);
+ public void addSubQuery(SubQuery subquery) {
+ subqueries.put(subquery.getId(), subquery);
}
public QueryId getId() {
return this.id;
}
-
- public String getQueryStr() {
- return this.queryStr;
- }
-
- public Iterator<SubQuery> getSubQueryIterator() {
- return this.subqueries.values().iterator();
- }
public SubQuery getSubQuery(SubQueryId id) {
return this.subqueries.get(id);
}
-
- public Collection<SubQuery> getSubQueries() {
- return this.subqueries.values();
- }
-
- public QueryUnit getQueryUnit(QueryUnitId id) {
- return this.getSubQuery(id.getSubQueryId()).getQueryUnit(id);
- }
public QueryState getState() {
readLock.lock();
@@ -293,8 +257,8 @@ public class Query implements EventHandler<QueryEvent> {
}
}
- public int getScheduleQueueSize() {
- return scheduleQueue.size();
+ public ExecutionBlockCursor getExecutionBlockCursor() {
+ return cursor;
}
static class InitTransition
@@ -303,136 +267,16 @@ public class Query implements EventHandler<QueryEvent> {
@Override
public QueryState transition(Query query, QueryEvent queryEvent) {
query.setStartTime();
- scheduleSubQueriesPostfix(query);
- LOG.info("Scheduled SubQueries: " + query.getScheduleQueueSize());
-
- return QueryState.QUERY_INIT;
- }
-
- private int priority = 0;
-
- private void scheduleSubQueriesPostfix(Query query) {
- SubQuery root = query.getPlan().getRoot();
-
- scheduleSubQueriesPostfix_(query, root);
-
- root.setPriority(priority);
- query.addSubQuery(root);
- query.schedule(root);
- }
- private void scheduleSubQueriesPostfix_(Query query, SubQuery current) {
- if (current.hasChildQuery()) {
- if (current.getChildQueries().size() == 1) {
- SubQuery subQuery = current.getChildQueries().iterator().next();
- scheduleSubQueriesPostfix_(query, subQuery);
- identifySubQuery(subQuery);
-
- query.addSubQuery(subQuery);
- query.schedule(subQuery);
-
- priority++;
- } else {
- Iterator<SubQuery> it = current.getChildQueries().iterator();
- SubQuery outer = it.next();
- SubQuery inner = it.next();
-
- // Switch between outer and inner
- // if an inner has a child and an outer doesn't.
- // It is for left-deep-first search.
- if (!outer.hasChildQuery() && inner.hasChildQuery()) {
- SubQuery tmp = outer;
- outer = inner;
- inner = tmp;
- }
-
- scheduleSubQueriesPostfix_(query, outer);
- scheduleSubQueriesPostfix_(query, inner);
-
- identifySubQuery(outer);
- identifySubQuery(inner);
-
- query.addSubQuery(outer);
- query.schedule(outer);
- query.addSubQuery(inner);
- query.schedule(inner);
-
- priority++;
- }
- }
- }
-
- private void identifySubQuery(SubQuery subQuery) {
- SubQuery parent = subQuery.getParentQuery();
-
- if (!subQuery.hasChildQuery()) {
-
- if (parent.getScanNodes().length == 2) {
- Iterator<SubQuery> childIter = subQuery.getParentQuery().getChildIterator();
-
- while (childIter.hasNext()) {
- SubQuery another = childIter.next();
- if (!subQuery.equals(another)) {
- if (another.hasChildQuery()) {
- subQuery.setPriority(++priority);
- }
- }
- }
-
- if (subQuery.getPriority() == null) {
- subQuery.setPriority(0);
- }
- } else {
- // if subQuery is leaf and not part of join.
- if (!subQuery.hasChildQuery()) {
- subQuery.setPriority(0);
- }
- }
- } else {
- subQuery.setPriority(priority);
- }
- }
-
- private void scheduleSubQueries(Query query, SubQuery current) {
- int priority = 0;
-
- if (current.hasChildQuery()) {
-
- int maxPriority = 0;
- Iterator<SubQuery> it = current.getChildIterator();
- while (it.hasNext()) {
- SubQuery su = it.next();
- scheduleSubQueries(query, su);
-
- if (su.getPriority().get() > maxPriority) {
- maxPriority = su.getPriority().get();
- }
- }
-
- priority = maxPriority + 1;
-
- } else {
- SubQuery parent = current.getParentQuery();
- priority = 0;
-
- if (parent.getScanNodes().length == 2) {
- Iterator<SubQuery> childIter = current.getParentQuery().getChildIterator();
-
- while (childIter.hasNext()) {
- SubQuery another = childIter.next();
- if (!current.equals(another)) {
- if (another.hasChildQuery()) {
- priority = another.getPriority().get() + 1;
- }
- }
- }
- }
+ ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
+ while(cursor.hasNext()) {
+ ExecutionBlock block = cursor.nextBlock();
+ System.out.println(block.getId());
+ System.out.println(block.getPlan());
+ System.out.println("--------------------------------");
}
-
- current.setPriority(priority);
- // TODO
- query.addSubQuery(current);
- query.schedule(current);
+ query.getExecutionBlockCursor().reset();
+ return QueryState.QUERY_INIT;
}
}
@@ -441,8 +285,11 @@ public class Query implements EventHandler<QueryEvent> {
@Override
public void transition(Query query, QueryEvent queryEvent) {
- SubQuery subQuery = query.takeSubQuery();
- LOG.info("Schedule unit plan: \n" + subQuery.getLogicalPlan());
+ SubQuery subQuery = new SubQuery(query.context, query.getExecutionBlockCursor().nextBlock(),
+ query.sm);
+ subQuery.setPriority(query.priority--);
+ query.addSubQuery(subQuery);
+ LOG.info("Schedule unit plan: \n" + subQuery.getBlock().getPlan());
subQuery.handle(new SubQueryEvent(subQuery.getId(),
SubQueryEventType.SQ_INIT));
}
@@ -453,17 +300,27 @@ public class Query implements EventHandler<QueryEvent> {
@Override
public QueryState transition(Query query, QueryEvent event) {
-
+ // increase the count for completed subqueries
query.completedSubQueryCount++;
SubQueryCompletedEvent castEvent = (SubQueryCompletedEvent) event;
+ ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
+ // if the subquery is succeeded
if (castEvent.getFinalState() == SubQueryState.SUCCEEDED) {
- SubQuerySucceeEvent succeeEvent = (SubQuerySucceeEvent) castEvent;
+ if (cursor.hasNext()) {
+ SubQuery nextSubQuery = new SubQuery(query.context, cursor.nextBlock(), query.sm);
+ nextSubQuery.setPriority(query.priority--);
+ query.addSubQuery(nextSubQuery);
+ nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(),
+ SubQueryEventType.SQ_INIT));
+ LOG.info("Scheduling SubQuery's Priority: " + nextSubQuery.getPriority().get());
+ LOG.info("Scheduling SubQuery's Plan: \n" + nextSubQuery.getBlock().getPlan());
+ QueryState state = query.checkQueryForCompleted();
+ return state;
- SubQuery nextSubQuery = query.takeSubQuery();
-
- if (nextSubQuery == null) {
+ } else {
if (query.checkQueryForCompleted() == QueryState.QUERY_SUCCEEDED) {
+ SubQuerySucceeEvent succeeEvent = (SubQuerySucceeEvent) castEvent;
SubQuery subQuery = query.getSubQuery(castEvent.getSubQueryId());
TableDesc desc = new TableDescImpl(query.conf.getOutputTable(),
succeeEvent.getTableMeta(), query.context.getOutputPath());
@@ -478,20 +335,13 @@ public class Query implements EventHandler<QueryEvent> {
if (query.context.isCreateTableQuery()) {
query.context.getCatalog().addTable(desc);
}
- return query.finished(QueryState.QUERY_SUCCEEDED);
}
- }
- nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(),
- SubQueryEventType.SQ_INIT));
- LOG.info("Scheduling SubQuery's Priority: " + (100 - nextSubQuery.getPriority().get()));
- LOG.info("Scheduling SubQuery's Plan: \n" + nextSubQuery.getLogicalPlan());
- QueryState state = query.checkQueryForCompleted();
- return state;
- } else if (castEvent.getFinalState() == SubQueryState.FAILED) {
- return QueryState.QUERY_FAILED;
+ return query.finished(QueryState.QUERY_SUCCEEDED);
+ }
} else {
- return query.checkQueryForCompleted();
+ // if at least one subquery is failed, the query is also failed.
+ return QueryState.QUERY_FAILED;
}
}
}
@@ -529,6 +379,10 @@ public class Query implements EventHandler<QueryEvent> {
return finalState;
}
+ /**
+ * Check if all subqueries of the query are completed
+ * @return QueryState.QUERY_SUCCEEDED if all subqueries are completed.
+ */
QueryState checkQueryForCompleted() {
if (completedSubQueryCount == subqueries.size()) {
return QueryState.QUERY_SUCCEEDED;
@@ -563,75 +417,18 @@ public class Query implements EventHandler<QueryEvent> {
}
}
- public void schedule(SubQuery subQuery) {
- scheduleQueue.add(subQuery);
- }
-
- private SubQuery takeSubQuery() {
- SubQuery unit = removeFromScheduleQueue();
- if (unit == null) {
- return null;
- }
- List<SubQuery> pended = new ArrayList<SubQuery>();
- Priority priority = unit.getPriority();
-
- do {
- if (isReady(unit)) {
- break;
- } else {
- pended.add(unit);
- }
- unit = removeFromScheduleQueue();
- if (unit == null) {
- scheduleQueue.addAll(pended);
- return null;
- }
- } while (priority.equals(unit.getPriority()));
-
- if (!priority.equals(unit.getPriority())) {
- pended.add(unit);
- unit = null;
- }
- scheduleQueue.addAll(pended);
- return unit;
- }
-
- private boolean isReady(SubQuery subQuery) {
- if (subQuery.hasChildQuery()) {
- for (SubQuery child : subQuery.getChildQueries()) {
- if (child.getState() !=
- SubQueryState.SUCCEEDED) {
- return false;
- }
- }
- return true;
- } else {
- return true;
- }
- }
-
- private SubQuery removeFromScheduleQueue() {
- if (scheduleQueue.isEmpty()) {
- return null;
- } else {
- return scheduleQueue.remove();
- }
- }
-
-
-
private void writeStat(Path outputPath, SubQuery subQuery, TableStat stat)
throws IOException {
-
- if (subQuery.getLogicalPlan().getType() == ExprType.CREATE_INDEX) {
- IndexWriteNode index = (IndexWriteNode) subQuery.getLogicalPlan();
+ ExecutionBlock execBlock = subQuery.getBlock();
+ if (execBlock.getPlan().getType() == ExprType.CREATE_INDEX) {
+ IndexWriteNode index = (IndexWriteNode) execBlock.getPlan();
Path indexPath = new Path(sm.getTablePath(index.getTableName()), "index");
TableMeta meta;
if (sm.getFileSystem().exists(new Path(indexPath, ".meta"))) {
meta = sm.getTableMeta(indexPath);
} else {
meta = TCatUtil
- .newTableMeta(subQuery.getOutputSchema(), StoreType.CSV);
+ .newTableMeta(execBlock.getOutputSchema(), StoreType.CSV);
}
String indexName = IndexUtil.getIndexName(index.getTableName(),
index.getSortSpecs());
@@ -641,7 +438,7 @@ public class Query implements EventHandler<QueryEvent> {
sm.writeTableMeta(indexPath, meta);
} else {
- TableMeta meta = TCatUtil.newTableMeta(subQuery.getOutputSchema(),
+ TableMeta meta = TCatUtil.newTableMeta(execBlock.getOutputSchema(),
StoreType.CSV);
meta.setStat(stat);
sm.writeTableMeta(outputPath, meta);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
index 8a6d75f..98878d3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
@@ -124,7 +124,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
dispatcher.register(ContainerAllocatorEventType.class, rmAllocator);
query = new Query(queryContext, queryId, clock, appSubmitTime,
- "", dispatcher.getEventHandler(), null, masterPlan,
+ "", dispatcher.getEventHandler(), masterPlan,
masterContext.getStorageManager());
initStagingDir();
@@ -239,10 +239,18 @@ public class QueryMaster extends CompositeService implements EventHandler {
return dispatcher;
}
- public Query getQuery(){
+ public Clock getClock() {
+ return clock;
+ }
+
+ public Query getQuery() {
return query;
}
+ public SubQuery getSubQuery(SubQueryId subQueryId) {
+ return query.getSubQuery(subQueryId);
+ }
+
public QueryId getQueryId() {
return queryId;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fef3dd50/tajo-core/tajo-core-backend/src/main/java/tajo/master/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/Repartitioner.java
index f0cbd25..7cf4422 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/Repartitioner.java
@@ -23,24 +23,18 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import tajo.QueryIdFactory;
import tajo.SubQueryId;
-import tajo.catalog.CatalogService;
-import tajo.catalog.Schema;
-import tajo.catalog.SortSpec;
-import tajo.catalog.TCatUtil;
+import tajo.catalog.*;
import tajo.catalog.proto.CatalogProtos.StoreType;
import tajo.catalog.statistics.TableStat;
import tajo.conf.TajoConf.ConfVars;
import tajo.engine.planner.PlannerUtil;
import tajo.engine.planner.RangePartitionAlgorithm;
import tajo.engine.planner.UniformRangePartition;
-import tajo.engine.planner.logical.GroupbyNode;
-import tajo.engine.planner.logical.ScanNode;
-import tajo.engine.planner.logical.SortNode;
-import tajo.engine.planner.logical.StoreTableNode;
+import tajo.engine.planner.logical.*;
import tajo.engine.utils.TupleUtil;
import tajo.exception.InternalException;
+import tajo.master.ExecutionBlock.PartitionType;
import tajo.master.QueryUnit.IntermediateEntry;
-import tajo.master.SubQuery.PARTITION_TYPE;
import tajo.storage.Fragment;
import tajo.storage.TupleRange;
import tajo.util.TUtil;
@@ -63,10 +57,10 @@ public class Repartitioner {
public static QueryUnit [] createJoinTasks(SubQuery subQuery)
throws IOException {
-
+ ExecutionBlock execBlock = subQuery.getBlock();
CatalogService catalog = subQuery.queryContext.getCatalog();
- ScanNode[] scans = subQuery.getScanNodes();
+ ScanNode[] scans = execBlock.getScanNodes();
Path tablePath;
Fragment [] fragments = new Fragment[2];
TableStat [] stats = new TableStat[2];
@@ -91,7 +85,7 @@ public class Repartitioner {
}
// Getting a table stat for each scan
- stats[i] = subQuery.getChildMaps().get(scans[i]).getStats();
+ stats[i] = subQuery.getChildQuery(scans[i]).getStats();
}
// Assigning either fragments or fetch urls to query units
@@ -100,7 +94,7 @@ public class Repartitioner {
tasks = new QueryUnit[1];
tasks[0] = new QueryUnit(QueryIdFactory.newQueryUnitId(subQuery.getId(), 0),
false, subQuery.eventHandler);
- tasks[0].setLogicalPlan(subQuery.getLogicalPlan());
+ tasks[0].setLogicalPlan(execBlock.getPlan());
tasks[0].setFragment(scans[0].getTableId(), fragments[0]);
tasks[0].setFragment(scans[1].getTableId(), fragments[1]);
} else {
@@ -182,12 +176,13 @@ public class Repartitioner {
}
private static QueryUnit [] newEmptyJoinTask(SubQuery subQuery, Fragment [] fragments, int taskNum) {
+ ExecutionBlock execBlock = subQuery.getBlock();
QueryUnit [] tasks = new QueryUnit[taskNum];
for (int i = 0; i < taskNum; i++) {
tasks[i] = new QueryUnit(
- QueryIdFactory.newQueryUnitId(subQuery.getId(), i), subQuery.isLeafQuery(),
+ QueryIdFactory.newQueryUnitId(subQuery.getId(), i), execBlock.isLeafBlock(),
subQuery.eventHandler);
- tasks[i].setLogicalPlan(subQuery.getLogicalPlan());
+ tasks[i].setLogicalPlan(execBlock.getPlan());
for (Fragment fragment : fragments) {
tasks[i].setFragment2(fragment);
}
@@ -199,7 +194,7 @@ public class Repartitioner {
private static void addJoinPartition(QueryUnit task, SubQuery subQuery, int partitionId,
Map<String, List<IntermediateEntry>> grouppedPartitions) {
- for (ScanNode scanNode : subQuery.getScanNodes()) {
+ for (ScanNode scanNode : subQuery.getBlock().getScanNodes()) {
Map<String, List<IntermediateEntry>> requests;
if (grouppedPartitions.containsKey(scanNode.getTableId())) {
requests = mergeHashPartitionRequest(grouppedPartitions.get(scanNode.getTableId()));
@@ -210,7 +205,7 @@ public class Repartitioner {
for (Entry<String, List<IntermediateEntry>> requestPerNode : requests.entrySet()) {
Collection<URI> uris = createHashFetchURL(requestPerNode.getKey(),
subQuery.getChildQuery(scanNode).getId(),
- partitionId, PARTITION_TYPE.HASH,
+ partitionId, PartitionType.HASH,
requestPerNode.getValue());
fetchURIs.addAll(uris);
}
@@ -218,37 +213,6 @@ public class Repartitioner {
}
}
- private static QueryUnit newJoinTask(SubQuery subQuery, int partitionId,
- Fragment [] fragments,
- Map<String, List<IntermediateEntry>> grouppedPartitions) {
-
- QueryUnit task = new QueryUnit(
- QueryIdFactory.newQueryUnitId(subQuery.getId()), subQuery.isLeafQuery(),
- subQuery.eventHandler);
- task.setLogicalPlan(subQuery.getLogicalPlan());
-
- Map<String, Set<URI>> fetchURIsForEachRel = new HashMap<String, Set<URI>>();
- int i = 0;
- for (ScanNode scanNode : subQuery.getScanNodes()) {
- Map<String, List<IntermediateEntry>> mergedHashPartitionRequest =
- mergeHashPartitionRequest(grouppedPartitions.get(scanNode.getTableId()));
- Set<URI> fetchURIs = TUtil.newHashSet();
- for (Entry<String, List<IntermediateEntry>> requestPerNode
- : mergedHashPartitionRequest.entrySet()) {
- Collection<URI> uris = createHashFetchURL(requestPerNode.getKey(),
- subQuery.getChildQuery(scanNode).getId(),
- partitionId, PARTITION_TYPE.HASH,
- requestPerNode.getValue());
- fetchURIs.addAll(uris);
- }
- fetchURIsForEachRel.put(scanNode.getTableId(), fetchURIs);
- task.setFragment2(fragments[i++]);
- }
-
- task.setFetches(fetchURIsForEachRel);
- return task;
- }
-
/**
* This method merges the partition request associated with the pullserver's address.
* It reduces the number of TCP connections.
@@ -274,9 +238,10 @@ public class Repartitioner {
SubQuery childSubQuery,
int maxNum)
throws InternalException {
- if (childSubQuery.getOutputType() == PARTITION_TYPE.HASH) {
+ ExecutionBlock childExecBlock = childSubQuery.getBlock();
+ if (childExecBlock.getPartitionType() == PartitionType.HASH) {
return createHashPartitionedTasks(subQuery, childSubQuery, maxNum);
- } else if (childSubQuery.getOutputType() == PARTITION_TYPE.RANGE) {
+ } else if (childExecBlock.getPartitionType() == PartitionType.RANGE) {
return createRangePartitionedTasks(subQuery, childSubQuery, maxNum);
} else {
throw new InternalException("Cannot support partition type");
@@ -287,17 +252,17 @@ public class Repartitioner {
SubQuery childSubQuery,
int maxNum)
throws InternalException {
-
+ ExecutionBlock execBlock = subQuery.getBlock();
TableStat stat = childSubQuery.getStats();
if (stat.getNumRows() == 0) {
return new QueryUnit[0];
}
- ScanNode scan = subQuery.getScanNodes()[0];
+ ScanNode scan = execBlock.getScanNodes()[0];
Path tablePath;
tablePath = subQuery.sm.getTablePath(scan.getTableId());
- StoreTableNode store = (StoreTableNode) childSubQuery.getLogicalPlan();
+ StoreTableNode store = (StoreTableNode) childSubQuery.getBlock().getPlan();
SortNode sort = (SortNode) store.getSubNode();
SortSpec[] sortSpecs = sort.getSortKeys();
Schema sortSchema = PlannerUtil.sortSpecsToSchema(sort.getSortKeys());
@@ -331,7 +296,9 @@ public class Repartitioner {
List<String> basicFetchURIs = new ArrayList<String>();
- for (QueryUnit qu : subQuery.getChildQuery(scan).getQueryUnits()) {
+ SubQuery child = childSubQuery.queryContext.getSubQuery(
+ subQuery.getBlock().getChildBlock(scan).getId());
+ for (QueryUnit qu : child.getQueryUnits()) {
for (IntermediateEntry p : qu.getIntermediateData()) {
String uri = createBasicFetchUri(p.getPullHost(), p.getPullPort(),
childSubQuery.getId(), p.taskId, p.attemptId);
@@ -404,13 +371,13 @@ public class Repartitioner {
public static QueryUnit [] createHashPartitionedTasks(SubQuery subQuery,
SubQuery childSubQuery,
int maxNum) {
-
+ ExecutionBlock execBlock = subQuery.getBlock();
TableStat stat = childSubQuery.getStats();
if (stat.getNumRows() == 0) {
return new QueryUnit[0];
}
- ScanNode scan = subQuery.getScanNodes()[0];
+ ScanNode scan = execBlock.getScanNodes()[0];
Path tablePath;
tablePath = subQuery.sm.getTablePath(scan.getTableId());
@@ -434,7 +401,7 @@ public class Repartitioner {
for (Entry<String, List<IntermediateEntry>> e : hashedByHost.entrySet()) {
Collection<URI> uris = createHashFetchURL(e.getKey(), childSubQuery.getId(),
interm.getKey(),
- childSubQuery.getStoreTableNode().getPartitionType(), e.getValue());
+ childSubQuery.getBlock().getPartitionType(), e.getValue());
if (finalFetchURI.containsKey(interm.getKey())) {
finalFetchURI.get(interm.getKey()).addAll(uris);
@@ -444,7 +411,7 @@ public class Repartitioner {
}
}
- GroupbyNode groupby = (GroupbyNode) childSubQuery.getStoreTableNode().
+ GroupbyNode groupby = (GroupbyNode) childSubQuery.getBlock().getStoreTableNode().
getSubNode();
// the number of tasks cannot exceed the number of merged fetch uris.
int determinedTaskNum = Math.min(maxNum, finalFetchURI.size());
@@ -471,7 +438,7 @@ public class Repartitioner {
}
public static Collection<URI> createHashFetchURL(String hostAndPort, SubQueryId childSid,
- int partitionId, PARTITION_TYPE type,
+ int partitionId, PartitionType type,
List<IntermediateEntry> entries) {
String scheme = "http://";
StringBuilder urlPrefix = new StringBuilder(scheme);
@@ -479,9 +446,9 @@ public class Repartitioner {
.append("/?").append("sid=").append(childSid.getId())
.append("&").append("p=").append(partitionId)
.append("&").append("type=");
- if (type == PARTITION_TYPE.HASH) {
+ if (type == PartitionType.HASH) {
urlPrefix.append("h");
- } else if (type == PARTITION_TYPE.RANGE) {
+ } else if (type == PartitionType.RANGE) {
urlPrefix.append("r");
}
urlPrefix.append("&ta=");
@@ -542,12 +509,13 @@ public class Repartitioner {
+ /**
+  * Builds {@code num} tasks for the given sub query. Every task gets the same
+  * fragment and the logical plan of the sub query's execution block.
+  *
+  * @param subQuery the sub query the tasks belong to; supplies the task ids,
+  *                 the event handler and the logical plan
+  * @param num the number of tasks to create
+  * @param frag the fragment assigned to every task via {@code setFragment2}
+  * @return an array of {@code num} tasks, where index {@code i} holds the task
+  *         whose id was created from {@code i}
+  */
public static QueryUnit [] createEmptyNonLeafTasks(SubQuery subQuery, int num,
Fragment frag) {
+ // Resolve the plan once; the same plan reference is handed to every task.
+ LogicalNode plan = subQuery.getBlock().getPlan();
QueryUnit [] tasks = new QueryUnit[num];
for (int i = 0; i < num; i++) {
+ // NOTE(review): the `false` flag presumably marks the task as non-leaf,
+ // matching the method name -- confirm against the QueryUnit constructor.
tasks[i] = new QueryUnit(QueryIdFactory.newQueryUnitId(subQuery.getId(), i),
false, subQuery.eventHandler);
tasks[i].setFragment2(frag);
- tasks[i].setLogicalPlan(subQuery.getLogicalPlan());
+ tasks[i].setLogicalPlan(plan);
}
return tasks;
}
@@ -568,4 +536,47 @@ public class Repartitioner {
return hashed;
}
+
+  /**
+   * Chooses the output partitioning of the given sub query's execution block
+   * and records it on the block's store node.
+   *
+   * <p>Partition keys are resolved in this order, with later rules winning:
+   * <ol>
+   *   <li>If the parent block's store node sits directly on a JOIN, this
+   *       block's store node is first set to {@code n} partitions using its
+   *       existing keys, and those keys are kept.</li>
+   *   <li>For HASH partitioning over a GROUP BY, the grouping columns.</li>
+   *   <li>For RANGE partitioning over a SORT, the sort key columns.</li>
+   * </ol>
+   * The final setting depends on the keys found:
+   * <ul>
+   *   <li>no keys at all: the store node falls back to a list partition;</li>
+   *   <li>an empty key array: a single partition;</li>
+   *   <li>otherwise: {@code n} partitions on those keys.</li>
+   * </ul>
+   *
+   * @param subQuery the sub query whose execution block is configured
+   * @param n the number of partitions to request when keys are present
+   * @return the same {@code subQuery} instance that was passed in
+   */
+  public static SubQuery setPartitionNumberForTwoPhase(SubQuery subQuery, final int n) {
+    ExecutionBlock block = subQuery.getBlock();
+    StoreTableNode storeNode = block.getStoreTableNode();
+    Column[] partitionKeys = null;
+
+    // A join consumer needs this block's output split into n partitions on the
+    // keys already configured on the store node.
+    // TODO: the union handling is required when a join has unions as its child
+    ExecutionBlock parent = block.getParentBlock();
+    boolean feedsJoin = parent != null
+        && parent.getStoreTableNode().getSubNode().getType() == ExprType.JOIN;
+    if (feedsJoin) {
+      storeNode.setPartitions(block.getPartitionType(), storeNode.getPartitionKeys(), n);
+      partitionKeys = storeNode.getPartitionKeys();
+    }
+
+    // Group-by and sort children dictate their own keys, overriding the above.
+    PartitionType partitionType = block.getPartitionType();
+    if (partitionType == PartitionType.HASH
+        && storeNode.getSubNode().getType() == ExprType.GROUP_BY) {
+      partitionKeys = ((GroupbyNode) storeNode.getSubNode()).getGroupingColumns();
+    } else if (partitionType == PartitionType.RANGE
+        && storeNode.getSubNode().getType() == ExprType.SORT) {
+      SortNode sortNode = (SortNode) storeNode.getSubNode();
+      int keyCount = sortNode.getSortKeys().length;
+      partitionKeys = new Column[keyCount];
+      for (int idx = 0; idx < keyCount; idx++) {
+        partitionKeys[idx] = sortNode.getSortKeys()[idx].getSortKey();
+      }
+    }
+
+    if (partitionKeys == null) {
+      storeNode.setListPartition();
+    } else if (partitionKeys.length == 0) {
+      // No key columns: everything goes to a single partition.
+      storeNode.setPartitions(partitionType, new Column[]{}, 1);
+    } else {
+      storeNode.setPartitions(partitionType, partitionKeys, n);
+    }
+    return subQuery;
+  }
}