You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2013/11/24 13:18:43 UTC

git commit: TAJO-326: In ExecutionBlock, isRoot() and isLeafBlock() return invalid values. (jihoon)

Updated Branches:
  refs/heads/master b37453d00 -> e5b881081


TAJO-326: In ExecutionBlock, isRoot() and isLeafBlock() return invalid values. (jihoon)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/e5b88108
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/e5b88108
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/e5b88108

Branch: refs/heads/master
Commit: e5b881081230da251ba86e122a3bac02a91c5639
Parents: b37453d
Author: Jihoon Son <ji...@apache.org>
Authored: Sun Nov 24 21:18:00 2013 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Sun Nov 24 21:18:00 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../engine/planner/global/ExecutionBlock.java   | 29 --------------------
 .../tajo/engine/planner/global/MasterPlan.java  |  8 ++++--
 .../apache/tajo/master/TaskSchedulerImpl.java   |  4 +--
 .../apache/tajo/master/querymaster/Query.java   | 12 ++++----
 .../tajo/master/querymaster/Repartitioner.java  |  4 +--
 .../tajo/master/querymaster/SubQuery.java       |  4 +--
 .../main/java/org/apache/tajo/worker/Task.java  |  7 +++--
 8 files changed, 24 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e5b88108/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2550019..ec2e704 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -41,6 +41,8 @@ Release 0.8.0 - unreleased
 
   BUG FIXES
 
+    TAJO-326: In ExecutionBlock, isRoot() and isLeafBlock() return invalid values. (jihoon)
+
     TAJO-296: Late registration of Tajo workers. (hyoungjunkim via hyunsik)
 
     TAJO-321: Invalid split file of compressed text file. (jinho)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e5b88108/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
index 0dc393c..efa1c7f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
@@ -21,8 +21,6 @@ import org.apache.tajo.engine.planner.logical.*;
 
 import java.util.*;
 
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
-
 /**
  * A distributed execution plan (DEP) is a direct acyclic graph (DAG) of ExecutionBlocks.
  * An ExecutionBlock is a basic execution unit that could be distributed across a number of nodes.
@@ -35,9 +33,6 @@ public class ExecutionBlock {
   private LogicalNode plan = null;
   private StoreTableNode store = null;
   private List<ScanNode> scanlist = new ArrayList<ScanNode>();
-  private ExecutionBlock parent;
-  private Map<ScanNode, ExecutionBlock> childSubQueries = new HashMap<ScanNode, ExecutionBlock>();
-  private PartitionType outputType;
   private Enforcer enforcer = new Enforcer();
 
   private boolean hasJoinPlan;
@@ -53,10 +48,6 @@ public class ExecutionBlock {
     return executionBlockId;
   }
 
-  public PartitionType getPartitionType() {
-    return outputType;
-  }
-
   public void setPlan(LogicalNode plan) {
     hasJoinPlan = false;
     hasUnionPlan = false;
@@ -98,26 +89,6 @@ public class ExecutionBlock {
     return enforcer;
   }
 
-  public boolean isRoot() {
-    return !hasParentBlock() || !(getParentBlock().hasParentBlock()) && getParentBlock().hasUnion();
-  }
-
-  public boolean hasParentBlock() {
-    return parent != null;
-  }
-
-  public ExecutionBlock getParentBlock() {
-    return parent;
-  }
-
-  public Collection<ExecutionBlock> getChildBlocks() {
-    return Collections.unmodifiableCollection(childSubQueries.values());
-  }
-
-  public boolean isLeafBlock() {
-    return childSubQueries.size() == 0;
-  }
-
   public StoreTableNode getStoreTableNode() {
     return store;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e5b88108/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
index b2804cf..6aaeb1d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
@@ -51,7 +51,7 @@ public class MasterPlan {
   }
 
   public boolean isTerminal(ExecutionBlock execBlock) {
-    return terminalBlock == execBlock;
+    return terminalBlock.getId().equals(execBlock.getId());
   }
 
   public ExecutionBlock getTerminalBlock() {
@@ -144,7 +144,11 @@ public class MasterPlan {
   }
 
   public boolean isRoot(ExecutionBlock execBlock) {
-    return execBlockGraph.isRoot(execBlock.getId());
+    if (!execBlock.getId().equals(terminalBlock.getId())) {
+      return execBlockGraph.getParent(execBlock.getId()).equals(terminalBlock.getId());
+    } else {
+      return false;
+    }
   }
 
   public boolean isLeaf(ExecutionBlock execBlock) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e5b88108/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
index 6fe2c9e..4d5a951 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
@@ -471,7 +471,7 @@ public class TaskSchedulerImpl extends AbstractService
               task.getLogicalPlan().toJson(),
               context.getQueryContext(),
               subQuery.getDataChannel(), subQuery.getBlock().getEnforcer());
-          if (!subQuery.getBlock().isRoot()) {
+          if (!subQuery.getMasterPlan().isRoot(subQuery.getBlock())) {
             taskAssign.setInterQuery();
           }
 
@@ -520,7 +520,7 @@ public class TaskSchedulerImpl extends AbstractService
               context.getQueryContext(),
               subQuery.getDataChannel(),
               subQuery.getBlock().getEnforcer());
-          if (!subQuery.getBlock().isRoot()) {
+          if (!subQuery.getMasterPlan().isRoot(subQuery.getBlock())) {
             taskAssign.setInterQuery();
           }
           for (ScanNode scan : task.getScanNodes()) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e5b88108/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 14a9a81..34b2892 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -28,19 +28,19 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.*;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.TajoProtos.QueryState;
 import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
+import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.storage.AbstractStorageManager;
@@ -299,7 +299,7 @@ public class Query implements EventHandler<QueryEvent> {
       // if the subquery is succeeded
       if (castEvent.getFinalState() == SubQueryState.SUCCEEDED) {
         ExecutionBlock nextBlock = cursor.nextBlock();
-        if (!query.getPlan().isTerminal(nextBlock) || !query.getPlan().isRoot(nextBlock)) {
+        if (!query.getPlan().isTerminal(nextBlock)) {
           SubQuery nextSubQuery = new SubQuery(query.context, query.getPlan(), nextBlock, query.sm);
           nextSubQuery.setPriority(query.priority--);
           query.addSubQuery(nextSubQuery);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e5b88108/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index ef14c31..d58a6a6 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -227,7 +227,7 @@ public class Repartitioner {
   private static QueryUnit newQueryUnit(SubQuery subQuery, int taskId, FileFragment fragment) {
     ExecutionBlock execBlock = subQuery.getBlock();
     QueryUnit unit = new QueryUnit(
-        QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), execBlock.isLeafBlock(),
+        QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), subQuery.getMasterPlan().isLeaf(execBlock),
         subQuery.getEventHandler());
     unit.setLogicalPlan(execBlock.getPlan());
     unit.setFragment2(fragment);
@@ -239,7 +239,7 @@ public class Repartitioner {
     QueryUnit [] tasks = new QueryUnit[taskNum];
     for (int i = 0; i < taskNum; i++) {
       tasks[i] = new QueryUnit(
-          QueryIdFactory.newQueryUnitId(subQuery.getId(), i), execBlock.isLeafBlock(),
+          QueryIdFactory.newQueryUnitId(subQuery.getId(), i), subQuery.getMasterPlan().isLeaf(execBlock),
           subQuery.getEventHandler());
       tasks[i].setLogicalPlan(execBlock.getPlan());
       for (FileFragment fragment : fragments) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e5b88108/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 11455fb..0c8f395 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -626,7 +626,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       ContainerAllocationEvent event =
           new ContainerAllocationEvent(ContainerAllocatorEventType.CONTAINER_REQ,
               subQuery.getId(), priority, resource, numRequest,
-              execBlock.isLeafBlock(), 0.0f);
+              subQuery.masterPlan.isLeaf(execBlock), 0.0f);
       subQuery.eventHandler.handle(event);
     }
 
@@ -661,7 +661,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     private static QueryUnit newQueryUnit(SubQuery subQuery, int taskId, FileFragment fragment) {
       ExecutionBlock execBlock = subQuery.getBlock();
       QueryUnit unit = new QueryUnit(
-          QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), execBlock.isLeafBlock(),
+          QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), subQuery.masterPlan.isLeaf(execBlock),
           subQuery.eventHandler);
       unit.setLogicalPlan(execBlock.getPlan());
       unit.setFragment2(fragment);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e5b88108/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index b5516f0..f931615 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.worker;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang.exception.ExceptionUtils;
@@ -157,10 +158,10 @@ public class Task {
     interQuery = request.getProto().getInterQuery();
     if (interQuery) {
       context.setInterQuery();
-      StoreTableNode store = (StoreTableNode) plan;
-      this.partitionType = store.getPartitionType();
+      this.partitionType = context.getDataChannel().getPartitionType();
+
       if (partitionType == PartitionType.RANGE_PARTITION) {
-        SortNode sortNode = store.getChild();
+        SortNode sortNode = (SortNode) PlannerUtil.findTopNode(plan, NodeType.SORT);
         this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys());
         this.sortComp = new TupleComparator(finalSchema, sortNode.getSortKeys());
       }