You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/08/27 18:37:30 UTC

tajo git commit: TAJO-1801: Table name is not unique of tableDescMap in QueryMasterTask.

Repository: tajo
Updated Branches:
  refs/heads/master 4be674610 -> 848a8c3d5


TAJO-1801: Table name is not unique of tableDescMap in QueryMasterTask.

Closes #710


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/848a8c3d
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/848a8c3d
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/848a8c3d

Branch: refs/heads/master
Commit: 848a8c3d58fa219dee09baa808191d65fff61ffe
Parents: 4be6746
Author: Jihoon Son <ji...@apache.org>
Authored: Fri Aug 28 01:36:55 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Fri Aug 28 01:36:55 2015 +0900

----------------------------------------------------------------------
 CHANGES                                               |  5 ++++-
 .../main/java/org/apache/tajo/querymaster/Query.java  | 12 ++++--------
 .../org/apache/tajo/querymaster/QueryMasterTask.java  | 12 ++++++------
 .../org/apache/tajo/querymaster/Repartitioner.java    |  6 +++---
 .../main/java/org/apache/tajo/querymaster/Stage.java  | 14 ++++++--------
 5 files changed, 23 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/848a8c3d/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 32e84b5..8fcc1d0 100644
--- a/CHANGES
+++ b/CHANGES
@@ -234,6 +234,9 @@ Release 0.11.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1801: Table name is not unique of tableDescMap in QueryMasterTask. 
+    (jihoon)
+
     TAJO-1798: Dynamic partitioning occasionally fails. (jaehwa)
     
     TAJO-1799: Fix incorrect event handler when kill-query failed. (jinho)
@@ -250,7 +253,7 @@ Release 0.11.0 - unreleased
 
     TAJO-1776: Fix Invalid column type in JDBC. (jinho)
 
-    TAJO-1781: Join condition is still not found when it exists in OR clause. i
+    TAJO-1781: Join condition is still not found when it exists in OR clause. 
     (jihoon)
 
     TAJO-1777: JsonLineDeserializer returns invalid unicode text, 

http://git-wip-us.apache.org/repos/asf/tajo/blob/848a8c3d/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
index 9560353..8bb3845 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
@@ -35,26 +35,22 @@ import org.apache.tajo.QueryVars;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.TajoProtos.QueryState;
 import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto;
-import org.apache.tajo.catalog.CatalogService;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
 import org.apache.tajo.engine.planner.global.ExecutionQueue;
 import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.exception.TajoException;
-import org.apache.tajo.exception.TajoInternalError;
-import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.master.event.*;
+import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.StorageConstants;
-import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.storage.Tablespace;
+import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.util.history.QueryHistory;
 import org.apache.tajo.util.history.StageHistory;

http://git-wip-us.apache.org/repos/asf/tajo/blob/848a8c3d/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
index 52e0a96..1313dad 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
@@ -91,7 +91,7 @@ public class QueryMasterTask extends CompositeService {
 
   private final long querySubmitTime;
 
-  private Map<String, TableDesc> tableDescMap = new HashMap<String, TableDesc>();
+  private final Map<Integer, TableDesc> tableDescMap = new HashMap<>();
 
   private TajoConf systemConf;
 
@@ -332,7 +332,7 @@ public class QueryMasterTask extends CompositeService {
         if (scanNodes != null) {
           for (LogicalNode eachScanNode : scanNodes) {
             ScanNode scanNode = (ScanNode) eachScanNode;
-            tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
+            tableDescMap.put(scanNode.getPID(), scanNode.getTableDesc());
           }
         }
 
@@ -340,7 +340,7 @@ public class QueryMasterTask extends CompositeService {
         if (scanNodes != null) {
           for (LogicalNode eachScanNode : scanNodes) {
             ScanNode scanNode = (ScanNode) eachScanNode;
-            tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
+            tableDescMap.put(scanNode.getPID(), scanNode.getTableDesc());
           }
         }
 
@@ -348,7 +348,7 @@ public class QueryMasterTask extends CompositeService {
         if (scanNodes != null) {
           for (LogicalNode eachScanNode : scanNodes) {
             ScanNode scanNode = (ScanNode) eachScanNode;
-            tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
+            tableDescMap.put(scanNode.getPID(), scanNode.getTableDesc());
           }
         }
       }
@@ -533,8 +533,8 @@ public class QueryMasterTask extends CompositeService {
       return query.getStage(id);
     }
 
-    public Map<String, TableDesc> getTableDescMap() {
-      return tableDescMap;
+    public TableDesc getTableDesc(ScanNode scanNode) {
+      return tableDescMap.get(scanNode.getPID());
     }
 
     public float getProgress() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/848a8c3d/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
index 0d5880e..c4fc645 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
@@ -88,7 +88,7 @@ public class Repartitioner {
 
     // initialize variables from the child operators
     for (int i = 0; i < scans.length; i++) {
-      TableDesc tableDesc = masterContext.getTableDescMap().get(scans[i].getCanonicalName());
+      TableDesc tableDesc = masterContext.getTableDesc(scans[i]);
 
       if (tableDesc == null) { // if it is a real table stored on storage
         if (execBlock.getUnionScanMap() != null && !execBlock.getUnionScanMap().isEmpty()) {
@@ -376,7 +376,7 @@ public class Repartitioner {
       for (ScanNode eachScan: broadcastScans) {
 
         Path[] partitionScanPaths = null;
-        TableDesc tableDesc = masterContext.getTableDescMap().get(eachScan.getCanonicalName());
+        TableDesc tableDesc = masterContext.getTableDesc(eachScan);
         Tablespace space = TablespaceManager.get(tableDesc.getUri()).get();
 
         if (eachScan.getType() == NodeType.PARTITIONS_SCAN) {
@@ -498,7 +498,7 @@ public class Repartitioner {
     List<Fragment> broadcastFragments = new ArrayList<Fragment>();
     for (int i = 0; i < scans.length; i++) {
       ScanNode scan = scans[i];
-      TableDesc desc = stage.getContext().getTableDescMap().get(scan.getCanonicalName());
+      TableDesc desc = stage.getContext().getTableDesc(scan);
       TableMeta meta = desc.getMeta();
 
       Collection<Fragment> scanFragments;

http://git-wip-us.apache.org/repos/asf/tajo/blob/848a8c3d/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index f6c9cdb..12e4366 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -31,7 +31,6 @@ import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto;
 import org.apache.tajo.catalog.statistics.ColumnStats;
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
@@ -44,12 +43,12 @@ import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage;
-import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
 import org.apache.tajo.master.TaskState;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
 import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage;
+import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.querymaster.Task.IntermediateEntry;
 import org.apache.tajo.rpc.AsyncRpcClient;
@@ -57,8 +56,8 @@ import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcClientManager;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.storage.FileTablespace;
-import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.storage.Tablespace;
+import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.KeyValueSet;
@@ -76,9 +75,9 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import static org.apache.tajo.ResourceProtos.*;
 import static org.apache.tajo.conf.TajoConf.ConfVars;
 import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
-import static org.apache.tajo.ResourceProtos.*;
 
 
 /**
@@ -1058,12 +1057,11 @@ public class Stage implements EventHandler<StageEvent> {
 
     public static long getInputVolume(MasterPlan masterPlan, QueryMasterTask.QueryMasterTaskContext context,
                                       ExecutionBlock execBlock) {
-      Map<String, TableDesc> tableMap = context.getTableDescMap();
       if (masterPlan.isLeaf(execBlock)) {
         ScanNode[] outerScans = execBlock.getScanNodes();
         long maxVolume = 0;
         for (ScanNode eachScanNode: outerScans) {
-          TableStats stat = tableMap.get(eachScanNode.getCanonicalName()).getStats();
+          TableStats stat = context.getTableDesc(eachScanNode).getStats();
           if (stat.getNumBytes() > maxVolume) {
             maxVolume = stat.getNumBytes();
           }
@@ -1089,7 +1087,7 @@ public class Stage implements EventHandler<StageEvent> {
       ScanNode[] scans = execBlock.getScanNodes();
       Preconditions.checkArgument(scans.length == 1, "Must be Scan Query");
       ScanNode scan = scans[0];
-      TableDesc table = stage.context.getTableDescMap().get(scan.getCanonicalName());
+      TableDesc table = stage.context.getTableDesc(scan);
 
       Collection<Fragment> fragments;
       Tablespace tablespace = TablespaceManager.get(scan.getTableDesc().getUri()).get();