You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/12/22 08:50:45 UTC
[3/4] tajo git commit: TAJO-1262: Rename the prefix 'SubQuery' to
'Stage'.
http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index cf6b917..4cf6ce2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -81,11 +81,11 @@ public class Repartitioner {
private final static int HTTP_REQUEST_MAXIMUM_LENGTH = 1900;
private final static String UNKNOWN_HOST = "unknown";
- public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerContext, SubQuery subQuery)
+ public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerContext, Stage stage)
throws IOException {
- MasterPlan masterPlan = subQuery.getMasterPlan();
- ExecutionBlock execBlock = subQuery.getBlock();
- QueryMasterTask.QueryMasterTaskContext masterContext = subQuery.getContext();
+ MasterPlan masterPlan = stage.getMasterPlan();
+ ExecutionBlock execBlock = stage.getBlock();
+ QueryMasterTask.QueryMasterTaskContext masterContext = stage.getContext();
ScanNode[] scans = execBlock.getScanNodes();
@@ -98,17 +98,17 @@ public class Repartitioner {
TableDesc tableDesc = masterContext.getTableDescMap().get(scans[i].getCanonicalName());
if (tableDesc == null) { // if it is a real table stored on storage
FileStorageManager storageManager =
- (FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf());
+ (FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf());
tablePath = storageManager.getTablePath(scans[i].getTableName());
if (execBlock.getUnionScanMap() != null && !execBlock.getUnionScanMap().isEmpty()) {
for (Map.Entry<ExecutionBlockId, ExecutionBlockId> unionScanEntry: execBlock.getUnionScanMap().entrySet()) {
ExecutionBlockId originScanEbId = unionScanEntry.getKey();
- stats[i] += masterContext.getSubQuery(originScanEbId).getResultStats().getNumBytes();
+ stats[i] += masterContext.getStage(originScanEbId).getResultStats().getNumBytes();
}
} else {
ExecutionBlockId scanEBId = TajoIdUtils.createExecutionBlockId(scans[i].getTableName());
- stats[i] = masterContext.getSubQuery(scanEBId).getResultStats().getNumBytes();
+ stats[i] = masterContext.getStage(scanEBId).getResultStats().getNumBytes();
}
fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
} else {
@@ -119,7 +119,7 @@ public class Repartitioner {
}
StorageManager storageManager =
- StorageManager.getStorageManager(subQuery.getContext().getConf(), tableDesc.getMeta().getStoreType());
+ StorageManager.getStorageManager(stage.getContext().getConf(), tableDesc.getMeta().getStoreType());
// if table has no data, storageManager will return empty FileFragment.
// So, we need to handle FileFragment by its size.
@@ -223,7 +223,7 @@ public class Repartitioner {
execBlock.removeBroadcastTable(scans[baseScanIdx].getCanonicalName());
LOG.info(String.format("[Distributed Join Strategy] : Broadcast Join with all tables, base_table=%s, base_volume=%d",
scans[baseScanIdx].getCanonicalName(), stats[baseScanIdx]));
- scheduleLeafTasksWithBroadcastTable(schedulerContext, subQuery, baseScanIdx, fragments);
+ scheduleLeafTasksWithBroadcastTable(schedulerContext, stage, baseScanIdx, fragments);
} else if (!execBlock.getBroadcastTables().isEmpty()) { // If some relations of this EB are broadcasted
boolean hasNonLeafNode = false;
List<Integer> largeScanIndexList = new ArrayList<Integer>();
@@ -266,7 +266,7 @@ public class Repartitioner {
int baseScanIdx = largeScanIndexList.isEmpty() ? maxStatsScanIdx : largeScanIndexList.get(0);
LOG.info(String.format("[Distributed Join Strategy] : Broadcast Join, base_table=%s, base_volume=%d",
scans[baseScanIdx].getCanonicalName(), stats[baseScanIdx]));
- scheduleLeafTasksWithBroadcastTable(schedulerContext, subQuery, baseScanIdx, fragments);
+ scheduleLeafTasksWithBroadcastTable(schedulerContext, stage, baseScanIdx, fragments);
} else {
if (largeScanIndexList.size() > 2) {
throw new IOException("Symmetric Repartition Join should have two scan node, but " + nonLeafScanNames);
@@ -292,12 +292,12 @@ public class Repartitioner {
index++;
}
LOG.info(String.format("[Distributed Join Strategy] : Broadcast Join, join_node=%s", nonLeafScanNames));
- scheduleSymmetricRepartitionJoin(masterContext, schedulerContext, subQuery,
+ scheduleSymmetricRepartitionJoin(masterContext, schedulerContext, stage,
intermediateScans, intermediateScanStats, intermediateFragments, broadcastScans, broadcastFragments);
}
} else {
LOG.info("[Distributed Join Strategy] : Symmetric Repartition Join");
- scheduleSymmetricRepartitionJoin(masterContext, schedulerContext, subQuery, scans, stats, fragments, null, null);
+ scheduleSymmetricRepartitionJoin(masterContext, schedulerContext, stage, scans, stats, fragments, null, null);
}
}
@@ -305,7 +305,7 @@ public class Repartitioner {
* Scheduling in tech case of Symmetric Repartition Join
* @param masterContext
* @param schedulerContext
- * @param subQuery
+ * @param stage
* @param scans
* @param stats
* @param fragments
@@ -313,21 +313,21 @@ public class Repartitioner {
*/
private static void scheduleSymmetricRepartitionJoin(QueryMasterTask.QueryMasterTaskContext masterContext,
TaskSchedulerContext schedulerContext,
- SubQuery subQuery,
+ Stage stage,
ScanNode[] scans,
long[] stats,
Fragment[] fragments,
ScanNode[] broadcastScans,
Fragment[] broadcastFragments) throws IOException {
- MasterPlan masterPlan = subQuery.getMasterPlan();
- ExecutionBlock execBlock = subQuery.getBlock();
+ MasterPlan masterPlan = stage.getMasterPlan();
+ ExecutionBlock execBlock = stage.getBlock();
// The hash map is modeling as follows:
// <Part Id, <EbId, List<Intermediate Data>>>
Map<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> hashEntries =
new HashMap<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>>();
// Grouping IntermediateData by a partition key and a table name
- List<ExecutionBlock> childBlocks = masterPlan.getChilds(subQuery.getId());
+ List<ExecutionBlock> childBlocks = masterPlan.getChilds(stage.getId());
// In the case of join with union, there is one ScanNode for union.
Map<ExecutionBlockId, ExecutionBlockId> unionScanMap = execBlock.getUnionScanMap();
@@ -336,7 +336,7 @@ public class Repartitioner {
if (scanEbId == null) {
scanEbId = childBlock.getId();
}
- SubQuery childExecSM = subQuery.getContext().getSubQuery(childBlock.getId());
+ Stage childExecSM = stage.getContext().getStage(childBlock.getId());
if (childExecSM.getHashShuffleIntermediateEntries() != null &&
!childExecSM.getHashShuffleIntermediateEntries().isEmpty()) {
@@ -387,7 +387,7 @@ public class Repartitioner {
// Getting the desire number of join tasks according to the volumn
// of a larger table
int largerIdx = stats[0] >= stats[1] ? 0 : 1;
- int desireJoinTaskVolumn = subQuery.getMasterPlan().getContext().getInt(SessionVars.JOIN_TASK_INPUT_SIZE);
+ int desireJoinTaskVolumn = stage.getMasterPlan().getContext().getInt(SessionVars.JOIN_TASK_INPUT_SIZE);
// calculate the number of tasks according to the data size
int mb = (int) Math.ceil((double) stats[largerIdx] / 1048576);
@@ -412,7 +412,7 @@ public class Repartitioner {
TableDesc tableDesc = masterContext.getTableDescMap().get(eachScan.getCanonicalName());
if (eachScan.getType() == NodeType.PARTITIONS_SCAN) {
FileStorageManager storageManager =
- (FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf());
+ (FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf());
PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)eachScan;
partitionScanPaths = partitionScan.getInputPaths();
@@ -420,7 +420,7 @@ public class Repartitioner {
getFragmentsFromPartitionedTable(storageManager, eachScan, tableDesc);
partitionScan.setInputPaths(partitionScanPaths);
} else {
- StorageManager storageManager = StorageManager.getStorageManager(subQuery.getContext().getConf(),
+ StorageManager storageManager = StorageManager.getStorageManager(stage.getContext().getConf(),
tableDesc.getMeta().getStoreType());
Collection<Fragment> scanFragments = storageManager.getSplits(eachScan.getCanonicalName(),
tableDesc, eachScan);
@@ -430,12 +430,12 @@ public class Repartitioner {
}
}
}
- SubQuery.scheduleFragment(subQuery, fragments[0], rightFragments);
+ Stage.scheduleFragment(stage, fragments[0], rightFragments);
// Assign partitions to tasks in a round robin manner.
for (Entry<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> entry
: hashEntries.entrySet()) {
- addJoinShuffle(subQuery, entry.getKey(), entry.getValue());
+ addJoinShuffle(stage, entry.getKey(), entry.getValue());
}
schedulerContext.setTaskSize((int) Math.ceil((double) bothFetchSize / joinTaskNum));
@@ -503,9 +503,9 @@ public class Repartitioner {
return fragments;
}
- private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext schedulerContext, SubQuery subQuery,
+ private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext schedulerContext, Stage stage,
int baseScanId, Fragment[] fragments) throws IOException {
- ExecutionBlock execBlock = subQuery.getBlock();
+ ExecutionBlock execBlock = stage.getBlock();
ScanNode[] scans = execBlock.getScanNodes();
for (int i = 0; i < scans.length; i++) {
@@ -527,7 +527,7 @@ public class Repartitioner {
List<Fragment> broadcastFragments = new ArrayList<Fragment>();
for (int i = 0; i < scans.length; i++) {
ScanNode scan = scans[i];
- TableDesc desc = subQuery.getContext().getTableDescMap().get(scan.getCanonicalName());
+ TableDesc desc = stage.getContext().getTableDescMap().get(scan.getCanonicalName());
TableMeta meta = desc.getMeta();
Collection<Fragment> scanFragments;
@@ -537,11 +537,11 @@ public class Repartitioner {
partitionScanPaths = partitionScan.getInputPaths();
// set null to inputPaths in getFragmentsFromPartitionedTable()
FileStorageManager storageManager =
- (FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf());
+ (FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf());
scanFragments = getFragmentsFromPartitionedTable(storageManager, scan, desc);
} else {
StorageManager storageManager =
- StorageManager.getStorageManager(subQuery.getContext().getConf(), desc.getMeta().getStoreType());
+ StorageManager.getStorageManager(stage.getContext().getConf(), desc.getMeta().getStoreType());
scanFragments = storageManager.getSplits(scan.getCanonicalName(), desc, scan);
}
@@ -565,14 +565,14 @@ public class Repartitioner {
throw new IOException("No fragments for " + scans[baseScanId].getTableName());
}
- SubQuery.scheduleFragments(subQuery, baseFragments, broadcastFragments);
+ Stage.scheduleFragments(stage, baseFragments, broadcastFragments);
schedulerContext.setEstimatedTaskNum(baseFragments.size());
}
- private static void addJoinShuffle(SubQuery subQuery, int partitionId,
+ private static void addJoinShuffle(Stage stage, int partitionId,
Map<ExecutionBlockId, List<IntermediateEntry>> grouppedPartitions) {
Map<String, List<FetchImpl>> fetches = new HashMap<String, List<FetchImpl>>();
- for (ExecutionBlock execBlock : subQuery.getMasterPlan().getChilds(subQuery.getId())) {
+ for (ExecutionBlock execBlock : stage.getMasterPlan().getChilds(stage.getId())) {
if (grouppedPartitions.containsKey(execBlock.getId())) {
Collection<FetchImpl> requests = mergeShuffleRequest(partitionId, HASH_SHUFFLE,
grouppedPartitions.get(execBlock.getId()));
@@ -581,10 +581,10 @@ public class Repartitioner {
}
if (fetches.isEmpty()) {
- LOG.info(subQuery.getId() + "'s " + partitionId + " partition has empty result.");
+ LOG.info(stage.getId() + "'s " + partitionId + " partition has empty result.");
return;
}
- SubQuery.scheduleFetches(subQuery, fetches);
+ Stage.scheduleFetches(stage, fetches);
}
/**
@@ -616,14 +616,14 @@ public class Repartitioner {
}
public static void scheduleFragmentsForNonLeafTasks(TaskSchedulerContext schedulerContext,
- MasterPlan masterPlan, SubQuery subQuery, int maxNum)
+ MasterPlan masterPlan, Stage stage, int maxNum)
throws IOException {
- DataChannel channel = masterPlan.getIncomingChannels(subQuery.getBlock().getId()).get(0);
+ DataChannel channel = masterPlan.getIncomingChannels(stage.getBlock().getId()).get(0);
if (channel.getShuffleType() == HASH_SHUFFLE
|| channel.getShuffleType() == SCATTERED_HASH_SHUFFLE) {
- scheduleHashShuffledFetches(schedulerContext, masterPlan, subQuery, channel, maxNum);
+ scheduleHashShuffledFetches(schedulerContext, masterPlan, stage, channel, maxNum);
} else if (channel.getShuffleType() == RANGE_SHUFFLE) {
- scheduleRangeShuffledFetches(schedulerContext, masterPlan, subQuery, channel, maxNum);
+ scheduleRangeShuffledFetches(schedulerContext, masterPlan, stage, channel, maxNum);
} else {
throw new InternalException("Cannot support partition type");
}
@@ -634,22 +634,22 @@ public class Repartitioner {
List<TableStats> tableStatses = new ArrayList<TableStats>();
List<ExecutionBlock> childBlocks = masterPlan.getChilds(parentBlockId);
for (ExecutionBlock childBlock : childBlocks) {
- SubQuery childExecSM = context.getSubQuery(childBlock.getId());
- tableStatses.add(childExecSM.getResultStats());
+ Stage childStage = context.getStage(childBlock.getId());
+ tableStatses.add(childStage.getResultStats());
}
return StatisticsUtil.aggregateTableStat(tableStatses);
}
public static void scheduleRangeShuffledFetches(TaskSchedulerContext schedulerContext, MasterPlan masterPlan,
- SubQuery subQuery, DataChannel channel, int maxNum)
+ Stage stage, DataChannel channel, int maxNum)
throws IOException {
- ExecutionBlock execBlock = subQuery.getBlock();
+ ExecutionBlock execBlock = stage.getBlock();
ScanNode scan = execBlock.getScanNodes()[0];
Path tablePath;
- tablePath = ((FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf()))
+ tablePath = ((FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf()))
.getTablePath(scan.getTableName());
- ExecutionBlock sampleChildBlock = masterPlan.getChild(subQuery.getId(), 0);
+ ExecutionBlock sampleChildBlock = masterPlan.getChild(stage.getId(), 0);
SortNode sortNode = PlannerUtil.findTopNode(sampleChildBlock.getPlan(), NodeType.SORT);
SortSpec [] sortSpecs = sortNode.getSortKeys();
Schema sortSchema = new Schema(channel.getShuffleKeys());
@@ -658,7 +658,7 @@ public class Repartitioner {
int determinedTaskNum;
// calculate the number of maximum query ranges
- TableStats totalStat = computeChildBlocksStats(subQuery.getContext(), masterPlan, subQuery.getId());
+ TableStats totalStat = computeChildBlocksStats(stage.getContext(), masterPlan, stage.getId());
// If there is an empty table in inner join, it should return zero rows.
if (totalStat.getNumBytes() == 0 && totalStat.getColumnStats().size() == 0) {
@@ -668,15 +668,15 @@ public class Repartitioner {
if (sortNode.getSortPurpose() == SortPurpose.STORAGE_SPECIFIED) {
StoreType storeType = PlannerUtil.getStoreType(masterPlan.getLogicalPlan());
- CatalogService catalog = subQuery.getContext().getQueryMasterContext().getWorkerContext().getCatalog();
+ CatalogService catalog = stage.getContext().getQueryMasterContext().getWorkerContext().getCatalog();
LogicalRootNode rootNode = masterPlan.getLogicalPlan().getRootBlock().getRoot();
TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild());
if (tableDesc == null) {
throw new IOException("Can't get table meta data from catalog: " +
PlannerUtil.getStoreTableName(masterPlan.getLogicalPlan()));
}
- ranges = StorageManager.getStorageManager(subQuery.getContext().getConf(), storeType)
- .getInsertSortRanges(subQuery.getContext().getQueryContext(), tableDesc,
+ ranges = StorageManager.getStorageManager(stage.getContext().getConf(), storeType)
+ .getInsertSortRanges(stage.getContext().getQueryContext(), tableDesc,
sortNode.getInSchema(), sortSpecs,
mergedRange);
determinedTaskNum = ranges.length;
@@ -687,36 +687,36 @@ public class Repartitioner {
// if the number of the range cardinality is less than the desired number of tasks,
// we set the the number of tasks to the number of range cardinality.
if (card.compareTo(BigInteger.valueOf(maxNum)) < 0) {
- LOG.info(subQuery.getId() + ", The range cardinality (" + card
+ LOG.info(stage.getId() + ", The range cardinality (" + card
+ ") is less then the desired number of tasks (" + maxNum + ")");
determinedTaskNum = card.intValue();
} else {
determinedTaskNum = maxNum;
}
- LOG.info(subQuery.getId() + ", Try to divide " + mergedRange + " into " + determinedTaskNum +
+ LOG.info(stage.getId() + ", Try to divide " + mergedRange + " into " + determinedTaskNum +
" sub ranges (total units: " + determinedTaskNum + ")");
ranges = partitioner.partition(determinedTaskNum);
if (ranges == null || ranges.length == 0) {
- LOG.warn(subQuery.getId() + " no range infos.");
+ LOG.warn(stage.getId() + " no range infos.");
}
TupleUtil.setMaxRangeIfNull(sortSpecs, sortSchema, totalStat.getColumnStats(), ranges);
if (LOG.isDebugEnabled()) {
if (ranges != null) {
for (TupleRange eachRange : ranges) {
- LOG.debug(subQuery.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd());
+ LOG.debug(stage.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd());
}
}
}
}
FileFragment dummyFragment = new FileFragment(scan.getTableName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
- SubQuery.scheduleFragment(subQuery, dummyFragment);
+ Stage.scheduleFragment(stage, dummyFragment);
List<FetchImpl> fetches = new ArrayList<FetchImpl>();
- List<ExecutionBlock> childBlocks = masterPlan.getChilds(subQuery.getId());
+ List<ExecutionBlock> childBlocks = masterPlan.getChilds(stage.getId());
for (ExecutionBlock childBlock : childBlocks) {
- SubQuery childExecSM = subQuery.getContext().getSubQuery(childBlock.getId());
+ Stage childExecSM = stage.getContext().getStage(childBlock.getId());
for (Task qu : childExecSM.getTasks()) {
for (IntermediateEntry p : qu.getIntermediateData()) {
FetchImpl fetch = new FetchImpl(p.getPullHost(), RANGE_SHUFFLE, childBlock.getId(), 0);
@@ -758,12 +758,12 @@ public class Repartitioner {
LOG.error(e);
}
- scheduleFetchesByRoundRobin(subQuery, map, scan.getTableName(), determinedTaskNum);
+ scheduleFetchesByRoundRobin(stage, map, scan.getTableName(), determinedTaskNum);
schedulerContext.setEstimatedTaskNum(determinedTaskNum);
}
- public static void scheduleFetchesByRoundRobin(SubQuery subQuery, Map<?, Collection<FetchImpl>> partitions,
+ public static void scheduleFetchesByRoundRobin(Stage stage, Map<?, Collection<FetchImpl>> partitions,
String tableName, int num) {
int i;
Map<String, List<FetchImpl>>[] fetchesArray = new Map[num];
@@ -777,7 +777,7 @@ public class Repartitioner {
if (i == num) i = 0;
}
for (Map<String, List<FetchImpl>> eachFetches : fetchesArray) {
- SubQuery.scheduleFetches(subQuery, eachFetches);
+ Stage.scheduleFetches(stage, eachFetches);
}
}
@@ -807,18 +807,18 @@ public class Repartitioner {
}
public static void scheduleHashShuffledFetches(TaskSchedulerContext schedulerContext, MasterPlan masterPlan,
- SubQuery subQuery, DataChannel channel,
+ Stage stage, DataChannel channel,
int maxNum) throws IOException {
- ExecutionBlock execBlock = subQuery.getBlock();
+ ExecutionBlock execBlock = stage.getBlock();
ScanNode scan = execBlock.getScanNodes()[0];
Path tablePath;
- tablePath = ((FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf()))
+ tablePath = ((FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf()))
.getTablePath(scan.getTableName());
Fragment frag = new FileFragment(scan.getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
List<Fragment> fragments = new ArrayList<Fragment>();
fragments.add(frag);
- SubQuery.scheduleFragments(subQuery, fragments);
+ Stage.scheduleFragments(stage, fragments);
Map<Integer, FetchGroupMeta> finalFetches = new HashMap<Integer, FetchGroupMeta>();
Map<ExecutionBlockId, List<IntermediateEntry>> intermediates = new HashMap<ExecutionBlockId,
@@ -826,7 +826,7 @@ public class Repartitioner {
for (ExecutionBlock block : masterPlan.getChilds(execBlock)) {
List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>();
- partitions.addAll(subQuery.getContext().getSubQuery(block.getId()).getHashShuffleIntermediateEntries());
+ partitions.addAll(stage.getContext().getStage(block.getId()).getHashShuffleIntermediateEntries());
// In scattered hash shuffle, Collecting each IntermediateEntry
if (channel.getShuffleType() == SCATTERED_HASH_SHUFFLE) {
@@ -861,16 +861,16 @@ public class Repartitioner {
}
int groupingColumns = 0;
- LogicalNode[] groupbyNodes = PlannerUtil.findAllNodes(subQuery.getBlock().getPlan(),
+ LogicalNode[] groupbyNodes = PlannerUtil.findAllNodes(stage.getBlock().getPlan(),
new NodeType[]{NodeType.GROUP_BY, NodeType.DISTINCT_GROUP_BY});
if (groupbyNodes != null && groupbyNodes.length > 0) {
LogicalNode bottomNode = groupbyNodes[0];
if (bottomNode.getType() == NodeType.GROUP_BY) {
groupingColumns = ((GroupbyNode)bottomNode).getGroupingColumns().length;
} else if (bottomNode.getType() == NodeType.DISTINCT_GROUP_BY) {
- DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY);
+ DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(stage.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY);
if (distinctNode == null) {
- LOG.warn(subQuery.getId() + ", Can't find current DistinctGroupbyNode");
+ LOG.warn(stage.getId() + ", Can't find current DistinctGroupbyNode");
distinctNode = (DistinctGroupbyNode)bottomNode;
}
groupingColumns = distinctNode.getGroupingColumns().length;
@@ -879,8 +879,8 @@ public class Repartitioner {
EnforceProperty property = PhysicalPlannerImpl.getAlgorithmEnforceProperty(enforcer, distinctNode);
if (property != null) {
if (property.getDistinct().getIsMultipleAggregation()) {
- MultipleAggregationStage stage = property.getDistinct().getMultipleAggregationStage();
- if (stage != MultipleAggregationStage.THRID_STAGE) {
+ MultipleAggregationStage mulAggStage = property.getDistinct().getMultipleAggregationStage();
+ if (mulAggStage != MultipleAggregationStage.THRID_STAGE) {
groupingColumns = distinctNode.getOutSchema().size();
}
}
@@ -889,13 +889,13 @@ public class Repartitioner {
}
// get a proper number of tasks
int determinedTaskNum = Math.min(maxNum, finalFetches.size());
- LOG.info(subQuery.getId() + ", ScheduleHashShuffledFetches - Max num=" + maxNum + ", finalFetchURI=" + finalFetches.size());
+ LOG.info(stage.getId() + ", ScheduleHashShuffledFetches - Max num=" + maxNum + ", finalFetchURI=" + finalFetches.size());
if (groupingColumns == 0) {
determinedTaskNum = 1;
- LOG.info(subQuery.getId() + ", No Grouping Column - determinedTaskNum is set to 1");
+ LOG.info(stage.getId() + ", No Grouping Column - determinedTaskNum is set to 1");
} else {
- TableStats totalStat = computeChildBlocksStats(subQuery.getContext(), masterPlan, subQuery.getId());
+ TableStats totalStat = computeChildBlocksStats(stage.getContext(), masterPlan, stage.getId());
if (totalStat.getNumRows() == 0) {
determinedTaskNum = 1;
}
@@ -903,13 +903,13 @@ public class Repartitioner {
// set the proper number of tasks to the estimated task num
if (channel.getShuffleType() == SCATTERED_HASH_SHUFFLE) {
- scheduleScatteredHashShuffleFetches(schedulerContext, subQuery, intermediates,
+ scheduleScatteredHashShuffleFetches(schedulerContext, stage, intermediates,
scan.getTableName());
} else {
schedulerContext.setEstimatedTaskNum(determinedTaskNum);
// divide fetch uris into the the proper number of tasks according to volumes
- scheduleFetchesByEvenDistributedVolumes(subQuery, finalFetches, scan.getTableName(), determinedTaskNum);
- LOG.info(subQuery.getId() + ", DeterminedTaskNum : " + determinedTaskNum);
+ scheduleFetchesByEvenDistributedVolumes(stage, finalFetches, scan.getTableName(), determinedTaskNum);
+ LOG.info(stage.getId() + ", DeterminedTaskNum : " + determinedTaskNum);
}
}
@@ -970,12 +970,12 @@ public class Repartitioner {
return new Pair<Long[], Map<String, List<FetchImpl>>[]>(assignedVolumes, fetchesArray);
}
- public static void scheduleFetchesByEvenDistributedVolumes(SubQuery subQuery, Map<Integer, FetchGroupMeta> partitions,
+ public static void scheduleFetchesByEvenDistributedVolumes(Stage stage, Map<Integer, FetchGroupMeta> partitions,
String tableName, int num) {
Map<String, List<FetchImpl>>[] fetchsArray = makeEvenDistributedFetchImpl(partitions, tableName, num).getSecond();
// Schedule FetchImpls
for (Map<String, List<FetchImpl>> eachFetches : fetchsArray) {
- SubQuery.scheduleFetches(subQuery, eachFetches);
+ Stage.scheduleFetches(stage, eachFetches);
}
}
@@ -987,12 +987,12 @@ public class Repartitioner {
// to $DIST_QUERY_TABLE_PARTITION_VOLUME. Finally, each subgroup is assigned to a query unit.
// It is usually used for writing partitioned tables.
public static void scheduleScatteredHashShuffleFetches(TaskSchedulerContext schedulerContext,
- SubQuery subQuery, Map<ExecutionBlockId, List<IntermediateEntry>> intermediates,
+ Stage stage, Map<ExecutionBlockId, List<IntermediateEntry>> intermediates,
String tableName) {
long splitVolume = StorageUnit.MB *
- subQuery.getMasterPlan().getContext().getLong(SessionVars.TABLE_PARTITION_PER_SHUFFLE_SIZE);
+ stage.getMasterPlan().getContext().getLong(SessionVars.TABLE_PARTITION_PER_SHUFFLE_SIZE);
long pageSize = StorageUnit.MB *
- subQuery.getContext().getConf().getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME); // in bytes
+ stage.getContext().getConf().getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME); // in bytes
if (pageSize >= splitVolume) {
throw new RuntimeException("tajo.dist-query.table-partition.task-volume-mb should be great than " +
"tajo.shuffle.hash.appender.page.volumn-mb");
@@ -1033,11 +1033,11 @@ public class Repartitioner {
fetchesArray[i] = new HashMap<String, List<FetchImpl>>();
fetchesArray[i].put(tableName, entry);
- SubQuery.scheduleFetches(subQuery, fetchesArray[i]);
+ Stage.scheduleFetches(stage, fetchesArray[i]);
i++;
}
- LOG.info(subQuery.getId()
+ LOG.info(stage.getId()
+ ", ShuffleType:" + SCATTERED_HASH_SHUFFLE.name()
+ ", Intermediate Size: " + totalIntermediateSize
+ ", splitSize: " + splitVolume
@@ -1207,16 +1207,16 @@ public class Repartitioner {
return hashed;
}
- public static SubQuery setShuffleOutputNumForTwoPhase(SubQuery subQuery, final int desiredNum, DataChannel channel) {
- ExecutionBlock execBlock = subQuery.getBlock();
+ public static Stage setShuffleOutputNumForTwoPhase(Stage stage, final int desiredNum, DataChannel channel) {
+ ExecutionBlock execBlock = stage.getBlock();
Column[] keys;
// if the next query is join,
// set the partition number for the current logicalUnit
// TODO: the union handling is required when a join has unions as its child
- MasterPlan masterPlan = subQuery.getMasterPlan();
+ MasterPlan masterPlan = stage.getMasterPlan();
keys = channel.getShuffleKeys();
- if (!masterPlan.isRoot(subQuery.getBlock()) ) {
- ExecutionBlock parentBlock = masterPlan.getParent(subQuery.getBlock());
+ if (!masterPlan.isRoot(stage.getBlock()) ) {
+ ExecutionBlock parentBlock = masterPlan.getParent(stage.getBlock());
if (parentBlock.getPlan().getType() == NodeType.JOIN) {
channel.setShuffleOutputNum(desiredNum);
}
@@ -1246,6 +1246,6 @@ public class Repartitioner {
channel.setShuffleOutputNum(desiredNum);
}
}
- return subQuery;
+ return stage;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java
new file mode 100644
index 0000000..e421417
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java
@@ -0,0 +1,1342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.*;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tajo.*;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.ColumnStats;
+import org.apache.tajo.catalog.statistics.StatisticsUtil;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
+import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
+import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto;
+import org.apache.tajo.master.*;
+import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
+import org.apache.tajo.master.querymaster.Task.IntermediateEntry;
+import org.apache.tajo.master.container.TajoContainer;
+import org.apache.tajo.master.container.TajoContainerId;
+import org.apache.tajo.storage.FileStorageManager;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.history.TaskHistory;
+import org.apache.tajo.util.history.StageHistory;
+import org.apache.tajo.worker.FetchImpl;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.tajo.conf.TajoConf.ConfVars;
+import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
+
+
+/**
+ * Stage plays a role in controlling an ExecutionBlock and is a finite state machine.
+ */
+public class Stage implements EventHandler<StageEvent> {
+
+ private static final Log LOG = LogFactory.getLog(Stage.class);
+
+ private MasterPlan masterPlan;
+ private ExecutionBlock block;
+ private int priority;
+ private Schema schema;
+ private TableMeta meta;
+ private TableStats resultStatistics;
+ private TableStats inputStatistics;
+ private EventHandler<Event> eventHandler;
+ private AbstractTaskScheduler taskScheduler;
+ private QueryMasterTask.QueryMasterTaskContext context;
+ private final List<String> diagnostics = new ArrayList<String>();
+ private StageState stageState;
+
+ private long startTime;
+ private long finishTime;
+
+ volatile Map<TaskId, Task> tasks = new ConcurrentHashMap<TaskId, Task>();
+ volatile Map<TajoContainerId, TajoContainer> containers = new ConcurrentHashMap<TajoContainerId,
+ TajoContainer>();
+
+ private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
+ private static final InternalErrorTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
+ private static final ContainerLaunchTransition CONTAINER_LAUNCH_TRANSITION = new ContainerLaunchTransition();
+ private static final TaskCompletedTransition TASK_COMPLETED_TRANSITION = new TaskCompletedTransition();
+ private static final AllocatedContainersCancelTransition CONTAINERS_CANCEL_TRANSITION =
+ new AllocatedContainersCancelTransition();
+ private static final StageCompleteTransition STAGE_COMPLETED_TRANSITION = new StageCompleteTransition();
+ private StateMachine<StageState, StageEventType, StageEvent> stateMachine;
+
+ protected static final StateMachineFactory<Stage, StageState,
+ StageEventType, StageEvent> stateMachineFactory =
+ new StateMachineFactory <Stage, StageState,
+ StageEventType, StageEvent> (StageState.NEW)
+
+ // Transitions from NEW state
+ .addTransition(StageState.NEW,
+ EnumSet.of(StageState.INITED, StageState.ERROR, StageState.SUCCEEDED),
+ StageEventType.SQ_INIT,
+ new InitAndRequestContainer())
+ .addTransition(StageState.NEW, StageState.NEW,
+ StageEventType.SQ_DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(StageState.NEW, StageState.KILLED,
+ StageEventType.SQ_KILL)
+ .addTransition(StageState.NEW, StageState.ERROR,
+ StageEventType.SQ_INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+
+ // Transitions from INITED state
+ .addTransition(StageState.INITED, StageState.RUNNING,
+ StageEventType.SQ_CONTAINER_ALLOCATED,
+ CONTAINER_LAUNCH_TRANSITION)
+ .addTransition(StageState.INITED, StageState.INITED,
+ StageEventType.SQ_DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(StageState.INITED, StageState.KILL_WAIT,
+ StageEventType.SQ_KILL, new KillTasksTransition())
+ .addTransition(StageState.INITED, StageState.ERROR,
+ StageEventType.SQ_INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+
+ // Transitions from RUNNING state
+ .addTransition(StageState.RUNNING, StageState.RUNNING,
+ StageEventType.SQ_CONTAINER_ALLOCATED,
+ CONTAINER_LAUNCH_TRANSITION)
+ .addTransition(StageState.RUNNING, StageState.RUNNING,
+ StageEventType.SQ_TASK_COMPLETED,
+ TASK_COMPLETED_TRANSITION)
+ .addTransition(StageState.RUNNING,
+ EnumSet.of(StageState.SUCCEEDED, StageState.FAILED),
+ StageEventType.SQ_STAGE_COMPLETED,
+ STAGE_COMPLETED_TRANSITION)
+ .addTransition(StageState.RUNNING, StageState.RUNNING,
+ StageEventType.SQ_FAILED,
+ TASK_COMPLETED_TRANSITION)
+ .addTransition(StageState.RUNNING, StageState.RUNNING,
+ StageEventType.SQ_DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(StageState.RUNNING, StageState.KILL_WAIT,
+ StageEventType.SQ_KILL,
+ new KillTasksTransition())
+ .addTransition(StageState.RUNNING, StageState.ERROR,
+ StageEventType.SQ_INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+ // Ignore-able Transition
+ .addTransition(StageState.RUNNING, StageState.RUNNING,
+ StageEventType.SQ_START)
+
+ // Transitions from KILL_WAIT state
+ .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT,
+ StageEventType.SQ_CONTAINER_ALLOCATED,
+ CONTAINERS_CANCEL_TRANSITION)
+ .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT,
+ EnumSet.of(StageEventType.SQ_KILL), new KillTasksTransition())
+ .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT,
+ StageEventType.SQ_TASK_COMPLETED,
+ TASK_COMPLETED_TRANSITION)
+ .addTransition(StageState.KILL_WAIT,
+ EnumSet.of(StageState.SUCCEEDED, StageState.FAILED, StageState.KILLED),
+ StageEventType.SQ_STAGE_COMPLETED,
+ STAGE_COMPLETED_TRANSITION)
+ .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT,
+ StageEventType.SQ_DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT,
+ StageEventType.SQ_FAILED,
+ TASK_COMPLETED_TRANSITION)
+ .addTransition(StageState.KILL_WAIT, StageState.ERROR,
+ StageEventType.SQ_INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+
+ // Transitions from SUCCEEDED state
+ .addTransition(StageState.SUCCEEDED, StageState.SUCCEEDED,
+ StageEventType.SQ_CONTAINER_ALLOCATED,
+ CONTAINERS_CANCEL_TRANSITION)
+ .addTransition(StageState.SUCCEEDED, StageState.SUCCEEDED,
+ StageEventType.SQ_DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(StageState.SUCCEEDED, StageState.ERROR,
+ StageEventType.SQ_INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+ // Ignore-able events
+ .addTransition(StageState.SUCCEEDED, StageState.SUCCEEDED,
+ EnumSet.of(
+ StageEventType.SQ_START,
+ StageEventType.SQ_KILL,
+ StageEventType.SQ_CONTAINER_ALLOCATED))
+
+ // Transitions from KILLED state
+ .addTransition(StageState.KILLED, StageState.KILLED,
+ StageEventType.SQ_CONTAINER_ALLOCATED,
+ CONTAINERS_CANCEL_TRANSITION)
+ .addTransition(StageState.KILLED, StageState.KILLED,
+ StageEventType.SQ_DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(StageState.KILLED, StageState.ERROR,
+ StageEventType.SQ_INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+ // Ignore-able transitions
+ .addTransition(StageState.KILLED, StageState.KILLED,
+ EnumSet.of(
+ StageEventType.SQ_START,
+ StageEventType.SQ_KILL,
+ StageEventType.SQ_CONTAINER_ALLOCATED,
+ StageEventType.SQ_FAILED))
+
+ // Transitions from FAILED state
+ .addTransition(StageState.FAILED, StageState.FAILED,
+ StageEventType.SQ_CONTAINER_ALLOCATED,
+ CONTAINERS_CANCEL_TRANSITION)
+ .addTransition(StageState.FAILED, StageState.FAILED,
+ StageEventType.SQ_DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(StageState.FAILED, StageState.ERROR,
+ StageEventType.SQ_INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+ // Ignore-able transitions
+ .addTransition(StageState.FAILED, StageState.FAILED,
+ EnumSet.of(
+ StageEventType.SQ_START,
+ StageEventType.SQ_KILL,
+ StageEventType.SQ_CONTAINER_ALLOCATED,
+ StageEventType.SQ_FAILED))
+
+ // Transitions from ERROR state
+ .addTransition(StageState.ERROR, StageState.ERROR,
+ StageEventType.SQ_CONTAINER_ALLOCATED,
+ CONTAINERS_CANCEL_TRANSITION)
+ .addTransition(StageState.ERROR, StageState.ERROR,
+ StageEventType.SQ_DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ // Ignore-able transitions
+ .addTransition(StageState.ERROR, StageState.ERROR,
+ EnumSet.of(
+ StageEventType.SQ_START,
+ StageEventType.SQ_KILL,
+ StageEventType.SQ_FAILED,
+ StageEventType.SQ_INTERNAL_ERROR,
+ StageEventType.SQ_STAGE_COMPLETED))
+
+ .installTopology();
+
+ private final Lock readLock;
+ private final Lock writeLock;
+
+ private int totalScheduledObjectsCount;
+ private int succeededObjectCount = 0;
+ private int completedTaskCount = 0;
+ private int succeededTaskCount = 0;
+ private int killedObjectCount = 0;
+ private int failedObjectCount = 0;
+ private TaskSchedulerContext schedulerContext;
+ private List<IntermediateEntry> hashShuffleIntermediateEntries = new ArrayList<IntermediateEntry>();
+ private AtomicInteger completeReportReceived = new AtomicInteger(0);
+ private StageHistory finalStageHistory;
+
+ // Builds a Stage for one ExecutionBlock and wires it to the query master's event
+ // handler. The state machine starts in StageState.NEW (see stateMachineFactory).
+ public Stage(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan, ExecutionBlock block) {
+ this.context = context;
+ this.masterPlan = masterPlan;
+ this.block = block;
+ this.eventHandler = context.getEventHandler();
+
+ // One read/write lock pair guards state transitions (write) and state reads (read).
+ ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ this.readLock = readWriteLock.readLock();
+ this.writeLock = readWriteLock.writeLock();
+ stateMachine = stateMachineFactory.make(this);
+ stageState = stateMachine.getCurrentState();
+ }
+
+ // A stage counts as "running" until it reaches a terminal or kill-related state.
+ public static boolean isRunningState(StageState state) {
+ return state == StageState.INITED || state == StageState.NEW || state == StageState.RUNNING;
+ }
+
+ // --- simple accessors -------------------------------------------------------
+ public QueryMasterTask.QueryMasterTaskContext getContext() {
+ return context;
+ }
+
+ public MasterPlan getMasterPlan() {
+ return masterPlan;
+ }
+
+ // Returns the first outgoing channel of this stage.
+ // NOTE(review): assumes at least one outgoing channel exists — confirm for the root block.
+ public DataChannel getDataChannel() {
+ return masterPlan.getOutgoingChannels(getId()).iterator().next();
+ }
+
+ public EventHandler<Event> getEventHandler() {
+ return eventHandler;
+ }
+
+ public AbstractTaskScheduler getTaskScheduler() {
+ return taskScheduler;
+ }
+
+ // Stamps the start time from the query master's clock (not System.currentTimeMillis).
+ public void setStartTime() {
+ startTime = context.getClock().getTime();
+ }
+
+ @SuppressWarnings("UnusedDeclaration")
+ public long getStartTime() {
+ return this.startTime;
+ }
+
+ // Stamps the finish time from the query master's clock.
+ public void setFinishTime() {
+ finishTime = context.getClock().getTime();
+ }
+
+ @SuppressWarnings("UnusedDeclaration")
+ public long getFinishTime() {
+ return this.finishTime;
+ }
+
+ // Coarse progress: fraction of scheduled objects that have succeeded.
+ // Returns 0 while the stage is still NEW (nothing scheduled yet).
+ public float getTaskProgress() {
+ readLock.lock();
+ try {
+ if (getState() == StageState.NEW) {
+ return 0;
+ } else {
+ return (float)(succeededObjectCount) / (float)totalScheduledObjectsCount;
+ }
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ // Fine-grained progress: average of each task's last-attempt progress,
+ // truncated to three decimal places. Snapshots the task map under the read
+ // lock, then iterates outside the lock to avoid holding it during computation.
+ public float getProgress() {
+ List<Task> tempTasks = null;
+ readLock.lock();
+ try {
+ if (getState() == StageState.NEW) {
+ return 0.0f;
+ } else {
+ tempTasks = new ArrayList<Task>(tasks.values());
+ }
+ } finally {
+ readLock.unlock();
+ }
+
+ float totalProgress = 0.0f;
+ for (Task eachTask : tempTasks) {
+ if (eachTask.getLastAttempt() != null) {
+ totalProgress += eachTask.getLastAttempt().getProgress();
+ }
+ }
+
+ if (totalProgress > 0.0f) {
+ // Math.max(..., 1) guards against division by zero on an empty snapshot.
+ return (float) Math.floor((totalProgress / (float) Math.max(tempTasks.size(), 1)) * 1000.0f) / 1000.0f;
+ } else {
+ return 0.0f;
+ }
+ }
+
+ public int getSucceededObjectCount() {
+ return succeededObjectCount;
+ }
+
+ public int getTotalScheduledObjectsCount() {
+ return totalScheduledObjectsCount;
+ }
+
+ public ExecutionBlock getBlock() {
+ return block;
+ }
+
+ // Registers a task under its id; tasks is a ConcurrentHashMap, so no extra locking here.
+ public void addTask(Task task) {
+ tasks.put(task.getId(), task);
+ }
+
+ // Returns the history snapshot for this stage. Once a final history exists it is
+ // reused, but it is rebuilt (with task histories) if its finish time was never set.
+ public StageHistory getStageHistory() {
+ if (finalStageHistory != null) {
+ if (finalStageHistory.getFinishTime() == 0) {
+ finalStageHistory = makeStageHistory();
+ finalStageHistory.setTasks(makeTaskHistories());
+ }
+ return finalStageHistory;
+ } else {
+ return makeStageHistory();
+ }
+ }
+
+ // Collects the per-task histories for all tasks currently in this stage.
+ private List<TaskHistory> makeTaskHistories() {
+ List<TaskHistory> taskHistories = new ArrayList<TaskHistory>();
+
+ for(Task eachTask : getTasks()) {
+ taskHistories.add(eachTask.getTaskHistory());
+ }
+
+ return taskHistories;
+ }
+
+ // Builds a fresh StageHistory from the stage's current counters, scheduler
+ // locality stats, and aggregated per-task input/output statistics.
+ private StageHistory makeStageHistory() {
+ StageHistory stageHistory = new StageHistory();
+
+ stageHistory.setExecutionBlockId(getId().toString());
+ stageHistory.setPlan(PlannerUtil.buildExplainString(block.getPlan()));
+ stageHistory.setState(getState().toString());
+ stageHistory.setStartTime(startTime);
+ stageHistory.setFinishTime(finishTime);
+ stageHistory.setSucceededObjectCount(succeededObjectCount);
+ stageHistory.setKilledObjectCount(killedObjectCount);
+ stageHistory.setFailedObjectCount(failedObjectCount);
+ stageHistory.setTotalScheduledObjectsCount(totalScheduledObjectsCount);
+ stageHistory.setHostLocalAssigned(getTaskScheduler().getHostLocalAssigned());
+ stageHistory.setRackLocalAssigned(getTaskScheduler().getRackLocalAssigned());
+
+ long totalInputBytes = 0;
+ long totalReadBytes = 0;
+ long totalReadRows = 0;
+ long totalWriteBytes = 0;
+ long totalWriteRows = 0;
+ int numShuffles = 0;
+ for(Task eachTask : getTasks()) {
+ // NOTE(review): plain assignment (not max/sum) — numShuffles ends up as the
+ // value from whichever task iterates last; confirm this is intended.
+ numShuffles = eachTask.getShuffleOutpuNum();
+ if (eachTask.getLastAttempt() != null) {
+ TableStats inputStats = eachTask.getLastAttempt().getInputStats();
+ if (inputStats != null) {
+ totalInputBytes += inputStats.getNumBytes();
+ totalReadBytes += inputStats.getReadBytes();
+ totalReadRows += inputStats.getNumRows();
+ }
+ TableStats outputStats = eachTask.getLastAttempt().getResultStats();
+ if (outputStats != null) {
+ totalWriteBytes += outputStats.getNumBytes();
+ totalWriteRows += outputStats.getNumRows();
+ }
+ }
+ }
+
+ stageHistory.setTotalInputBytes(totalInputBytes);
+ stageHistory.setTotalReadBytes(totalReadBytes);
+ stageHistory.setTotalReadRows(totalReadRows);
+ stageHistory.setTotalWriteBytes(totalWriteBytes);
+ stageHistory.setTotalWriteRows(totalWriteRows);
+ stageHistory.setNumShuffles(numShuffles);
+ stageHistory.setProgress(getProgress());
+ return stageHistory;
+ }
+
+ /**
+ * It finalizes this stage. It is only invoked when the stage is succeeded.
+ */
+ public void complete() {
+ // cleanup() is defined elsewhere in this class (not visible in this hunk).
+ cleanup();
+ finalizeStats();
+ setFinishTime();
+ eventHandler.handle(new StageCompletedEvent(getId(), StageState.SUCCEEDED));
+ }
+
+ /**
+ * It finalizes this stage. Unlike {@link Stage#complete()},
+ * it is invoked when a stage is abnormally finished.
+ *
+ * @param finalState The final stage state
+ */
+ public void abort(StageState finalState) {
+ // TODO -
+ // - committer.abortStage(...)
+ // - record Stage Finish Time
+ // - CleanUp Tasks
+ // - Record History
+ cleanup();
+ setFinishTime();
+ // Unlike complete(), stats are NOT finalized on an abnormal finish.
+ eventHandler.handle(new StageCompletedEvent(getId(), finalState));
+ }
+
+ public StateMachine<StageState, StageEventType, StageEvent> getStateMachine() {
+ return this.stateMachine;
+ }
+
+ public void setPriority(int priority) {
+ this.priority = priority;
+ }
+
+
+ public int getPriority() {
+ return this.priority;
+ }
+
+ // The stage id is the id of the ExecutionBlock it controls.
+ public ExecutionBlockId getId() {
+ return block.getId();
+ }
+
+ // Snapshot of the current task set as an array.
+ public Task[] getTasks() {
+ return tasks.values().toArray(new Task[tasks.size()]);
+ }
+
+ public Task getTask(TaskId qid) {
+ return tasks.get(qid);
+ }
+
+ // schema/meta/statistics below are populated by finalizeStats(); they are null
+ // until the stage finishes.
+ public Schema getSchema() {
+ return schema;
+ }
+
+ public TableMeta getTableMeta() {
+ return meta;
+ }
+
+ public TableStats getResultStats() {
+ return resultStatistics;
+ }
+
+ public TableStats getInputStats() {
+ return inputStatistics;
+ }
+
+ // NOTE(review): returns the live internal list; the read lock is released before
+ // the caller iterates it — confirm callers treat it as read-only.
+ public List<String> getDiagnostics() {
+ readLock.lock();
+ try {
+ return diagnostics;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ protected void addDiagnostic(String diag) {
+ diagnostics.add(diag);
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(this.getId());
+ return sb.toString();
+ }
+
+ // Identity is based solely on the ExecutionBlockId, consistent with hashCode().
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof Stage) {
+ Stage other = (Stage)o;
+ return getId().equals(other.getId());
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return getId().hashCode();
+ }
+
+ // NOTE(review): compareTo is provided but the class does not declare
+ // implements Comparable<Stage> — confirm whether the interface should be added.
+ public int compareTo(Stage other) {
+ return getId().compareTo(other.getId());
+ }
+
+ // Reads the state machine's current state under the read lock; may block while a
+ // transition holds the write lock.
+ public StageState getSynchronizedState() {
+ readLock.lock();
+ try {
+ return stateMachine.getCurrentState();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /* non-blocking call for client API */
+ // Returns the cached state field updated by handle(); may lag the state machine slightly.
+ public StageState getState() {
+ return stageState;
+ }
+
+ // Aggregates input (index 0) and result (index 1) statistics across all child
+ // stages of a union block, since a union block performs no processing itself.
+ public static TableStats[] computeStatFromUnionBlock(Stage stage) {
+ TableStats[] stat = new TableStats[]{new TableStats(), new TableStats()};
+ long[] avgRows = new long[]{0, 0};
+ long[] numBytes = new long[]{0, 0};
+ long[] readBytes = new long[]{0, 0};
+ long[] numRows = new long[]{0, 0};
+ int[] numBlocks = new int[]{0, 0};
+ int[] numOutputs = new int[]{0, 0};
+
+ List<ColumnStats> columnStatses = Lists.newArrayList();
+
+ MasterPlan masterPlan = stage.getMasterPlan();
+ Iterator<ExecutionBlock> it = masterPlan.getChilds(stage.getBlock()).iterator();
+ while (it.hasNext()) {
+ ExecutionBlock block = it.next();
+ Stage childStage = stage.context.getStage(block.getId());
+ TableStats[] childStatArray = new TableStats[]{
+ childStage.getInputStats(), childStage.getResultStats()
+ };
+ for (int i = 0; i < 2; i++) {
+ if (childStatArray[i] == null) {
+ continue;
+ }
+ avgRows[i] += childStatArray[i].getAvgRows();
+ numBlocks[i] += childStatArray[i].getNumBlocks();
+ numBytes[i] += childStatArray[i].getNumBytes();
+ readBytes[i] += childStatArray[i].getReadBytes();
+ numOutputs[i] += childStatArray[i].getNumShuffleOutputs();
+ numRows[i] += childStatArray[i].getNumRows();
+ }
+ // NOTE(review): childStatArray[1] may be null here (the loop above only skips
+ // null entries per-index) — potential NPE; verify result stats are always set.
+ columnStatses.addAll(childStatArray[1].getColumnStats());
+ }
+
+ for (int i = 0; i < 2; i++) {
+ stat[i].setNumBlocks(numBlocks[i]);
+ stat[i].setNumBytes(numBytes[i]);
+ stat[i].setReadBytes(readBytes[i]);
+ stat[i].setNumShuffleOutputs(numOutputs[i]);
+ stat[i].setNumRows(numRows[i]);
+ stat[i].setAvgRows(avgRows[i]);
+ }
+ stat[1].setColumnStats(columnStatses);
+
+ return stat;
+ }
+
+ // Aggregates {input, result} statistics over all tasks of this (non-union) stage.
+ private TableStats[] computeStatFromTasks() {
+ List<TableStats> inputStatsList = Lists.newArrayList();
+ List<TableStats> resultStatsList = Lists.newArrayList();
+ for (Task unit : getTasks()) {
+ resultStatsList.add(unit.getStats());
+ if (unit.getLastAttempt().getInputStats() != null) {
+ inputStatsList.add(unit.getLastAttempt().getInputStats());
+ }
+ }
+ TableStats inputStats = StatisticsUtil.aggregateTableStat(inputStatsList);
+ TableStats resultStats = StatisticsUtil.aggregateTableStat(resultStatsList);
+ return new TableStats[]{inputStats, resultStats};
+ }
+
+ private void stopScheduler() {
+ // If there are launched TaskRunners, send the 'shouldDie' message to all runners
+ // via received task requests.
+ if (taskScheduler != null) {
+ taskScheduler.stop();
+ }
+ }
+
+ private void releaseContainers() {
+ // If there are still live TaskRunners, try to kill the containers.
+ eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, getId(), containers.values()));
+ }
+
+ /**
+ * It computes all stats and sets the intermediate result.
+ */
+ private void finalizeStats() {
+ TableStats[] statsArray;
+ if (block.hasUnion()) {
+ statsArray = computeStatFromUnionBlock(this);
+ } else {
+ statsArray = computeStatFromTasks();
+ }
+
+ DataChannel channel = masterPlan.getOutgoingChannels(getId()).get(0);
+
+ // if store plan (i.e., CREATE or INSERT OVERWRITE)
+ StoreType storeType = PlannerUtil.getStoreType(masterPlan.getLogicalPlan());
+ if (storeType == null) {
+ // get default or store type
+ storeType = StoreType.CSV;
+ }
+
+ schema = channel.getSchema();
+ meta = CatalogUtil.newTableMeta(storeType, new KeyValueSet());
+ inputStatistics = statsArray[0];
+ resultStatistics = statsArray[1];
+ }
+
+ // Drives the state machine under the write lock. An invalid transition is not
+ // rethrown; it is converted into an SQ_INTERNAL_ERROR event instead.
+ @Override
+ public void handle(StageEvent event) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing " + event.getStageId() + " of type " + event.getType() + ", preState="
+ + getSynchronizedState());
+ }
+
+ try {
+ writeLock.lock();
+ StageState oldState = getSynchronizedState();
+ try {
+ getStateMachine().doTransition(event.getType(), event);
+ // Refresh the cached state field read by the non-blocking getState().
+ stageState = getSynchronizedState();
+ } catch (InvalidStateTransitonException e) {
+ LOG.error("Can't handle this event at current state"
+ + ", eventType:" + event.getType().name()
+ + ", oldState:" + oldState.name()
+ + ", nextState:" + getSynchronizedState().name()
+ , e);
+ eventHandler.handle(new StageEvent(getId(),
+ StageEventType.SQ_INTERNAL_ERROR));
+ }
+
+ // notify the eventhandler of state change
+ if (LOG.isDebugEnabled()) {
+ if (oldState != getSynchronizedState()) {
+ LOG.debug(getId() + " Stage Transitioned from " + oldState + " to "
+ + getSynchronizedState());
+ }
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ // Delegates task requests from workers straight to the stage's scheduler.
+ public void handleTaskRequestEvent(TaskRequestEvent event) {
+ taskScheduler.handleTaskRequestEvent(event);
+ }
+
+ private static class InitAndRequestContainer implements MultipleArcTransition<Stage,
+ StageEvent, StageState> {
+
+ // Handles SQ_INIT. A union block succeeds immediately (no real work); otherwise
+ // scheduling and container allocation are kicked off asynchronously and the
+ // stage moves to INITED. Any failure is reported as a StageCompletedEvent(ERROR).
+ @Override
+ public StageState transition(final Stage stage, StageEvent stageEvent) {
+ stage.setStartTime();
+ ExecutionBlock execBlock = stage.getBlock();
+ StageState state;
+
+ try {
+ // Union operator does not require actual query processing. It is performed logically.
+ if (execBlock.hasUnion()) {
+ stage.finalizeStats();
+ state = StageState.SUCCEEDED;
+ } else {
+ // execute pre-processing asynchronously
+ stage.getContext().getQueryMasterContext().getEventExecutor()
+ .submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ ExecutionBlock parent = stage.getMasterPlan().getParent(stage.getBlock());
+ DataChannel channel = stage.getMasterPlan().getChannel(stage.getId(), parent.getId());
+ setShuffleIfNecessary(stage, channel);
+ initTaskScheduler(stage);
+ schedule(stage);
+ stage.totalScheduledObjectsCount = stage.getTaskScheduler().remainingScheduledObjectNum();
+ LOG.info(stage.totalScheduledObjectsCount + " objects are scheduled");
+
+ if (stage.getTaskScheduler().remainingScheduledObjectNum() == 0) { // if there is no tasks
+ stage.complete();
+ } else {
+ // Only start if the stage is still INITED; a racing kill wins otherwise.
+ if(stage.getSynchronizedState() == StageState.INITED) {
+ stage.taskScheduler.start();
+ allocateContainers(stage);
+ } else {
+ stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL));
+ }
+ }
+ } catch (Throwable e) {
+ LOG.error("Stage (" + stage.getId() + ") ERROR: ", e);
+ stage.setFinishTime();
+ stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), e.getMessage()));
+ stage.eventHandler.handle(new StageCompletedEvent(stage.getId(), StageState.ERROR));
+ }
+ }
+ }
+ );
+ state = StageState.INITED;
+ }
+ } catch (Throwable e) {
+ LOG.error("Stage (" + stage.getId() + ") ERROR: ", e);
+ stage.setFinishTime();
+ stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), e.getMessage()));
+ stage.eventHandler.handle(new StageCompletedEvent(stage.getId(), StageState.ERROR));
+ return StageState.ERROR;
+ }
+
+ return state;
+ }
+
+ // Creates and initializes the task scheduler chosen by TaskSchedulerFactory for
+ // this stage (leaf vs. non-leaf is decided from the master plan).
+ private void initTaskScheduler(Stage stage) throws IOException {
+ TajoConf conf = stage.context.getConf();
+ stage.schedulerContext = new TaskSchedulerContext(stage.context,
+ stage.getMasterPlan().isLeaf(stage.getId()), stage.getId());
+ stage.taskScheduler = TaskSchedulerFactory.get(conf, stage.schedulerContext, stage);
+ stage.taskScheduler.init(conf);
+ LOG.info(stage.taskScheduler.getName() + " is chosen for the task scheduling for " + stage.getId());
+ }
+
+ /**
+ * If a parent block requires a repartition operation, the method sets proper repartition
+ * methods and the number of partitions to a given Stage.
+ */
+ private static void setShuffleIfNecessary(Stage stage, DataChannel channel) {
+ if (channel.getShuffleType() != ShuffleType.NONE_SHUFFLE) {
+ int numTasks = calculateShuffleOutputNum(stage, channel);
+ Repartitioner.setShuffleOutputNumForTwoPhase(stage, numTasks, channel);
+ }
+ }
+
+ /**
+ * Getting the total memory of cluster
+ *
+ * @param stage
+ * @return mega bytes
+ */
+ private static int getClusterTotalMemory(Stage stage) {
+ List<TajoMasterProtocol.WorkerResourceProto> workers =
+ stage.context.getQueryMasterContext().getQueryMaster().getAllWorker();
+
+ int totalMem = 0;
+ for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
+ totalMem += worker.getMemoryMB();
+ }
+ return totalMem;
+ }
+ /**
+ * Getting the desired number of partitions according to the volume of input data.
+ * This method is only used to determine the partition key number of hash join or aggregation.
+ *
+ * @param stage
+ * @return the determined number of shuffle partitions
+ */
+ public static int calculateShuffleOutputNum(Stage stage, DataChannel channel) {
+ TajoConf conf = stage.context.getConf();
+ MasterPlan masterPlan = stage.getMasterPlan();
+ ExecutionBlock parent = masterPlan.getParent(stage.getBlock());
+
+ // Look for an aggregation node in the parent block to pick the group-by path below.
+ LogicalNode grpNode = null;
+ if (parent != null) {
+ grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.GROUP_BY);
+ if (grpNode == null) {
+ grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.DISTINCT_GROUP_BY);
+ }
+ }
+
+ // We assume this execution block the first stage of join if two or more tables are included in this block,
+ if (parent != null && parent.getScanNodes().length >= 2) {
+ List<ExecutionBlock> childs = masterPlan.getChilds(parent);
+
+ // for outer
+ ExecutionBlock outer = childs.get(0);
+ long outerVolume = getInputVolume(stage.masterPlan, stage.context, outer);
+
+ // for inner
+ ExecutionBlock inner = childs.get(1);
+ long innerVolume = getInputVolume(stage.masterPlan, stage.context, inner);
+ LOG.info(stage.getId() + ", Outer volume: " + Math.ceil((double) outerVolume / 1048576) + "MB, "
+ + "Inner volume: " + Math.ceil((double) innerVolume / 1048576) + "MB");
+
+ // Size the shuffle by the larger side of the join.
+ long bigger = Math.max(outerVolume, innerVolume);
+
+ int mb = (int) Math.ceil((double) bigger / 1048576);
+ LOG.info(stage.getId() + ", Bigger Table's volume is approximately " + mb + " MB");
+
+ int taskNum = (int) Math.ceil((double) mb / masterPlan.getContext().getInt(SessionVars.JOIN_PER_SHUFFLE_SIZE));
+
+ if (masterPlan.getContext().containsKey(SessionVars.TEST_MIN_TASK_NUM)) {
+ taskNum = masterPlan.getContext().getInt(SessionVars.TEST_MIN_TASK_NUM);
+ LOG.warn("!!!!! TESTCASE MODE !!!!!");
+ }
+
+ // The shuffle output numbers of join may be inconsistent by execution block order.
+ // Thus, we need to compare the number with DataChannel output numbers.
+ // If the number is right, the number and DataChannel output numbers will be consistent.
+ int outerShuffleOutputNum = 0, innerShuffleOutputNum = 0;
+ for (DataChannel eachChannel : masterPlan.getOutgoingChannels(outer.getId())) {
+ outerShuffleOutputNum = Math.max(outerShuffleOutputNum, eachChannel.getShuffleOutputNum());
+ }
+ for (DataChannel eachChannel : masterPlan.getOutgoingChannels(inner.getId())) {
+ innerShuffleOutputNum = Math.max(innerShuffleOutputNum, eachChannel.getShuffleOutputNum());
+ }
+ if (outerShuffleOutputNum != innerShuffleOutputNum
+ && taskNum != outerShuffleOutputNum
+ && taskNum != innerShuffleOutputNum) {
+ // NOTE(review): "outerShuffleOutptNum" below is a typo in the log key; left
+ // unchanged here because log text is runtime behavior.
+ LOG.info(stage.getId() + ", Change determined number of join partitions cause difference of outputNum" +
+ ", originTaskNum=" + taskNum + ", changedTaskNum=" + Math.max(outerShuffleOutputNum, innerShuffleOutputNum) +
+ ", outerShuffleOutptNum=" + outerShuffleOutputNum +
+ ", innerShuffleOutputNum=" + innerShuffleOutputNum);
+ taskNum = Math.max(outerShuffleOutputNum, innerShuffleOutputNum);
+ }
+
+ LOG.info(stage.getId() + ", The determined number of join partitions is " + taskNum);
+
+ return taskNum;
+ // Is this stage the first step of group-by?
+ } else if (grpNode != null) {
+ boolean hasGroupColumns = true;
+ if (grpNode.getType() == NodeType.GROUP_BY) {
+ hasGroupColumns = ((GroupbyNode)grpNode).getGroupingColumns().length > 0;
+ } else if (grpNode.getType() == NodeType.DISTINCT_GROUP_BY) {
+ // Find current distinct stage node.
+ DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(stage.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY);
+ if (distinctNode == null) {
+ LOG.warn(stage.getId() + ", Can't find current DistinctGroupbyNode");
+ distinctNode = (DistinctGroupbyNode)grpNode;
+ }
+ hasGroupColumns = distinctNode.getGroupingColumns().length > 0;
+
+ Enforcer enforcer = stage.getBlock().getEnforcer();
+ if (enforcer == null) {
+ LOG.warn(stage.getId() + ", DistinctGroupbyNode's enforcer is null.");
+ }
+ EnforceProperty property = PhysicalPlannerImpl.getAlgorithmEnforceProperty(enforcer, distinctNode);
+ if (property != null) {
+ if (property.getDistinct().getIsMultipleAggregation()) {
+ MultipleAggregationStage multiAggStage = property.getDistinct().getMultipleAggregationStage();
+ if (multiAggStage != MultipleAggregationStage.THRID_STAGE) {
+ hasGroupColumns = true;
+ }
+ }
+ }
+ }
+ if (!hasGroupColumns) {
+ // A global aggregation without grouping keys collapses to a single partition.
+ LOG.info(stage.getId() + ", No Grouping Column - determinedTaskNum is set to 1");
+ return 1;
+ } else {
+ long volume = getInputVolume(stage.masterPlan, stage.context, stage.block);
+
+ int volumeByMB = (int) Math.ceil((double) volume / StorageUnit.MB);
+ LOG.info(stage.getId() + ", Table's volume is approximately " + volumeByMB + " MB");
+ // determine the number of task
+ int taskNum = (int) Math.ceil((double) volumeByMB /
+ masterPlan.getContext().getInt(SessionVars.GROUPBY_PER_SHUFFLE_SIZE));
+ LOG.info(stage.getId() + ", The determined number of aggregation partitions is " + taskNum);
+ return taskNum;
+ }
+ } else {
+ LOG.info("============>>>>> Unexpected Case! <<<<<================");
+ long volume = getInputVolume(stage.masterPlan, stage.context, stage.block);
+
+ int mb = (int) Math.ceil((double)volume / 1048576);
+ LOG.info(stage.getId() + ", Table's volume is approximately " + mb + " MB");
+ // determine the number of task per 128MB
+ int taskNum = (int) Math.ceil((double)mb / 128);
+ LOG.info(stage.getId() + ", The determined number of partitions is " + taskNum);
+ return taskNum;
+ }
+ }
+
+ /**
+  * Dispatches fragment scheduling for a stage based on the shape of its
+  * execution block: a leaf block with a single scan is scheduled straight from
+  * storage, multiple scans indicate a join, and everything else (sort or
+  * aggregation) is scheduled as non-leaf tasks sized by input volume.
+  */
+ private static void schedule(Stage stage) throws IOException {
+ MasterPlan masterPlan = stage.getMasterPlan();
+ ExecutionBlock execBlock = stage.getBlock();
+ if (stage.getMasterPlan().isLeaf(execBlock.getId()) && execBlock.getScanNodes().length == 1) { // Case 1: Just Scan
+ scheduleFragmentsForLeafQuery(stage);
+ } else if (execBlock.getScanNodes().length > 1) { // Case 2: Join
+ Repartitioner.scheduleFragmentsForJoinQuery(stage.schedulerContext, stage);
+ } else { // Case 3: Others (Sort or Aggregation)
+ int numTasks = getNonLeafTaskNum(stage);
+ Repartitioner.scheduleFragmentsForNonLeafTasks(stage.schedulerContext, masterPlan, stage, numTasks);
+ }
+ }
+
+ /**
+  * Getting the desired number of tasks according to the volume of input data.
+  *
+  * @param stage the stage whose input volume determines the task count
+  * @return the number of non-leaf tasks: one task per 64 MB of input, at least 1
+  */
+ public static int getNonLeafTaskNum(Stage stage) {
+ // Getting intermediate data size
+ long volume = getInputVolume(stage.getMasterPlan(), stage.context, stage.getBlock());
+
+ int mb = (int) Math.ceil((double)volume / 1048576); // bytes -> MB, rounded up
+ LOG.info(stage.getId() + ", Table's volume is approximately " + mb + " MB");
+ // determine the number of task per 64MB
+ int maxTaskNum = Math.max(1, (int) Math.ceil((double)mb / 64));
+ LOG.info(stage.getId() + ", The determined number of non-leaf tasks is " + maxTaskNum);
+ return maxTaskNum;
+ }
+
+ /**
+  * Estimates the input volume, in bytes, of the given execution block.
+  * A leaf block is sized from the catalog statistics of its scanned tables;
+  * a non-leaf block aggregates the result size of each child stage, falling
+  * back to a recursive estimate for children that have not yet succeeded.
+  */
+ public static long getInputVolume(MasterPlan masterPlan, QueryMasterTask.QueryMasterTaskContext context,
+ ExecutionBlock execBlock) {
+ Map<String, TableDesc> tableMap = context.getTableDescMap();
+ if (masterPlan.isLeaf(execBlock)) {
+ ScanNode[] outerScans = execBlock.getScanNodes();
+ // NOTE(review): takes the maximum over all scans rather than their sum —
+ // presumably the largest relation dominates the workload; confirm intent.
+ long maxVolume = 0;
+ for (ScanNode eachScanNode: outerScans) {
+ TableStats stat = tableMap.get(eachScanNode.getCanonicalName()).getStats();
+ if (stat.getNumBytes() > maxVolume) {
+ maxVolume = stat.getNumBytes();
+ }
+ }
+ return maxVolume;
+ } else {
+ long aggregatedVolume = 0;
+ for (ExecutionBlock childBlock : masterPlan.getChilds(execBlock)) {
+ Stage stage = context.getStage(childBlock.getId());
+ if (stage == null || stage.getSynchronizedState() != StageState.SUCCEEDED) {
+ // Child not finished: estimate from the child's own inputs instead.
+ aggregatedVolume += getInputVolume(masterPlan, context, childBlock);
+ } else {
+ aggregatedVolume += stage.getResultStats().getNumBytes();
+ }
+ }
+
+ return aggregatedVolume;
+ }
+ }
+
+ /**
+  * Requests containers for the stage from the resource allocator. The request
+  * size is derived from the scheduler's estimated task number with a fixed
+  * per-task memory budget, and is dispatched as a CONTAINER_REQ event at the
+  * stage's priority.
+  */
+ public static void allocateContainers(Stage stage) {
+ ExecutionBlock execBlock = stage.getBlock();
+
+ //TODO consider disk slot
+ int requiredMemoryMBPerTask = 512; // fixed memory budget per task, in MB
+
+ int numRequest = stage.getContext().getResourceAllocator().calculateNumRequestContainers(
+ stage.getContext().getQueryMasterContext().getWorkerContext(),
+ stage.schedulerContext.getEstimatedTaskNum(),
+ requiredMemoryMBPerTask
+ );
+
+ final Resource resource = Records.newRecord(Resource.class);
+
+ resource.setMemory(requiredMemoryMBPerTask);
+
+ LOG.info("Request Container for " + stage.getId() + " containers=" + numRequest);
+
+ Priority priority = Records.newRecord(Priority.class);
+ priority.setPriority(stage.getPriority());
+ ContainerAllocationEvent event =
+ new ContainerAllocationEvent(ContainerAllocatorEventType.CONTAINER_REQ,
+ stage.getId(), priority, resource, numRequest,
+ stage.masterPlan.isLeaf(execBlock), 0.0f);
+ stage.eventHandler.handle(event);
+ }
+
+ /**
+  * Creates fragments for a single-scan leaf stage and hands them to the
+  * stage's task scheduler, then fixes the estimated task number that later
+  * drives container allocation.
+  */
+ private static void scheduleFragmentsForLeafQuery(Stage stage) throws IOException {
+ ExecutionBlock execBlock = stage.getBlock();
+ ScanNode[] scans = execBlock.getScanNodes();
+ Preconditions.checkArgument(scans.length == 1, "Must be Scan Query");
+ ScanNode scan = scans[0];
+ TableDesc table = stage.context.getTableDescMap().get(scan.getCanonicalName());
+
+ Collection<Fragment> fragments;
+ TableMeta meta = table.getMeta();
+
+ // Depending on the scan node's type, it creates fragments. If the scan is
+ // for a partitioned table, it will create many fragments, covering all
+ // partitions. Otherwise, it creates at least one fragment for the table,
+ // which may span a number of blocks or possibly consist of a number of files.
+ if (scan.getType() == NodeType.PARTITIONS_SCAN) {
+ // After calling this method, partition paths are removed from the physical plan.
+ FileStorageManager storageManager =
+ (FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf());
+ fragments = Repartitioner.getFragmentsFromPartitionedTable(storageManager, scan, table);
+ } else {
+ StorageManager storageManager =
+ StorageManager.getStorageManager(stage.getContext().getConf(), meta.getStoreType());
+ fragments = storageManager.getSplits(scan.getCanonicalName(), table, scan);
+ }
+
+ Stage.scheduleFragments(stage, fragments);
+ if (stage.getTaskScheduler() instanceof DefaultTaskScheduler) {
+ // Leaf tasks of DefaultTaskScheduler are one-per-fragment, so the
+ // estimated task number (which sizes the initial container request)
+ // equals the fragment count.
+ stage.schedulerContext.setEstimatedTaskNum(fragments.size());
+ } else {
+ // Other schedulers pack data by a fixed task size, so estimate from
+ // the table volume divided by that size.
+ TajoConf conf = stage.context.getConf();
+ stage.schedulerContext.setTaskSize(conf.getIntVar(ConfVars.TASK_DEFAULT_SIZE) * 1024 * 1024);
+ int estimatedTaskNum = (int) Math.ceil((double) table.getStats().getNumBytes() /
+ (double) stage.schedulerContext.getTaskSize());
+ stage.schedulerContext.setEstimatedTaskNum(estimatedTaskNum);
+ }
+ }
+ }
+
+ /** Schedules a single fragment on the stage's task scheduler. */
+ public static void scheduleFragment(Stage stage, Fragment fragment) {
+ stage.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
+ stage.getId(), fragment));
+ }
+
+
+ /** Schedules every fragment in the collection, one scheduler event each. */
+ public static void scheduleFragments(Stage stage, Collection<Fragment> fragments) {
+ for (Fragment eachFragment : fragments) {
+ scheduleFragment(stage, eachFragment);
+ }
+ }
+
+ /**
+  * Schedules each left-side fragment paired with the same collection of
+  * broadcast fragments (used for broadcast joins).
+  */
+ public static void scheduleFragments(Stage stage, Collection<Fragment> leftFragments,
+ Collection<Fragment> broadcastFragments) {
+ for (Fragment eachLeafFragment : leftFragments) {
+ scheduleFragment(stage, eachLeafFragment, broadcastFragments);
+ }
+ }
+
+ /** Schedules one left fragment together with its right-side fragments. */
+ public static void scheduleFragment(Stage stage,
+ Fragment leftFragment, Collection<Fragment> rightFragments) {
+ stage.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
+ stage.getId(), leftFragment, rightFragments));
+ }
+
+ /** Schedules intermediate-data fetches (keyed by name) on the task scheduler. */
+ public static void scheduleFetches(Stage stage, Map<String, List<FetchImpl>> fetches) {
+ stage.taskScheduler.handle(new FetchScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
+ stage.getId(), fetches));
+ }
+
+ /**
+  * Creates a new Task for the stage with the block's logical plan attached but
+  * no input assigned yet, registers it with the stage, and returns it.
+  */
+ public static Task newEmptyTask(TaskSchedulerContext schedulerContext,
+ TaskAttemptScheduleContext taskContext,
+ Stage stage, int taskId) {
+ ExecutionBlock execBlock = stage.getBlock();
+ Task unit = new Task(schedulerContext.getMasterContext().getConf(),
+ taskContext,
+ QueryIdFactory.newTaskId(schedulerContext.getBlockId(), taskId),
+ schedulerContext.isLeafQuery(), stage.eventHandler);
+ unit.setLogicalPlan(execBlock.getPlan());
+ stage.addTask(unit);
+ return unit;
+ }
+
+ /**
+  * Handles a container-allocation event: registers each allocated container,
+  * asks the event handler to launch task runners on them, and then fires
+  * SQ_START to move the stage into its running phase. Any failure while
+  * handling the event is surfaced as an internal error on the stage.
+  */
+ private static class ContainerLaunchTransition
+ implements SingleArcTransition<Stage, StageEvent> {
+
+ @Override
+ public void transition(Stage stage, StageEvent event) {
+ try {
+ StageContainerAllocationEvent allocationEvent =
+ (StageContainerAllocationEvent) event;
+ for (TajoContainer container : allocationEvent.getAllocatedContainer()) {
+ TajoContainerId cId = container.getId();
+ if (stage.containers.containsKey(cId)) {
+ stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(),
+ "Duplicated containers are allocated: " + cId.toString()));
+ stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_INTERNAL_ERROR));
+ }
+ // NOTE(review): the container is stored even when it was just flagged
+ // as a duplicate above — confirm whether the put should be skipped then.
+ stage.containers.put(cId, container);
+ }
+ LOG.info("Stage (" + stage.getId() + ") has " + stage.containers.size() + " containers!");
+ stage.eventHandler.handle(
+ new LaunchTaskRunnersEvent(stage.getId(), allocationEvent.getAllocatedContainer(),
+ stage.getContext().getQueryContext(),
+ CoreGsonHelper.toJson(stage.getBlock().getPlan(), LogicalNode.class))
+ );
+
+ stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_START));
+ } catch (Throwable t) {
+ stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(),
+ ExceptionUtils.getStackTrace(t)));
+ stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_INTERNAL_ERROR));
+ }
+ }
+ }
+
+ /**
+  * It is used in the KILL_WAIT state against a container-allocation event.
+  * It just returns the allocated containers to the resource manager without
+  * launching anything on them.
+  */
+ private static class AllocatedContainersCancelTransition implements SingleArcTransition<Stage, StageEvent> {
+ @Override
+ public void transition(Stage stage, StageEvent event) {
+ try {
+ StageContainerAllocationEvent allocationEvent =
+ (StageContainerAllocationEvent) event;
+ stage.eventHandler.handle(
+ new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP,
+ stage.getId(), allocationEvent.getAllocatedContainer()));
+ LOG.info(String.format("[%s] %d allocated containers are canceled",
+ stage.getId().toString(),
+ allocationEvent.getAllocatedContainer().size()));
+ } catch (Throwable t) {
+ stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(),
+ ExceptionUtils.getStackTrace(t)));
+ stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_INTERNAL_ERROR));
+ }
+ }
+ }
+
+ /**
+  * Handles per-task completion events: updates the succeeded/killed/failed
+  * counters, triggers SQ_KILL as soon as any task fails, and fires
+  * SQ_STAGE_COMPLETED once every scheduled object is accounted for.
+  */
+ private static class TaskCompletedTransition implements SingleArcTransition<Stage, StageEvent> {
+
+ @Override
+ public void transition(Stage stage,
+ StageEvent event) {
+ StageTaskEvent taskEvent = (StageTaskEvent) event;
+ Task task = stage.getTask(taskEvent.getTaskId());
+
+ if (task == null) { // task failed
+ LOG.error(String.format("Task %s is absent", taskEvent.getTaskId()));
+ stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_FAILED));
+ } else {
+ stage.completedTaskCount++;
+
+ // NOTE(review): success is read from the event's state, but killed/failed
+ // are read from the task object — confirm the two views cannot disagree.
+ if (taskEvent.getState() == TaskState.SUCCEEDED) {
+ stage.succeededObjectCount++;
+ } else if (task.getState() == TaskState.KILLED) {
+ stage.killedObjectCount++;
+ } else if (task.getState() == TaskState.FAILED) {
+ stage.failedObjectCount++;
+ // if at least one task is failed, try to kill all tasks.
+ stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL));
+ }
+
+ LOG.info(String.format("[%s] Task Completion Event (Total: %d, Success: %d, Killed: %d, Failed: %d)",
+ stage.getId(),
+ stage.getTotalScheduledObjectsCount(),
+ stage.succeededObjectCount,
+ stage.killedObjectCount,
+ stage.failedObjectCount));
+
+ // Stage is complete once every scheduled object reached a terminal state.
+ if (stage.totalScheduledObjectsCount ==
+ stage.succeededObjectCount + stage.killedObjectCount + stage.failedObjectCount) {
+ stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED));
+ }
+ }
+ }
+ }
+
+ /**
+  * Stops the task scheduler (if one is running) and dispatches a T_KILL event
+  * to every task of the stage.
+  */
+ private static class KillTasksTransition implements SingleArcTransition<Stage, StageEvent> {
+
+ @Override
+ public void transition(Stage stage, StageEvent stageEvent) {
+ if(stage.getTaskScheduler() != null){
+ stage.getTaskScheduler().stop();
+ }
+
+ for (Task task : stage.getTasks()) {
+ stage.eventHandler.handle(new TaskEvent(task.getId(), TaskEventType.T_KILL));
+ }
+ }
+ }
+
+ /**
+  * Releases resources held by a finished stage: stops the scheduler, returns
+  * containers, and — unless the session runs with DEBUG_ENABLED, which keeps
+  * intermediate data for inspection — asks the query master to clean up the
+  * intermediate output of all child execution blocks. Finally captures a
+  * final history snapshot of the stage and its tasks.
+  */
+ private void cleanup() {
+ stopScheduler();
+ releaseContainers();
+
+ if (!getContext().getQueryContext().getBool(SessionVars.DEBUG_ENABLED)) {
+ List<ExecutionBlock> childs = getMasterPlan().getChilds(getId());
+ List<TajoIdProtos.ExecutionBlockIdProto> ebIds = Lists.newArrayList();
+
+ for (ExecutionBlock executionBlock : childs) {
+ ebIds.add(executionBlock.getId().getProto());
+ }
+
+ getContext().getQueryMasterContext().getQueryMaster().cleanupExecutionBlock(ebIds);
+ }
+
+ this.finalStageHistory = makeStageHistory();
+ this.finalStageHistory.setTasks(makeTaskHistories());
+ }
+
+ /**
+  * Returns the hash-shuffle intermediate entries collected from worker reports.
+  * Note: exposes the live internal list; writers synchronize on it in
+  * receiveExecutionBlockReport.
+  */
+ public List<IntermediateEntry> getHashShuffleIntermediateEntries() {
+ return hashShuffleIntermediateEntries;
+ }
+
+ /**
+  * Blocks until the intermediate-data reports for all of this stage's tasks
+  * have been received, or until 120 seconds elapse (in which case the stage is
+  * aborted as FAILED). Reports arrive via receiveExecutionBlockReport, which
+  * increments completeReportReceived and notifies on the same monitor.
+  */
+ protected void waitingIntermediateReport() {
+ // Fix: the "expectedTaskNum" label previously printed the *received* count
+ // (completeReportReceived.get()), not the expected number of tasks.
+ LOG.info(getId() + ", waiting IntermediateReport: expectedTaskNum=" + tasks.size());
+ synchronized(completeReportReceived) {
+ long startTime = System.currentTimeMillis();
+ while (true) {
+ if (completeReportReceived.get() >= tasks.size()) {
+ LOG.info(getId() + ", completed waiting IntermediateReport");
+ return;
+ } else {
+ try {
+ completeReportReceived.wait(10 * 1000);
+ } catch (InterruptedException e) {
+ // Restore the interrupt status and stop waiting. Swallowing the
+ // interrupt (as before) would make the next wait() throw again
+ // immediately, busy-spinning until the 120s timeout.
+ Thread.currentThread().interrupt();
+ return;
+ }
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ if (elapsedTime >= 120 * 1000) {
+ LOG.error(getId() + ", Timeout while receiving intermediate reports: " + elapsedTime + " ms");
+ abort(StageState.FAILED);
+ return;
+ }
+ }
+ }
+ }
+ }
+
+ /**
+  * Callback invoked when a worker delivers the final report for an execution
+  * block. A failed report aborts the stage. Otherwise any hash-shuffle
+  * intermediate entries are recorded, and the succeeded-task count is added to
+  * completeReportReceived, waking up waitingIntermediateReport().
+  */
+ public void receiveExecutionBlockReport(TajoWorkerProtocol.ExecutionBlockReport report) {
+ LOG.info(getId() + ", receiveExecutionBlockReport:" + report.getSucceededTasks());
+ if (!report.getReportSuccess()) {
+ LOG.error(getId() + ", ExecutionBlock final report fail cause:" + report.getReportErrorMessage());
+ abort(StageState.FAILED);
+ return;
+ }
+ if (report.getIntermediateEntriesCount() > 0) {
+ synchronized (hashShuffleIntermediateEntries) {
+ for (IntermediateEntryProto eachInterm: report.getIntermediateEntriesList()) {
+ hashShuffleIntermediateEntries.add(new IntermediateEntry(eachInterm));
+ }
+ }
+ }
+ synchronized(completeReportReceived) {
+ completeReportReceived.addAndGet(report.getSucceededTasks());
+ completeReportReceived.notifyAll();
+ }
+ }
+
+ /**
+  * Final transition, fired once all scheduled tasks are accounted for.
+  * Decides the stage's terminal state: FAILED if any task failed, KILLED if
+  * any task was killed (and none failed), SUCCEEDED otherwise; any unexpected
+  * error while finishing yields ERROR.
+  */
+ private static class StageCompleteTransition implements MultipleArcTransition<Stage, StageEvent, StageState> {
+
+ @Override
+ public StageState transition(Stage stage, StageEvent stageEvent) {
+ // TODO - Commit Stage
+ // TODO - records succeeded, failed, killed completed task
+ // TODO - records metrics
+ try {
+ LOG.info(String.format("Stage completed - %s (total=%d, success=%d, killed=%d)",
+ stage.getId().toString(),
+ stage.getTotalScheduledObjectsCount(),
+ stage.getSucceededObjectCount(),
+ stage.killedObjectCount));
+
+ if (stage.killedObjectCount > 0 || stage.failedObjectCount > 0) {
+ if (stage.failedObjectCount > 0) {
+ stage.abort(StageState.FAILED);
+ return StageState.FAILED;
+ } else if (stage.killedObjectCount > 0) {
+ stage.abort(StageState.KILLED);
+ return StageState.KILLED;
+ } else {
+ // NOTE(review): unreachable — the enclosing condition guarantees
+ // failedObjectCount > 0 or killedObjectCount > 0 here.
+ LOG.error("Invalid State " + stage.getSynchronizedState() + " State");
+ stage.abort(StageState.ERROR);
+ return StageState.ERROR;
+ }
+ } else {
+ stage.complete();
+ return StageState.SUCCEEDED;
+ }
+ } catch (Throwable t) {
+ LOG.error(t.getMessage(), t);
+ stage.abort(StageState.ERROR);
+ return StageState.ERROR;
+ }
+ }
+ }
+
+ /** Appends the diagnostic message carried by the event to the stage. */
+ private static class DiagnosticsUpdateTransition implements SingleArcTransition<Stage, StageEvent> {
+ @Override
+ public void transition(Stage stage, StageEvent event) {
+ stage.addDiagnostic(((StageDiagnosticsUpdateEvent) event).getDiagnosticUpdate());
+ }
+ }
+
+ /** Aborts the stage into the ERROR state on an internal-error event. */
+ private static class InternalErrorTransition implements SingleArcTransition<Stage, StageEvent> {
+ @Override
+ public void transition(Stage stage, StageEvent stageEvent) {
+ stage.abort(StageState.ERROR);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/StageState.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/StageState.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/StageState.java
new file mode 100644
index 0000000..82a06fe
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/StageState.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+public enum StageState { // lifecycle states of a Stage (inline comments keep the hunk's +1,30 line count intact)
+ NEW, // stage object created, not initialized yet
+ INITED, // initialized, tasks not yet running
+ RUNNING, // tasks scheduled and executing
+ SUCCEEDED, // all tasks finished successfully
+ FAILED, // at least one task failed or the stage was aborted as FAILED
+ KILL_WAIT, // kill requested, waiting for tasks/containers to stop
+ KILLED, // stage terminated by a kill
+ ERROR // internal error
+}