You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2013/11/24 13:18:43 UTC
git commit: TAJO-326: In ExecutionBlock, isRoot() and isLeafBlock() return invalid values. (jihoon)
Updated Branches:
refs/heads/master b37453d00 -> e5b881081
TAJO-326: In ExecutionBlock, isRoot() and isLeafBlock() return invalid values. (jihoon)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/e5b88108
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/e5b88108
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/e5b88108
Branch: refs/heads/master
Commit: e5b881081230da251ba86e122a3bac02a91c5639
Parents: b37453d
Author: Jihoon Son <ji...@apache.org>
Authored: Sun Nov 24 21:18:00 2013 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Sun Nov 24 21:18:00 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../engine/planner/global/ExecutionBlock.java | 29 --------------------
.../tajo/engine/planner/global/MasterPlan.java | 8 ++++--
.../apache/tajo/master/TaskSchedulerImpl.java | 4 +--
.../apache/tajo/master/querymaster/Query.java | 12 ++++----
.../tajo/master/querymaster/Repartitioner.java | 4 +--
.../tajo/master/querymaster/SubQuery.java | 4 +--
.../main/java/org/apache/tajo/worker/Task.java | 7 +++--
8 files changed, 24 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e5b88108/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2550019..ec2e704 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -41,6 +41,8 @@ Release 0.8.0 - unreleased
BUG FIXES
+ TAJO-326: In ExecutionBlock, isRoot() and isLeafBlock() return invalid values. (jihoon)
+
TAJO-296: Late registration of Tajo workers. (hyoungjunkim via hyunsik)
TAJO-321: Invalid split file of compressed text file. (jinho)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e5b88108/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
index 0dc393c..efa1c7f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
@@ -21,8 +21,6 @@ import org.apache.tajo.engine.planner.logical.*;
import java.util.*;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
-
/**
* A distributed execution plan (DEP) is a direct acyclic graph (DAG) of ExecutionBlocks.
* An ExecutionBlock is a basic execution unit that could be distributed across a number of nodes.
@@ -35,9 +33,6 @@ public class ExecutionBlock {
private LogicalNode plan = null;
private StoreTableNode store = null;
private List<ScanNode> scanlist = new ArrayList<ScanNode>();
- private ExecutionBlock parent;
- private Map<ScanNode, ExecutionBlock> childSubQueries = new HashMap<ScanNode, ExecutionBlock>();
- private PartitionType outputType;
private Enforcer enforcer = new Enforcer();
private boolean hasJoinPlan;
@@ -53,10 +48,6 @@ public class ExecutionBlock {
return executionBlockId;
}
- public PartitionType getPartitionType() {
- return outputType;
- }
-
public void setPlan(LogicalNode plan) {
hasJoinPlan = false;
hasUnionPlan = false;
@@ -98,26 +89,6 @@ public class ExecutionBlock {
return enforcer;
}
- public boolean isRoot() {
- return !hasParentBlock() || !(getParentBlock().hasParentBlock()) && getParentBlock().hasUnion();
- }
-
- public boolean hasParentBlock() {
- return parent != null;
- }
-
- public ExecutionBlock getParentBlock() {
- return parent;
- }
-
- public Collection<ExecutionBlock> getChildBlocks() {
- return Collections.unmodifiableCollection(childSubQueries.values());
- }
-
- public boolean isLeafBlock() {
- return childSubQueries.size() == 0;
- }
-
public StoreTableNode getStoreTableNode() {
return store;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e5b88108/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
index b2804cf..6aaeb1d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
@@ -51,7 +51,7 @@ public class MasterPlan {
}
public boolean isTerminal(ExecutionBlock execBlock) {
- return terminalBlock == execBlock;
+ return terminalBlock.getId().equals(execBlock.getId());
}
public ExecutionBlock getTerminalBlock() {
@@ -144,7 +144,11 @@ public class MasterPlan {
}
public boolean isRoot(ExecutionBlock execBlock) {
- return execBlockGraph.isRoot(execBlock.getId());
+ if (!execBlock.getId().equals(terminalBlock.getId())) {
+ return execBlockGraph.getParent(execBlock.getId()).equals(terminalBlock.getId());
+ } else {
+ return false;
+ }
}
public boolean isLeaf(ExecutionBlock execBlock) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e5b88108/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
index 6fe2c9e..4d5a951 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
@@ -471,7 +471,7 @@ public class TaskSchedulerImpl extends AbstractService
task.getLogicalPlan().toJson(),
context.getQueryContext(),
subQuery.getDataChannel(), subQuery.getBlock().getEnforcer());
- if (!subQuery.getBlock().isRoot()) {
+ if (!subQuery.getMasterPlan().isRoot(subQuery.getBlock())) {
taskAssign.setInterQuery();
}
@@ -520,7 +520,7 @@ public class TaskSchedulerImpl extends AbstractService
context.getQueryContext(),
subQuery.getDataChannel(),
subQuery.getBlock().getEnforcer());
- if (!subQuery.getBlock().isRoot()) {
+ if (!subQuery.getMasterPlan().isRoot(subQuery.getBlock())) {
taskAssign.setInterQuery();
}
for (ScanNode scan : task.getScanNodes()) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e5b88108/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 14a9a81..34b2892 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -28,19 +28,19 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.*;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.TajoProtos.QueryState;
import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
+import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.master.event.*;
import org.apache.tajo.storage.AbstractStorageManager;
@@ -299,7 +299,7 @@ public class Query implements EventHandler<QueryEvent> {
// if the subquery is succeeded
if (castEvent.getFinalState() == SubQueryState.SUCCEEDED) {
ExecutionBlock nextBlock = cursor.nextBlock();
- if (!query.getPlan().isTerminal(nextBlock) || !query.getPlan().isRoot(nextBlock)) {
+ if (!query.getPlan().isTerminal(nextBlock)) {
SubQuery nextSubQuery = new SubQuery(query.context, query.getPlan(), nextBlock, query.sm);
nextSubQuery.setPriority(query.priority--);
query.addSubQuery(nextSubQuery);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e5b88108/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index ef14c31..d58a6a6 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -227,7 +227,7 @@ public class Repartitioner {
private static QueryUnit newQueryUnit(SubQuery subQuery, int taskId, FileFragment fragment) {
ExecutionBlock execBlock = subQuery.getBlock();
QueryUnit unit = new QueryUnit(
- QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), execBlock.isLeafBlock(),
+ QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), subQuery.getMasterPlan().isLeaf(execBlock),
subQuery.getEventHandler());
unit.setLogicalPlan(execBlock.getPlan());
unit.setFragment2(fragment);
@@ -239,7 +239,7 @@ public class Repartitioner {
QueryUnit [] tasks = new QueryUnit[taskNum];
for (int i = 0; i < taskNum; i++) {
tasks[i] = new QueryUnit(
- QueryIdFactory.newQueryUnitId(subQuery.getId(), i), execBlock.isLeafBlock(),
+ QueryIdFactory.newQueryUnitId(subQuery.getId(), i), subQuery.getMasterPlan().isLeaf(execBlock),
subQuery.getEventHandler());
tasks[i].setLogicalPlan(execBlock.getPlan());
for (FileFragment fragment : fragments) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e5b88108/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 11455fb..0c8f395 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -626,7 +626,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
ContainerAllocationEvent event =
new ContainerAllocationEvent(ContainerAllocatorEventType.CONTAINER_REQ,
subQuery.getId(), priority, resource, numRequest,
- execBlock.isLeafBlock(), 0.0f);
+ subQuery.masterPlan.isLeaf(execBlock), 0.0f);
subQuery.eventHandler.handle(event);
}
@@ -661,7 +661,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
private static QueryUnit newQueryUnit(SubQuery subQuery, int taskId, FileFragment fragment) {
ExecutionBlock execBlock = subQuery.getBlock();
QueryUnit unit = new QueryUnit(
- QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), execBlock.isLeafBlock(),
+ QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), subQuery.masterPlan.isLeaf(execBlock),
subQuery.eventHandler);
unit.setLogicalPlan(execBlock.getPlan());
unit.setFragment2(fragment);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e5b88108/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index b5516f0..f931615 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -18,6 +18,7 @@
package org.apache.tajo.worker;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang.exception.ExceptionUtils;
@@ -157,10 +158,10 @@ public class Task {
interQuery = request.getProto().getInterQuery();
if (interQuery) {
context.setInterQuery();
- StoreTableNode store = (StoreTableNode) plan;
- this.partitionType = store.getPartitionType();
+ this.partitionType = context.getDataChannel().getPartitionType();
+
if (partitionType == PartitionType.RANGE_PARTITION) {
- SortNode sortNode = store.getChild();
+ SortNode sortNode = (SortNode) PlannerUtil.findTopNode(plan, NodeType.SORT);
this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys());
this.sortComp = new TupleComparator(finalSchema, sortNode.getSortKeys());
}