You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/03/24 09:39:50 UTC

[1/4] tajo git commit: TAJO-1396 Unexpected IllegalMonitorStateException can be thrown in QueryInProgress

Repository: tajo
Updated Branches:
  refs/heads/index_support d09bd8ddc -> 12846ccae


TAJO-1396 Unexpected IllegalMonitorStateException can be thrown in QueryInProgress

Closes #416

Signed-off-by: Jinho Kim <jh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/a9215852
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/a9215852
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/a9215852

Branch: refs/heads/index_support
Commit: a9215852d4bf9e5d7a29598a992ef1884f098d4b
Parents: 8d0146b
Author: navis.ryu <na...@apache.org>
Authored: Thu Mar 12 22:15:49 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Tue Mar 24 13:33:40 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 +++
 .../org/apache/tajo/master/QueryInProgress.java | 22 ++++++++++++++------
 .../org/apache/tajo/master/QueryManager.java    |  8 +------
 3 files changed, 20 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/a9215852/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index ad3a6bd..811992c 100644
--- a/CHANGES
+++ b/CHANGES
@@ -38,6 +38,9 @@ Release 0.11.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1396: Unexpected IllegalMonitorStateException can be thrown 
+    in QueryInProgress. (Contributed by navis. Committed by jinho)
+
     TAJO-1414: Two RemoteException in rpc module. 
     (Contributed by navis. Committed by jihun)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9215852/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
index c24dd90..668a770 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
@@ -95,7 +95,7 @@ public class QueryInProgress {
         queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
       }
     } catch (Throwable e) {
-      catchException(e);
+      catchException("Failed to kill query " + queryId + " by exception " + e, e);
     } finally {
       writeLock.unlock();
     }
@@ -125,6 +125,11 @@ public class QueryInProgress {
   public boolean startQueryMaster() {
     try {
       writeLock.lockInterruptibly();
+    } catch (Exception e) {
+      catchException("Failed to lock by exception " + e, e);
+      return false;
+    }
+    try {
       LOG.info("Initializing QueryInProgress for QueryID=" + queryId);
       WorkerResourceManager resourceManager = masterContext.getResourceManager();
       WorkerAllocatedResource resource = resourceManager.allocateQueryMaster(this);
@@ -141,7 +146,7 @@ public class QueryInProgress {
 
       return true;
     } catch (Exception e) {
-      catchException(e);
+      catchException("Failed to start query master for query " + queryId + " by exception " + e, e);
       return false;
     } finally {
       writeLock.unlock();
@@ -163,12 +168,17 @@ public class QueryInProgress {
 
     try {
       writeLock.lockInterruptibly();
+    } catch (Exception e) {
+      LOG.error("Failed to lock by exception " + e.getMessage(), e);
+      return;
+    }
 
+    try {
       if(queryMasterRpcClient == null) {
         connectQueryMaster();
       }
       if(queryMasterRpcClient == null) {
-        LOG.info("No QueryMaster conneciton info.");
+        LOG.info("No QueryMaster connection info.");
         //TODO wait
         return;
       }
@@ -186,14 +196,14 @@ public class QueryInProgress {
       querySubmitted.set(true);
       getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_MASTER_LAUNCHED);
     } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
+      LOG.error("Failed to submit query " + queryId + " to master by exception " + e, e);
     } finally {
       writeLock.unlock();
     }
   }
 
-  public void catchException(Throwable e) {
-    LOG.error(e.getMessage(), e);
+  public void catchException(String message, Throwable e) {
+    LOG.error(message, e);
     queryInfo.setQueryState(TajoProtos.QueryState.QUERY_FAILED);
     queryInfo.setLastMessage(StringUtils.stringifyException(e));
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9215852/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
index b1fa17d..0c8d8ce 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
@@ -87,7 +87,7 @@ public class QueryManager extends CompositeService {
 
       this.scheduler = new SimpleFifoScheduler(this);
     } catch (Exception e) {
-      catchException(null, e);
+      LOG.error("Failed to init service " + getName() + " by exception " + e, e);
     }
 
     super.serviceInit(conf);
@@ -304,12 +304,6 @@ public class QueryManager extends CompositeService {
     return executedQuerySize.get();
   }
 
-  private void catchException(QueryId queryId, Exception e) {
-    LOG.error(e.getMessage(), e);
-    QueryInProgress queryInProgress = runningQueries.get(queryId);
-    queryInProgress.catchException(e);
-  }
-
   public synchronized QueryCoordinatorProtocol.TajoHeartbeatResponse.ResponseCommand queryHeartbeat(
       QueryCoordinatorProtocol.TajoHeartbeat queryHeartbeat) {
     QueryInProgress queryInProgress = getQueryInProgress(new QueryId(queryHeartbeat.getQueryId()));


[4/4] tajo git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support

Posted by ji...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support

Conflicts:
	tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
	tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/12846cca
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/12846cca
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/12846cca

Branch: refs/heads/index_support
Commit: 12846ccae5bd5c3673d8812d53010ef432c947c6
Parents: d09bd8d 3e9a2dd
Author: Jihoon Son <ji...@apache.org>
Authored: Tue Mar 24 17:39:46 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Tue Mar 24 17:39:46 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   9 +
 .../java/org/apache/tajo/algebra/Explain.java   |   9 +-
 .../main/java/org/apache/tajo/SessionVars.java  |   1 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   1 +
 .../org/apache/tajo/datum/BooleanDatum.java     |   9 +-
 .../org/apache/tajo/datum/DatumFactory.java     |   4 +
 .../java/org/apache/tajo/datum/TextDatum.java   |   4 +-
 .../java/org/apache/tajo/util/FileUtil.java     |  25 ++-
 .../org/apache/tajo/engine/parser/SQLLexer.g4   |   1 +
 .../org/apache/tajo/engine/parser/SQLParser.g4  |   2 +-
 .../apache/tajo/engine/parser/SQLAnalyzer.java  |   2 +-
 .../engine/planner/global/GlobalPlanner.java    |   4 +
 .../engine/planner/physical/MemSortExec.java    |  28 ++-
 .../tajo/engine/planner/physical/SortExec.java  |   4 +-
 .../engine/planner/physical/TupleSorter.java    |  27 +++
 .../planner/physical/VectorizedSorter.java      | 198 +++++++++++++++++
 .../org/apache/tajo/master/QueryInProgress.java |  22 +-
 .../org/apache/tajo/master/QueryManager.java    |   8 +-
 .../ExplainGlobalPlanPreprocessorForTest.java   |  62 ++++++
 .../exec/ExplainPlanPreprocessorForTest.java    | 218 +++++++++++++++++++
 .../apache/tajo/master/exec/QueryExecutor.java  |  61 +++++-
 .../tajo/querymaster/QueryMasterTask.java       |   3 -
 .../java/org/apache/tajo/QueryTestCaseBase.java |  75 ++++++-
 .../planner/physical/TestTupleSorter.java       | 132 +++++++++++
 .../tajo/engine/query/TestSelectQuery.java      |  16 ++
 .../testExplainSelectPhysical.1.result          |  26 +++
 .../testExplainSelectPhysical.2.result          |  88 ++++++++
 .../testExplainSelectPhysical.3.result          |  89 ++++++++
 .../java/org/apache/tajo/plan/LogicalPlan.java  |  17 +-
 .../org/apache/tajo/plan/LogicalPlanner.java    |   2 +-
 .../apache/tajo/plan/logical/IndexScanNode.java |   4 +-
 31 files changed, 1093 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/12846cca/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/12846cca/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/12846cca/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/12846cca/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/12846cca/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/12846cca/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index 31a28f7,75e7762..ca3abbd
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@@ -48,6 -47,7 +50,8 @@@ import org.apache.tajo.master.*
  import org.apache.tajo.master.exec.prehook.CreateTableHook;
  import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager;
  import org.apache.tajo.master.exec.prehook.InsertIntoHook;
+ import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
++import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext;
  import org.apache.tajo.querymaster.*;
  import org.apache.tajo.session.Session;
  import org.apache.tajo.plan.LogicalPlan;
@@@ -99,20 -98,13 +103,20 @@@ public class QueryExecutor 
  
      } else if (PlannerUtil.checkIfDDLPlan(rootNode)) {
        context.getSystemMetrics().counter("Query", "numDDLQuery").inc();
 -      ddlExecutor.execute(queryContext, plan);
 -      response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
 -      response.setResultCode(ClientProtos.ResultCode.OK);
  
 +      if (PlannerUtil.isDistExecDDL(rootNode)) {
 +        if (rootNode.getChild().getType() == NodeType.CREATE_INDEX) {
 +          checkIndexExistence(queryContext, (CreateIndexNode) rootNode.getChild());
 +        }
 +        executeDistributedQuery(queryContext, session, plan, sql, jsonExpr, response);
 +      } else {
 +        response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
 +        response.setResult(IPCUtil.buildOkRequestResult());
 +        ddlExecutor.execute(queryContext, plan);
 +      }
  
      } else if (plan.isExplain()) { // explain query
-       execExplain(plan, response);
+       execExplain(plan, queryContext, plan.isExplainGlobal(), response);
  
      } else if (PlannerUtil.checkIfQueryTargetIsVirtualTable(plan)) {
        execQueryOnVirtualTable(queryContext, session, sql, plan, response);
@@@ -165,12 -157,31 +169,31 @@@
  
      context.getSystemMetrics().counter("Query", "numDDLQuery").inc();
      response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
 -    response.setResultCode(ClientProtos.ResultCode.OK);
 +    response.setResult(IPCUtil.buildOkRequestResult());
    }
  
-   public void execExplain(LogicalPlan plan, SubmitQueryResponse.Builder response) throws IOException {
+   public void execExplain(LogicalPlan plan, QueryContext queryContext, boolean isGlobal,
+                           SubmitQueryResponse.Builder response)
+       throws Exception {
+     String explainStr;
+     boolean isTest = queryContext.getBool(SessionVars.TEST_PLAN_SHAPE_FIX_ENABLED);
+     if (isTest) {
+       ExplainPlanPreprocessorForTest preprocessorForTest = new ExplainPlanPreprocessorForTest();
+       preprocessorForTest.prepareTest(plan);
+     }
+ 
+     if (isGlobal) {
+       GlobalPlanner planner = new GlobalPlanner(context.getConf(), context.getCatalog());
+       MasterPlan masterPlan = compileMasterPlan(plan, queryContext, planner);
+       if (isTest) {
+         ExplainGlobalPlanPreprocessorForTest globalPlanPreprocessorForTest = new ExplainGlobalPlanPreprocessorForTest();
+         globalPlanPreprocessorForTest.prepareTest(masterPlan);
+       }
+       explainStr = masterPlan.toString();
+     } else {
+       explainStr = PlannerUtil.buildExplainString(plan.getRootBlock().getRoot());
+     }
  
-     String explainStr = PlannerUtil.buildExplainString(plan.getRootBlock().getRoot());
      Schema schema = new Schema();
      schema.addColumn("explain", TajoDataTypes.Type.TEXT);
      RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema);
@@@ -428,22 -439,33 +451,52 @@@
      }
    }
  
 +  private void checkIndexExistence(final QueryContext queryContext, final CreateIndexNode createIndexNode)
 +      throws IOException {
 +    String databaseName, simpleIndexName, qualifiedIndexName;
 +    if (CatalogUtil.isFQTableName(createIndexNode.getIndexName())) {
-       String [] splits = CatalogUtil.splitFQTableName(createIndexNode.getIndexName());
++      String[] splits = CatalogUtil.splitFQTableName(createIndexNode.getIndexName());
 +      databaseName = splits[0];
 +      simpleIndexName = splits[1];
 +      qualifiedIndexName = createIndexNode.getIndexName();
 +    } else {
 +      databaseName = queryContext.getCurrentDatabase();
 +      simpleIndexName = createIndexNode.getIndexName();
 +      qualifiedIndexName = CatalogUtil.buildFQName(databaseName, simpleIndexName);
 +    }
 +
 +    if (catalog.existIndexByName(databaseName, simpleIndexName)) {
 +      throw new AlreadyExistsIndexException(qualifiedIndexName);
 +    }
 +  }
++
+   public static MasterPlan compileMasterPlan(LogicalPlan plan, QueryContext context, GlobalPlanner planner)
+       throws Exception {
+ 
+     CatalogProtos.StoreType storeType = PlannerUtil.getStoreType(plan);
+     if (storeType != null) {
+       StorageManager sm = StorageManager.getStorageManager(planner.getConf(), storeType);
+       StorageProperty storageProperty = sm.getStorageProperty();
+       if (storageProperty.isSortedInsert()) {
+         String tableName = PlannerUtil.getStoreTableName(plan);
+         LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+         TableDesc tableDesc = PlannerUtil.getTableDesc(planner.getCatalog(), rootNode.getChild());
+         if (tableDesc == null) {
+           throw new VerifyException("Can't get table meta data from catalog: " + tableName);
+         }
+         List<LogicalPlanRewriteRule> storageSpecifiedRewriteRules = sm.getRewriteRules(
+             context, tableDesc);
+         if (storageSpecifiedRewriteRules != null) {
+           for (LogicalPlanRewriteRule eachRule: storageSpecifiedRewriteRules) {
 -            eachRule.rewrite(context, plan);
++            eachRule.rewrite(new LogicalPlanRewriteRuleContext(context, plan));
+           }
+         }
+       }
+     }
+ 
+     MasterPlan masterPlan = new MasterPlan(QueryIdFactory.NULL_QUERY_ID, context, plan);
+     planner.build(masterPlan);
+ 
+     return masterPlan;
+   }
  }

http://git-wip-us.apache.org/repos/asf/tajo/blob/12846cca/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
index d38e2da,f83cb1e..26111c3
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
@@@ -51,12 -46,15 +51,9 @@@ import org.apache.tajo.ipc.TajoWorkerPr
  import org.apache.tajo.master.TajoContainerProxy;
  import org.apache.tajo.master.event.*;
  import org.apache.tajo.master.rm.TajoWorkerResourceManager;
--import org.apache.tajo.plan.LogicalOptimizer;
--import org.apache.tajo.plan.LogicalPlan;
--import org.apache.tajo.plan.LogicalPlanner;
  import org.apache.tajo.plan.logical.LogicalNode;
 -import org.apache.tajo.plan.logical.LogicalRootNode;
  import org.apache.tajo.plan.logical.NodeType;
  import org.apache.tajo.plan.logical.ScanNode;
 -import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
 -import org.apache.tajo.plan.util.PlannerUtil;
  import org.apache.tajo.plan.verifier.VerifyException;
  import org.apache.tajo.session.Session;
  import org.apache.tajo.storage.StorageManager;

http://git-wip-us.apache.org/repos/asf/tajo/blob/12846cca/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/12846cca/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlan.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/12846cca/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/12846cca/tajo-plan/src/main/java/org/apache/tajo/plan/logical/IndexScanNode.java
----------------------------------------------------------------------
diff --cc tajo-plan/src/main/java/org/apache/tajo/plan/logical/IndexScanNode.java
index e320ce9,8b73756..ebf6c4b
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/IndexScanNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/IndexScanNode.java
@@@ -82,9 -84,9 +82,9 @@@ public class IndexScanNode extends Scan
    public int hashCode() {
      final int prime = 31;
      int result = super.hashCode();
--    result = prime * result + Arrays.hashCode(datum);
++    result = prime * result + indexPath.hashCode();
      result = prime * result + ((keySchema == null) ? 0 : keySchema.hashCode());
--    result = prime * result + Arrays.hashCode(sortKeys);
++    result = prime * result + Arrays.hashCode(predicates);
      return result;
    }
  


[2/4] tajo git commit: TAJO-1407: Minor performance improvement of MemSortExec.

Posted by ji...@apache.org.
TAJO-1407: Minor performance improvement of MemSortExec.

Closes #426

Signed-off-by: Jihoon Son <ji...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/d7b5212c
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/d7b5212c
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/d7b5212c

Branch: refs/heads/index_support
Commit: d7b5212ceb569ba145318d78cff46ca49bef973c
Parents: a921585
Author: Jihoon Son <ji...@apache.org>
Authored: Tue Mar 24 17:09:20 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Tue Mar 24 17:09:20 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../org/apache/tajo/datum/BooleanDatum.java     |   9 +-
 .../org/apache/tajo/datum/DatumFactory.java     |   4 +
 .../java/org/apache/tajo/datum/TextDatum.java   |   4 +-
 .../engine/planner/physical/MemSortExec.java    |  28 ++-
 .../tajo/engine/planner/physical/SortExec.java  |   4 +-
 .../engine/planner/physical/TupleSorter.java    |  27 +++
 .../planner/physical/VectorizedSorter.java      | 198 +++++++++++++++++++
 .../planner/physical/TestTupleSorter.java       | 132 +++++++++++++
 9 files changed, 389 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/d7b5212c/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 811992c..58c061a 100644
--- a/CHANGES
+++ b/CHANGES
@@ -9,6 +9,9 @@ Release 0.11.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1407: Minor performance improvement of MemSortExec. (Contributed by
+    navis, Committed by jihoon)
+
     TAJO-1403: Improve 'Simple Query' with only partition columns and constant 
     values. (Contributed by Dongjoon Hyun, Committed by jihoon)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/d7b5212c/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java
index 93933a8..596540f 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.datum;
 
+import com.google.common.primitives.Booleans;
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.exception.InvalidOperationException;
@@ -167,13 +168,7 @@ public class BooleanDatum extends Datum {
   public int compareTo(Datum datum) {
     switch (datum.type()) {
     case BOOLEAN:
-      if (val && !datum.asBool()) {
-        return -1;
-      } else if (val && datum.asBool()) {
-        return 1;
-      } else {
-        return 0;
-      }
+      return Booleans.compare(val, datum.asBool());
     default:
       throw new InvalidOperationException(datum.type());
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/d7b5212c/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java
index 11ba791..9f48cad 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java
@@ -317,6 +317,10 @@ public class DatumFactory {
     return new IntervalDatum(interval);
   }
 
+  public static IntervalDatum createInterval(int month, long interval) {
+    return new IntervalDatum(month, interval);
+  }
+
   public static DateDatum createDate(Datum datum) {
     switch (datum.type()) {
     case INT4:

http://git-wip-us.apache.org/repos/asf/tajo/blob/d7b5212c/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
index ba45e71..ffd6ca2 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
@@ -33,7 +33,6 @@ import java.util.Comparator;
 public class TextDatum extends Datum {
   public static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
 
-  @Expose private final int size;
   /* encoded in UTF-8 */
   @Expose private final byte[] bytes;
 
@@ -44,7 +43,6 @@ public class TextDatum extends Datum {
   public TextDatum(byte[] bytes) {
     super(TajoDataTypes.Type.TEXT);
     this.bytes = bytes;
-    this.size = bytes.length;
   }
 
   public TextDatum(String string) {
@@ -108,7 +106,7 @@ public class TextDatum extends Datum {
 
   @Override
   public int size() {
-    return size;
+    return bytes.length;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/d7b5212c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
index c77313e..a2e039c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
@@ -29,12 +29,12 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
-public class MemSortExec extends SortExec {
+public class MemSortExec extends SortExec implements TupleSorter {
   private SortNode plan;
   private List<Tuple> tupleSlots;
   private boolean sorted = false;
   private Iterator<Tuple> iterator;
-  
+
   public MemSortExec(final TaskAttemptContext context,
                      SortNode plan, PhysicalExec child) {
     super(context, plan.getInSchema(), plan.getOutSchema(), child, plan.getSortKeys());
@@ -43,7 +43,7 @@ public class MemSortExec extends SortExec {
 
   public void init() throws IOException {
     super.init();
-    this.tupleSlots = new ArrayList<Tuple>(1000);
+    this.tupleSlots = new ArrayList<Tuple>(10000);
   }
 
   @Override
@@ -54,12 +54,10 @@ public class MemSortExec extends SortExec {
       while (!context.isStopped() && (tuple = child.next()) != null) {
         tupleSlots.add(new VTuple(tuple));
       }
-      
-      Collections.sort(tupleSlots, getComparator());
-      this.iterator = tupleSlots.iterator();
+      iterator = getSorter().sort();
       sorted = true;
     }
-    
+
     if (iterator.hasNext()) {
       return this.iterator.next();
     } else {
@@ -67,6 +65,14 @@ public class MemSortExec extends SortExec {
     }
   }
 
+  private TupleSorter getSorter() {
+    try {
+      return new VectorizedSorter(tupleSlots, sortSpecs, comparator.getSortKeyIds());
+    } catch (Exception e) {
+      return this;
+    }
+  }
+
   @Override
   public void rescan() throws IOException {
     super.rescan();
@@ -86,4 +92,10 @@ public class MemSortExec extends SortExec {
   public SortNode getPlan() {
     return this.plan;
   }
-}
+
+  @Override
+  public Iterator<Tuple> sort() {
+    Collections.sort(tupleSlots, comparator);
+    return tupleSlots.iterator();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/d7b5212c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
index c0703dd..fb6a3b2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
@@ -29,8 +29,8 @@ import java.io.IOException;
 import java.util.Comparator;
 
 public abstract class SortExec extends UnaryPhysicalExec {
-  private final TupleComparator comparator;
-  private final SortSpec [] sortSpecs;
+  protected final BaseTupleComparator comparator;
+  protected final SortSpec [] sortSpecs;
 
   public SortExec(TaskAttemptContext context, Schema inSchema,
                   Schema outSchema, PhysicalExec child, SortSpec [] sortSpecs) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/d7b5212c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java
new file mode 100644
index 0000000..d240e4a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.storage.Tuple;
+
+import java.util.Iterator;
+
+public interface TupleSorter {
+  Iterator<Tuple> sort();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/d7b5212c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java
new file mode 100644
index 0000000..891d104
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.primitives.Booleans;
+import com.google.common.primitives.Doubles;
+import com.google.common.primitives.Floats;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.Shorts;
+import org.apache.hadoop.util.IndexedSortable;
+import org.apache.hadoop.util.QuickSort;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.Tuple;
+
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Extract raw level values (primitive or String/byte[]) from each of key columns before sorting
+ * Uses indirection for efficient swapping
+ */
+public class VectorizedSorter implements IndexedSortable, TupleSorter {
+
+  private final Tuple[] tuples;         // source tuples
+  private final TupleVector[] vectors;  // values of key columns
+  private final int[] mappings;         // index indirection
+
+  public VectorizedSorter(List<Tuple> source, SortSpec[] sortKeys, int[] keyIndex) {
+    this.tuples = source.toArray(new Tuple[source.size()]);
+    vectors = new TupleVector[sortKeys.length];
+    mappings = new int[tuples.length];
+    for (int i = 0; i < vectors.length; i++) {
+      TajoDataTypes.Type type = sortKeys[i].getSortKey().getDataType().getType();
+      boolean nullFirst = sortKeys[i].isNullFirst();
+      boolean ascending = sortKeys[i].isAscending();
+      boolean nullInvert = nullFirst && ascending || !nullFirst && !ascending;
+      vectors[i] = new TupleVector(TupleVector.getType(type), tuples.length, nullInvert, ascending);
+    }
+    for (int i = 0; i < tuples.length; i++) {
+      for (int j = 0; j < keyIndex.length; j++) {
+        vectors[j].add(tuples[i].get(keyIndex[j]));
+      }
+      mappings[i] = i;
+    }
+  }
+
+  @Override
+  public int compare(int i1, int i2) {
+    final int index1 = mappings[i1];
+    final int index2 = mappings[i2];
+    for (TupleVector vector : vectors) {
+      int compare = vector.compare(index1, index2);
+      if (compare != 0) {
+        return compare;
+      }
+    }
+    return 0;
+  }
+
+  @Override
+  public void swap(int i1, int i2) {
+    int v1 = mappings[i1];
+    mappings[i1] = mappings[i2];
+    mappings[i2] = v1;
+  }
+
+  @Override
+  public Iterator<Tuple> sort() {
+    new QuickSort().sort(VectorizedSorter.this, 0, mappings.length);
+    return new Iterator<Tuple>() {
+      int index;
+      public boolean hasNext() { return index < mappings.length; }
+      public Tuple next() { return tuples[mappings[index++]]; }
+      public void remove() { throw new UnsupportedException(); }
+    };
+  }
+
+  private static class TupleVector {
+
+    private final int type;
+    private final BitSet nulls;
+    private final boolean nullInvert;
+    private final boolean ascending;
+
+    private boolean[] booleans;
+    private byte[] bits;
+    private short[] shorts;
+    private int[] ints;
+    private long[] longs;
+    private float[] floats;
+    private double[] doubles;
+    private byte[][] bytes;
+
+    private int index;
+
+    private TupleVector(int type, int length, boolean nullInvert, boolean ascending) {
+      this.type = type;
+      this.nulls = new BitSet(length);
+      this.nullInvert = nullInvert;
+      this.ascending = ascending;
+      switch (type) {
+        case 0: booleans = new boolean[length]; break;
+        case 1: bits = new byte[length]; break;
+        case 2: shorts = new short[length]; break;
+        case 3: ints = new int[length]; break;
+        case 4: longs = new long[length]; break;
+        case 5: floats = new float[length]; break;
+        case 6: doubles = new double[length]; break;
+        case 7: bytes = new byte[length][]; break;
+        default:
+          throw new IllegalArgumentException();
+      }
+    }
+
+    private void add(Datum datum) {
+      if (datum.isNull()) {
+        nulls.set(index++);
+        return;
+      }
+      switch (type) {
+        case 0: booleans[index] = datum.asBool(); break;
+        case 1: bits[index] = datum.asByte(); break;
+        case 2: shorts[index] = datum.asInt2(); break;
+        case 3: ints[index] = datum.asInt4(); break;
+        case 4: longs[index] = datum.asInt8(); break;
+        case 5: floats[index] = datum.asFloat4(); break;
+        case 6: doubles[index] = datum.asFloat8(); break;
+        case 7: bytes[index] = datum.asByteArray(); break;
+        default:
+          throw new IllegalArgumentException();
+      }
+      index++;
+    }
+
+    private int compare(int index1, int index2) {
+      final boolean n1 = nulls.get(index1);
+      final boolean n2 = nulls.get(index2);
+      if (n1 && n2) {
+        return 0;
+      }
+      if (n1 ^ n2) {
+        int compVal = n1 ? 1 : -1;
+        return nullInvert ? -compVal : compVal;
+      }
+      int compare;
+      switch (type) {
+        case 0: compare = Booleans.compare(booleans[index1], booleans[index2]); break;
+        case 1: compare = bits[index1] - bits[index2]; break;
+        case 2: compare = Shorts.compare(shorts[index1], shorts[index2]); break;
+        case 3: compare = Ints.compare(ints[index1], ints[index2]); break;
+        case 4: compare = Longs.compare(longs[index1], longs[index2]); break;
+        case 5: compare = Floats.compare(floats[index1], floats[index2]); break;
+        case 6: compare = Doubles.compare(doubles[index1], doubles[index2]); break;
+        case 7: compare = TextDatum.COMPARATOR.compare(bytes[index1], bytes[index2]); break;
+        default:
+          throw new IllegalArgumentException();
+      }
+      return ascending ? compare : -compare;
+    }
+
+    public static int getType(TajoDataTypes.Type type) {
+      switch (type) {
+        case BOOLEAN: return 0;
+        case BIT: case INT1: return 1;
+        case INT2: return 2;
+        case INT4: case DATE: case INET4: return 3;
+        case INT8: case TIME: case TIMESTAMP: case INTERVAL: return 4;
+        case FLOAT4: return 5;
+        case FLOAT8: return 6;
+        case TEXT: case CHAR: case BLOB: return 7;
+      }
+      // todo
+      throw new UnsupportedException(type.name());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/d7b5212c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java
new file mode 100644
index 0000000..fc43d42
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.collect.Iterators;
+import com.google.common.primitives.Ints;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.BaseTupleComparator;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.junit.Assert.assertArrayEquals;
+
+public class TestTupleSorter {
+
+  private static final Log LOG = LogFactory.getLog(TestTupleSorter.class);
+
+  private static final Random rnd = new Random(-1);
+
+  @Test
+  public final void testSortBench() {
+    final int MAX_SORT_KEY = 3;
+    final int ITERATION = 10;
+    final int LENGTH = 1000000;
+    final int SAMPLING = 100;
+
+    Tuple[] tuples = new Tuple[LENGTH];
+    for (int i = 0; i < LENGTH; i++) {
+      Datum[] datums = new Datum[]{
+          DatumFactory.createInt4(rnd.nextInt(Short.MAX_VALUE)),
+          DatumFactory.createInt4(rnd.nextInt()),
+          DatumFactory.createText("dept_" + rnd.nextInt()),
+          DatumFactory.createBool(rnd.nextBoolean()),
+          DatumFactory.createInt8(rnd.nextLong()),
+          DatumFactory.createInterval(rnd.nextInt(), rnd.nextLong())};
+      tuples[i] = new VTuple(datums);
+    }
+
+    Column col0 = new Column("col0", Type.INT2);
+    Column col1 = new Column("col1", Type.INT4);
+    Column col2 = new Column("col2", Type.TEXT);
+    Column col3 = new Column("col3", Type.BOOLEAN);
+    Column col4 = new Column("col4", Type.INT8);
+    Column col5 = new Column("col5", Type.INTERVAL);
+
+    Schema schema = new Schema(new Column[] {col0, col1, col2, col3, col4, col5});
+
+    long[] time1 = new long[ITERATION];
+    long[] time2 = new long[ITERATION];
+    for(int iteration = 0; iteration < ITERATION; iteration++) {
+      List<Tuple> target = Arrays.asList(Arrays.copyOf(tuples, tuples.length));
+      Set<Integer> keys = new TreeSet<Integer>();
+      for (int i = 0; i < MAX_SORT_KEY; i++) {
+        keys.add(rnd.nextInt(schema.size()));
+      }
+      int[] keyIndices = Ints.toArray(keys);
+      SortSpec[] sortKeys = new SortSpec[keyIndices.length];
+      for (int i = 0; i < keyIndices.length; i++) {
+        sortKeys[i] = new SortSpec(schema.getColumn(keyIndices[i]), rnd.nextBoolean(), rnd.nextBoolean());
+      }
+
+      long start = System.currentTimeMillis();
+      VectorizedSorter sorter = new VectorizedSorter(target, sortKeys, keyIndices);
+      Iterator<Tuple> iterator = sorter.sort();
+
+      String[] result1 = new String[SAMPLING];
+      for (int i = 0; i < result1.length; i++) {
+        Tuple tuple = Iterators.get(iterator, LENGTH / result1.length - 1);
+        StringBuilder builder = new StringBuilder();
+        for (int keyIndex : keyIndices) {
+          builder.append(tuple.get(keyIndex).asChars());
+        }
+        result1[i] = builder.toString();
+      }
+      time1[iteration] = System.currentTimeMillis() - start;
+
+      BaseTupleComparator comparator = new BaseTupleComparator(schema, sortKeys);
+
+      start = System.currentTimeMillis();
+      Collections.sort(target, comparator);
+      iterator = target.iterator();
+
+      String[] result2 = new String[SAMPLING];
+      for (int i = 0; i < result2.length; i++) {
+        Tuple tuple = Iterators.get(iterator, LENGTH / result2.length - 1);
+        StringBuilder builder = new StringBuilder();
+        for (int keyIndex : keyIndices) {
+          builder.append(tuple.get(keyIndex).asChars());
+        }
+        result2[i] = builder.toString();
+      }
+      time2[iteration] = System.currentTimeMillis() - start;
+
+      LOG.info("Sort on keys " + Arrays.toString(keyIndices) +
+          ": Vectorized " + time1[iteration]+ " msec, Original " + time2[iteration] + " msec");
+
+      assertArrayEquals(result1, result2);
+    }
+  }
+}


[3/4] tajo git commit: TAJO-1426: Support "explain global" to get physical plan.

Posted by ji...@apache.org.
TAJO-1426: Support "explain global" to get physical plan.

Closes #441

Signed-off-by: Jihoon Son <ji...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/3e9a2dd2
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/3e9a2dd2
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/3e9a2dd2

Branch: refs/heads/index_support
Commit: 3e9a2dd2ba86653ec7b90394518a1a700bd937dc
Parents: d7b5212
Author: Jihoon Son <ji...@apache.org>
Authored: Tue Mar 24 17:12:24 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Tue Mar 24 17:13:57 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../java/org/apache/tajo/algebra/Explain.java   |   9 +-
 .../main/java/org/apache/tajo/SessionVars.java  |   1 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   1 +
 .../java/org/apache/tajo/util/FileUtil.java     |  25 ++-
 .../org/apache/tajo/engine/parser/SQLLexer.g4   |   1 +
 .../org/apache/tajo/engine/parser/SQLParser.g4  |   2 +-
 .../apache/tajo/engine/parser/SQLAnalyzer.java  |   2 +-
 .../engine/planner/global/GlobalPlanner.java    |   4 +
 .../ExplainGlobalPlanPreprocessorForTest.java   |  62 ++++++
 .../exec/ExplainPlanPreprocessorForTest.java    | 218 +++++++++++++++++++
 .../apache/tajo/master/exec/QueryExecutor.java  |  58 ++++-
 .../tajo/querymaster/QueryMasterTask.java       |   1 +
 .../java/org/apache/tajo/QueryTestCaseBase.java |  75 ++++++-
 .../tajo/engine/query/TestSelectQuery.java      |  16 ++
 .../testExplainSelectPhysical.1.result          |  26 +++
 .../testExplainSelectPhysical.2.result          |  88 ++++++++
 .../testExplainSelectPhysical.3.result          |  89 ++++++++
 .../java/org/apache/tajo/plan/LogicalPlan.java  |  17 +-
 .../org/apache/tajo/plan/LogicalPlanner.java    |   2 +-
 20 files changed, 681 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/3e9a2dd2/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 58c061a..f573550 100644
--- a/CHANGES
+++ b/CHANGES
@@ -9,6 +9,9 @@ Release 0.11.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1426: Support "explain global" to get physical plan. (Contributed by
+    navis, Committed by jihoon)
+
     TAJO-1407: Minor performance improvement of MemSortExec. (Contributed by
     navis, Committed by jihoon)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/3e9a2dd2/tajo-algebra/src/main/java/org/apache/tajo/algebra/Explain.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/Explain.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/Explain.java
index ee76ea9..2e966b4 100644
--- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/Explain.java
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/Explain.java
@@ -22,11 +22,18 @@ import com.google.common.base.Objects;
 
 public class Explain extends UnaryOperator {
 
-  public Explain(Expr operand) {
+  private boolean isGlobal;
+
+  public Explain(Expr operand, boolean isGlobal) {
     super(OpType.Explain);
+    this.isGlobal = isGlobal;
     setChild(operand);
   }
 
+  public boolean isGlobal() {
+    return isGlobal;
+  }
+
   public int hashCode() {
     return Objects.hashCode(getChild());
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/3e9a2dd2/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
index b3233ed..5cca413 100644
--- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
+++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
@@ -139,6 +139,7 @@ public enum SessionVars implements ConfigKey {
   TEST_JOIN_OPT_ENABLED(ConfVars.$TEST_JOIN_OPT_ENABLED, "(test only) join optimization enabled", TEST_VAR),
   TEST_FILTER_PUSHDOWN_ENABLED(ConfVars.$TEST_FILTER_PUSHDOWN_ENABLED, "filter push down enabled", TEST_VAR),
   TEST_MIN_TASK_NUM(ConfVars.$TEST_MIN_TASK_NUM, "(test only) min task num", TEST_VAR),
+  TEST_PLAN_SHAPE_FIX_ENABLED(ConfVars.$TEST_PLAN_SHAPE_FIX_ENABLED, "(test only) plan shape fix enabled", TEST_VAR),
   ;
 
   public static final Map<String, SessionVars> SESSION_VARS = Maps.newHashMap();

http://git-wip-us.apache.org/repos/asf/tajo/blob/3e9a2dd2/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 5b569d5..ecdb2ef 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -355,6 +355,7 @@ public class TajoConf extends Configuration {
     $TEST_JOIN_OPT_ENABLED("tajo.test.plan.join-optimization.enabled", true),
     $TEST_FILTER_PUSHDOWN_ENABLED("tajo.test.plan.filter-pushdown.enabled", true),
     $TEST_MIN_TASK_NUM("tajo.test.min-task-num", -1),
+    $TEST_PLAN_SHAPE_FIX_ENABLED("tajo.test.plan.shape.fix.enabled", false),  // used for explain statement test
 
     // Behavior Control ---------------------------------------------------------
     $BEHAVIOR_ARITHMETIC_ABORT("tajo.behavior.arithmetic-abort", false),

http://git-wip-us.apache.org/repos/asf/tajo/blob/3e9a2dd2/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java
index 9aa6af9..9403a2f 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java
@@ -87,19 +87,23 @@ public class FileUtil {
   }
 
   public static String readTextFileFromResource(String resource) throws IOException {
-    StringBuilder fileData = new StringBuilder(1000);
-    InputStream inputStream = ClassLoader.getSystemResourceAsStream(resource);
-    byte[] buf = new byte[1024];
-    int numRead;
+    return readTextFromStream(ClassLoader.getSystemResourceAsStream(resource));
+  }
+
+  public static String readTextFromStream(InputStream inputStream)
+      throws IOException {
     try {
+      StringBuilder fileData = new StringBuilder(1000);
+      byte[] buf = new byte[1024];
+      int numRead;
       while ((numRead = inputStream.read(buf)) != -1) {
         String readData = new String(buf, 0, numRead, Charset.defaultCharset());
         fileData.append(readData);
       }
+      return fileData.toString();
     } finally {
-      IOUtils.cleanup(null, inputStream);
+      IOUtils.closeStream(inputStream);
     }
-    return fileData.toString();
   }
 
   public static String readTextFile(File file) throws IOException {
@@ -119,6 +123,15 @@ public class FileUtil {
     return fileData.toString();
   }
 
+  public static void writeTextToStream(String text, OutputStream outputStream)
+      throws IOException {
+    try {
+      outputStream.write(text.getBytes());
+    } finally {
+      IOUtils.closeStream(outputStream);
+    }
+  }
+
   public static String humanReadableByteCount(long bytes, boolean si) {
     int unit = si ? 1000 : 1024;
     if (bytes < unit) return bytes + " B";

http://git-wip-us.apache.org/repos/asf/tajo/blob/3e9a2dd2/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLLexer.g4
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLLexer.g4 b/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLLexer.g4
index f42e114..6fccaad 100644
--- a/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLLexer.g4
+++ b/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLLexer.g4
@@ -237,6 +237,7 @@ FOLLOWING : F O L L O W I N G;
 FORMAT : F O R M A T;
 FUSION : F U S I O N;
 
+GLOBAL : G L O B A L;
 GROUPING : G R O U P I N G;
 
 HASH : H A S H;

http://git-wip-us.apache.org/repos/asf/tajo/blob/3e9a2dd2/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 b/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
index a05a060..9ac3f8c 100644
--- a/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
+++ b/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
@@ -39,7 +39,7 @@ sql
   ;
 
 explain_clause
-  : EXPLAIN
+  : EXPLAIN (GLOBAL)?
   ;
 
 statement

http://git-wip-us.apache.org/repos/asf/tajo/blob/3e9a2dd2/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
index 869c0eb..23c2eec 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
@@ -78,7 +78,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
   public Expr visitSql(SqlContext ctx) {
     Expr statement = visit(ctx.statement());
     if (checkIfExist(ctx.explain_clause())) {
-      return new Explain(statement);
+      return new Explain(statement, checkIfExist(ctx.explain_clause().GLOBAL()));
     } else {
       return statement;
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/3e9a2dd2/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index d2ac6cc..cd35d96 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -91,6 +91,10 @@ public class GlobalPlanner {
     this(conf, workerContext.getCatalog());
   }
 
+  public TajoConf getConf() {
+    return conf;
+  }
+
   public CatalogService getCatalog() {
     return catalog;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/3e9a2dd2/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainGlobalPlanPreprocessorForTest.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainGlobalPlanPreprocessorForTest.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainGlobalPlanPreprocessorForTest.java
new file mode 100644
index 0000000..c26e12c
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainGlobalPlanPreprocessorForTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.exec;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Data channels of a global plan can have multiple shuffle keys, and their appearance order is basically not preserved.
+ * However, to test the equivalence of global plans, the appearance order of shuffle keys must be preserved.
+ * This class guarantees the consistency of the order of shuffle keys.
+ */
+public class ExplainGlobalPlanPreprocessorForTest {
+  private static final ExplainPlanPreprocessorForTest.ColumnComparator columnComparator =
+      new ExplainPlanPreprocessorForTest.ColumnComparator();
+
+  /**
+   * For all data channels, sort shuffle keys by their names.
+   *
+   * @param plan master plan
+   */
+  public void prepareTest(MasterPlan plan) {
+    ExecutionBlockCursor cursor = new ExecutionBlockCursor(plan);
+
+    while (cursor.hasNext()) {
+      ExecutionBlock block = cursor.nextBlock();
+      List<DataChannel> outgoingChannels = plan.getOutgoingChannels(block.getId());
+      if (outgoingChannels != null) {
+        for (DataChannel channel : outgoingChannels) {
+          if (channel.hasShuffleKeys()) {
+            Column[] shuffleKeys = channel.getShuffleKeys();
+            Arrays.sort(shuffleKeys, columnComparator);
+            channel.setShuffleKeys(shuffleKeys);
+          }
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3e9a2dd2/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainPlanPreprocessorForTest.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainPlanPreprocessorForTest.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainPlanPreprocessorForTest.java
new file mode 100644
index 0000000..ab37e22
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainPlanPreprocessorForTest.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.exec;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.Target;
+import org.apache.tajo.plan.expr.AlgebraicUtil;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.JoinNode;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Stack;
+
+/**
+ * Tajo's logical planner can generate different shapes of logical plans for the same query,
+ * especially when the query involves one or more joins.
+ * This class guarantees the consistency of the logical plan for the same query.
+ */
+public class ExplainPlanPreprocessorForTest {
+  private static final PlanShapeFixerContext shapeFixerContext = new PlanShapeFixerContext();
+  private static final PlanShapeFixer shapeFixer = new PlanShapeFixer();
+  private static final PidResetContext resetContext = new PidResetContext();
+  private static final PidReseter pidReseter = new PidReseter();
+
+  public void prepareTest(LogicalPlan plan) throws PlanningException {
+    // Pid reseter
+    resetContext.reset();
+    pidReseter.visit(resetContext, plan, plan.getRootBlock());
+
+    // Plan shape fixer
+    shapeFixerContext.reset();
+    shapeFixer.visit(shapeFixerContext, plan, plan.getRootBlock());
+  }
+
+  private static class PlanShapeFixerContext {
+
+    Stack<Integer> childNumbers = new Stack<Integer>();
+    public void reset() {
+      childNumbers.clear();
+    }
+  }
+
+  /**
+   * Given a commutative join, two children of the join node are interchangeable.
+   * This class fix the logical plan according to the following rules.
+   *
+   * <h3>Rules</h3>
+   * <ul>
+   *   <li>When one of the both children has more descendants,
+   *   change the plan in order that the left child is the one who has more descendants.</li>
+   *   <li>When both children have the same number of descendants,
+   *   their order is decided based on their string representation.</li>
+   * </ul>
+   *
+   * In addition, in/out schemas, quals, and targets are sorted by their names.
+   */
+  private static class PlanShapeFixer extends BasicLogicalPlanVisitor<PlanShapeFixerContext, LogicalNode> {
+    private static final ColumnComparator columnComparator = new ColumnComparator();
+    private static final EvalNodeComparator evalNodeComparator = new EvalNodeComparator();
+    private static final TargetComparator targetComparator = new TargetComparator();
+
+    @Override
+    public LogicalNode visit(PlanShapeFixerContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                             LogicalNode node, Stack<LogicalNode> stack) throws PlanningException {
+      super.visit(context, plan, block, node, stack);
+      context.childNumbers.push(context.childNumbers.pop()+1);
+      return null;
+    }
+
+    @Override
+    public LogicalNode visitScan(PlanShapeFixerContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                 ScanNode node, Stack<LogicalNode> stack) throws PlanningException {
+      super.visitScan(context, plan, block, node, stack);
+      context.childNumbers.push(1);
+      node.setInSchema(sortSchema(node.getInSchema()));
+      if (node.hasQual()) {
+        node.setQual(sortQual(node.getQual()));
+      }
+      return null;
+    }
+
+    @Override
+    public LogicalNode visitJoin(PlanShapeFixerContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                 JoinNode node, Stack<LogicalNode> stack) throws PlanningException {
+      super.visitJoin(context, plan, block, node, stack);
+      int rightChildNum = context.childNumbers.pop();
+      int leftChildNum = context.childNumbers.pop();
+
+      if (PlannerUtil.isCommutativeJoin(node.getJoinType())) {
+
+        if (leftChildNum < rightChildNum) {
+          swapChildren(node);
+        } else if (leftChildNum == rightChildNum) {
+          if (node.getLeftChild().toString().compareTo(node.getRightChild().toString()) <
+              0) {
+            swapChildren(node);
+          }
+        }
+      }
+
+      node.setInSchema(sortSchema(node.getInSchema()));
+      node.setOutSchema(sortSchema(node.getOutSchema()));
+
+      if (node.hasJoinQual()) {
+        node.setJoinQual(sortQual(node.getJoinQual()));
+      }
+
+      if (node.hasTargets()) {
+        node.setTargets(sortTargets(node.getTargets()));
+      }
+
+      context.childNumbers.push(rightChildNum + leftChildNum);
+
+      return null;
+    }
+
+    private Schema sortSchema(Schema schema) {
+      Column[] columns = schema.toArray();
+      Arrays.sort(columns, columnComparator);
+
+      Schema sorted = new Schema();
+      for (Column col : columns) {
+        sorted.addColumn(col);
+      }
+      return sorted;
+    }
+
+    private EvalNode sortQual(EvalNode qual) {
+      EvalNode[] cnf = AlgebraicUtil.toConjunctiveNormalFormArray(qual);
+      Arrays.sort(cnf, evalNodeComparator);
+      return AlgebraicUtil.createSingletonExprFromCNF(cnf);
+    }
+
+    private Target[] sortTargets(Target[] targets) {
+      Arrays.sort(targets, targetComparator);
+      return targets;
+    }
+
+    private static void swapChildren(JoinNode node) {
+      LogicalNode tmpChild = node.getLeftChild();
+      int tmpId = tmpChild.getPID();
+      tmpChild.setPID(node.getRightChild().getPID());
+      node.getRightChild().setPID(tmpId);
+      node.setLeftChild(node.getRightChild());
+      node.setRightChild(tmpChild);
+    }
+  }
+
+  public static class ColumnComparator implements Comparator<Column> {
+
+    @Override
+    public int compare(Column o1, Column o2) {
+      return o1.getQualifiedName().compareTo(o2.getQualifiedName());
+    }
+  }
+
+  private static class EvalNodeComparator implements Comparator<EvalNode> {
+
+    @Override
+    public int compare(EvalNode o1, EvalNode o2) {
+      return o1.toJson().compareTo(o2.toJson());
+    }
+  }
+
+  private static class TargetComparator implements Comparator<Target> {
+
+    @Override
+    public int compare(Target o1, Target o2) {
+      return o1.toJson().compareTo(o2.toJson());
+    }
+  }
+
+  private static class PidResetContext {
+    int seqId = 0;
+    public void reset() {
+      seqId = 0;
+    }
+  }
+
+  /**
+   * During join order optimization, new join nodes are created based on the chosen join order.
+   * So, each join node has different pids.
+   * This class sequentially assigns unique pids to all logical nodes.
+   */
+  private static class PidReseter extends BasicLogicalPlanVisitor<PidResetContext, LogicalNode> {
+
+    @Override
+    public void preHook(LogicalPlan plan, LogicalNode node, Stack<LogicalNode> stack, PidResetContext context)
+        throws PlanningException {
+      node.setPID(context.seqId++);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3e9a2dd2/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index aa8b228..75e7762 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@ -36,6 +36,8 @@ import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.planner.global.GlobalPlanner;
+import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.physical.EvalExprExec;
 import org.apache.tajo.engine.planner.physical.StoreTableExec;
 import org.apache.tajo.engine.query.QueryContext;
@@ -45,6 +47,7 @@ import org.apache.tajo.master.*;
 import org.apache.tajo.master.exec.prehook.CreateTableHook;
 import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager;
 import org.apache.tajo.master.exec.prehook.InsertIntoHook;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
 import org.apache.tajo.querymaster.*;
 import org.apache.tajo.session.Session;
 import org.apache.tajo.plan.LogicalPlan;
@@ -101,7 +104,7 @@ public class QueryExecutor {
 
 
     } else if (plan.isExplain()) { // explain query
-      execExplain(plan, response);
+      execExplain(plan, queryContext, plan.isExplainGlobal(), response);
 
     } else if (PlannerUtil.checkIfQueryTargetIsVirtualTable(plan)) {
       execQueryOnVirtualTable(queryContext, session, sql, plan, response);
@@ -157,9 +160,28 @@ public class QueryExecutor {
     response.setResultCode(ClientProtos.ResultCode.OK);
   }
 
-  public void execExplain(LogicalPlan plan, SubmitQueryResponse.Builder response) throws IOException {
+  public void execExplain(LogicalPlan plan, QueryContext queryContext, boolean isGlobal,
+                          SubmitQueryResponse.Builder response)
+      throws Exception {
+    String explainStr;
+    boolean isTest = queryContext.getBool(SessionVars.TEST_PLAN_SHAPE_FIX_ENABLED);
+    if (isTest) {
+      ExplainPlanPreprocessorForTest preprocessorForTest = new ExplainPlanPreprocessorForTest();
+      preprocessorForTest.prepareTest(plan);
+    }
+
+    if (isGlobal) {
+      GlobalPlanner planner = new GlobalPlanner(context.getConf(), context.getCatalog());
+      MasterPlan masterPlan = compileMasterPlan(plan, queryContext, planner);
+      if (isTest) {
+        ExplainGlobalPlanPreprocessorForTest globalPlanPreprocessorForTest = new ExplainGlobalPlanPreprocessorForTest();
+        globalPlanPreprocessorForTest.prepareTest(masterPlan);
+      }
+      explainStr = masterPlan.toString();
+    } else {
+      explainStr = PlannerUtil.buildExplainString(plan.getRootBlock().getRoot());
+    }
 
-    String explainStr = PlannerUtil.buildExplainString(plan.getRootBlock().getRoot());
     Schema schema = new Schema();
     schema.addColumn("explain", TajoDataTypes.Type.TEXT);
     RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema);
@@ -416,4 +438,34 @@ public class QueryExecutor {
           " is forwarded to " + queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort());
     }
   }
+
+  public static MasterPlan compileMasterPlan(LogicalPlan plan, QueryContext context, GlobalPlanner planner)
+      throws Exception {
+
+    CatalogProtos.StoreType storeType = PlannerUtil.getStoreType(plan);
+    if (storeType != null) {
+      StorageManager sm = StorageManager.getStorageManager(planner.getConf(), storeType);
+      StorageProperty storageProperty = sm.getStorageProperty();
+      if (storageProperty.isSortedInsert()) {
+        String tableName = PlannerUtil.getStoreTableName(plan);
+        LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+        TableDesc tableDesc = PlannerUtil.getTableDesc(planner.getCatalog(), rootNode.getChild());
+        if (tableDesc == null) {
+          throw new VerifyException("Can't get table meta data from catalog: " + tableName);
+        }
+        List<LogicalPlanRewriteRule> storageSpecifiedRewriteRules = sm.getRewriteRules(
+            context, tableDesc);
+        if (storageSpecifiedRewriteRules != null) {
+          for (LogicalPlanRewriteRule eachRule: storageSpecifiedRewriteRules) {
+            eachRule.rewrite(context, plan);
+          }
+        }
+      }
+    }
+
+    MasterPlan masterPlan = new MasterPlan(QueryIdFactory.NULL_QUERY_ID, context, plan);
+    planner.build(masterPlan);
+
+    return masterPlan;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/3e9a2dd2/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
index 0d1924b..f83cb1e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
@@ -38,6 +38,7 @@ import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.exception.UnimplementedException;

http://git-wip-us.apache.org/repos/asf/tajo/blob/3e9a2dd2/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
index 15fbdae..4e104dd 100644
--- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
@@ -21,9 +21,12 @@ package org.apache.tajo;
 import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.tajo.algebra.*;
 import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.CatalogService;
@@ -50,10 +53,19 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.annotation.Annotation;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.lang.reflect.Method;
 import java.net.URL;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
@@ -181,6 +193,8 @@ public class QueryTestCaseBase {
   protected Path currentResultPath;
   protected Path currentDatasetPath;
 
+  protected FileSystem currentResultFS;
+
   // for getting a method name
   @Rule public TestName name = new TestName();
 
@@ -243,8 +257,10 @@ public class QueryTestCaseBase {
         client.updateQuery("CREATE DATABASE IF NOT EXISTS " + CatalogUtil.denormalizeIdentifier(currentDatabase));
       }
       client.selectDatabase(currentDatabase);
-    } catch (ServiceException e) {
-      e.printStackTrace();
+      currentResultFS = currentResultPath.getFileSystem(testBase.getTestingCluster().getConfiguration());
+
+    } catch (Exception e) {
+      throw new RuntimeException(e);
     }
     testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.$TEST_BROADCAST_JOIN_ENABLED.varname, "false");
   }
@@ -317,6 +333,61 @@ public class QueryTestCaseBase {
     return executeFile(getMethodName() + ".sql");
   }
 
+  private volatile Description current;
+
+  @Rule
+  public TestRule watcher = new TestWatcher() {
+    @Override
+    protected void starting(Description description) {
+      QueryTestCaseBase.this.current = description;
+    }
+  };
+
+  @Target({ElementType.METHOD})
+  @Retention(RetentionPolicy.RUNTIME)
+  protected static @interface SimpleTest {
+    String[] queries();
+    String[] cleanupTables() default {};
+  }
+
+  protected void runSimpleTests() throws Exception {
+    String methodName = getMethodName();
+    Method method = current.getTestClass().getMethod(methodName);
+    SimpleTest annotation = method.getAnnotation(SimpleTest.class);
+    if (annotation == null) {
+      throw new IllegalStateException("Cannot find test annotation");
+    }
+    String[] queries = annotation.queries();
+    try {
+      for (int i = 0; i < queries.length; i++) {
+        ResultSet result = client.executeQueryAndGetResult(queries[i]);
+        Path resultPath = StorageUtil.concatPath(
+            currentResultPath, methodName + "." + String.valueOf(i + 1) + ".result");
+        if (currentResultFS.exists(resultPath)) {
+          assertEquals("Result Verification for: " + (i+1) + "th test",
+              FileUtil.readTextFromStream(currentResultFS.open(resultPath)), resultSetToString(result).trim());
+        } else if (!isNull(result)) {
+          // If there is no result file expected, create gold files for new tests.
+          FileUtil.writeTextToStream(resultSetToString(result).trim(), currentResultFS.create(resultPath));
+          LOG.info("New test output for " + current.getDisplayName() + " is written to " + resultPath);
+          // should be copied to src directory
+        }
+      }
+    } finally {
+      for (String tableName : annotation.cleanupTables()) {
+        try {
+          client.dropTable(tableName);
+        } catch (ServiceException e) {
+          // ignore
+        }
+      }
+    }
+  }
+
+  private boolean isNull(ResultSet result) throws SQLException {
+    return result.getMetaData().getColumnCount() == 0;
+  }
+
   protected String getMethodName() {
     String methodName = name.getMethodName();
     // In the case of parameter execution name's pattern is methodName[0]

http://git-wip-us.apache.org/repos/asf/tajo/blob/3e9a2dd2/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
index f7b1382..b54d7ea 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
@@ -105,6 +105,22 @@ public class TestSelectQuery extends QueryTestCaseBase {
   }
 
   @Test
+  @SimpleTest(queries = {
+      "explain global " +
+          "select l_orderkey, l_partkey from lineitem",
+      "explain global " +
+          "select n1.n_nationkey, n1.n_name, n2.n_name from nation n1 join nation n2 on n1.n_name = upper(n2.n_name) " +
+          "order by n1.n_nationkey;",
+      "explain global " +
+          "select l_linenumber, count(*), count(distinct l_orderkey), sum(distinct l_orderkey) from lineitem " +
+          "group by l_linenumber having sum(distinct l_orderkey) = 6"})
+  public final void testExplainSelectPhysical() throws Exception {
+    // Enable this option to fix the shape of the generated plans.
+    testingCluster.getConfiguration().set(ConfVars.$TEST_PLAN_SHAPE_FIX_ENABLED.varname, "true");
+    runSimpleTests();
+  }
+
+  @Test
   public final void testSelect() throws Exception {
     // select l_orderkey, l_partkey from lineitem;
     ResultSet res = executeQuery();

http://git-wip-us.apache.org/repos/asf/tajo/blob/3e9a2dd2/tajo-core/src/test/resources/results/TestSelectQuery/testExplainSelectPhysical.1.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestSelectQuery/testExplainSelectPhysical.1.result b/tajo-core/src/test/resources/results/TestSelectQuery/testExplainSelectPhysical.1.result
new file mode 100644
index 0000000..0069639
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestSelectQuery/testExplainSelectPhysical.1.result
@@ -0,0 +1,26 @@
+explain
+-------------------------------
+-------------------------------------------------------------------------------
+Execution Block Graph (TERMINAL - eb_0000000000000_0000_000002)
+-------------------------------------------------------------------------------
+|-eb_0000000000000_0000_000002
+   |-eb_0000000000000_0000_000001
+-------------------------------------------------------------------------------
+Order of Execution
+-------------------------------------------------------------------------------
+1: eb_0000000000000_0000_000001
+2: eb_0000000000000_0000_000002
+-------------------------------------------------------------------------------
+
+=======================================================
+Block Id: eb_0000000000000_0000_000001 [ROOT]
+=======================================================
+
+SCAN(0) on default.lineitem
+  => target list: default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4)
+  => out schema: {(2) default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4)}
+  => in schema: {(16) default.lineitem.l_comment (TEXT), default.lineitem.l_commitdate (TEXT), default.lineitem.l_discount (FLOAT8), default.lineitem.l_extendedprice (FLOAT8), default.lineitem.l_linenumber (INT4), default.lineitem.l_linestatus (TEXT), default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4), default.lineitem.l_quantity (FLOAT8), default.lineitem.l_receiptdate (TEXT), default.lineitem.l_returnflag (TEXT), default.lineitem.l_shipdate (TEXT), default.lineitem.l_shipinstruct (TEXT), default.lineitem.l_shipmode (TEXT), default.lineitem.l_suppkey (INT4), default.lineitem.l_tax (FLOAT8)}
+
+=======================================================
+Block Id: eb_0000000000000_0000_000002 [TERMINAL]
+=======================================================
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/3e9a2dd2/tajo-core/src/test/resources/results/TestSelectQuery/testExplainSelectPhysical.2.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestSelectQuery/testExplainSelectPhysical.2.result b/tajo-core/src/test/resources/results/TestSelectQuery/testExplainSelectPhysical.2.result
new file mode 100644
index 0000000..7946c5b
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestSelectQuery/testExplainSelectPhysical.2.result
@@ -0,0 +1,88 @@
+explain
+-------------------------------
+-------------------------------------------------------------------------------
+Execution Block Graph (TERMINAL - eb_0000000000000_0000_000005)
+-------------------------------------------------------------------------------
+|-eb_0000000000000_0000_000005
+   |-eb_0000000000000_0000_000004
+      |-eb_0000000000000_0000_000003
+         |-eb_0000000000000_0000_000002
+         |-eb_0000000000000_0000_000001
+-------------------------------------------------------------------------------
+Order of Execution
+-------------------------------------------------------------------------------
+1: eb_0000000000000_0000_000001
+2: eb_0000000000000_0000_000002
+3: eb_0000000000000_0000_000003
+4: eb_0000000000000_0000_000004
+5: eb_0000000000000_0000_000005
+-------------------------------------------------------------------------------
+
+=======================================================
+Block Id: eb_0000000000000_0000_000001 [LEAF]
+=======================================================
+
+[Outgoing]
+[q_0000000000000_0000] 1 => 3 (type=HASH_SHUFFLE, key=?upper_1 (TEXT), num=32)
+
+SCAN(0) on default.nation as n2
+  => target list: default.n2.n_name (TEXT), upper(default.n2.n_name (TEXT)) as ?upper_1
+  => out schema: {(2) default.n2.n_name (TEXT), ?upper_1 (TEXT)}
+  => in schema: {(4) default.n2.n_comment (TEXT), default.n2.n_name (TEXT), default.n2.n_nationkey (INT4), default.n2.n_regionkey (INT4)}
+
+=======================================================
+Block Id: eb_0000000000000_0000_000002 [LEAF]
+=======================================================
+
+[Outgoing]
+[q_0000000000000_0000] 2 => 3 (type=HASH_SHUFFLE, key=default.n1.n_name (TEXT), num=32)
+
+SCAN(1) on default.nation as n1
+  => target list: default.n1.n_nationkey (INT4), default.n1.n_name (TEXT)
+  => out schema: {(2) default.n1.n_nationkey (INT4), default.n1.n_name (TEXT)}
+  => in schema: {(4) default.n1.n_comment (TEXT), default.n1.n_name (TEXT), default.n1.n_nationkey (INT4), default.n1.n_regionkey (INT4)}
+
+=======================================================
+Block Id: eb_0000000000000_0000_000003 [INTERMEDIATE]
+=======================================================
+
+[Incoming]
+[q_0000000000000_0000] 1 => 3 (type=HASH_SHUFFLE, key=?upper_1 (TEXT), num=32)
+[q_0000000000000_0000] 2 => 3 (type=HASH_SHUFFLE, key=default.n1.n_name (TEXT), num=32)
+
+[Outgoing]
+[q_0000000000000_0000] 3 => 4 (type=RANGE_SHUFFLE, key=default.n1.n_nationkey (INT4), num=32)
+
+SORT(10)
+  => Sort Keys: default.n1.n_nationkey (INT4) (asc)
+   JOIN(6)(INNER)
+     => Join Cond: default.n1.n_name (TEXT) = ?upper_1 (TEXT)
+     => target list: default.n1.n_name (TEXT), default.n1.n_nationkey (INT4), default.n2.n_name (TEXT)
+     => out schema: {(3) default.n1.n_name (TEXT), default.n1.n_nationkey (INT4), default.n2.n_name (TEXT)}
+     => in schema: {(4) ?upper_1 (TEXT), default.n1.n_name (TEXT), default.n1.n_nationkey (INT4), default.n2.n_name (TEXT)}
+      SCAN(9) on eb_0000000000000_0000_000002
+        => out schema: {(2) default.n1.n_nationkey (INT4), default.n1.n_name (TEXT)}
+        => in schema: {(2) default.n1.n_nationkey (INT4), default.n1.n_name (TEXT)}
+      SCAN(8) on eb_0000000000000_0000_000001
+        => out schema: {(2) default.n2.n_name (TEXT), ?upper_1 (TEXT)}
+        => in schema: {(2) default.n2.n_name (TEXT), ?upper_1 (TEXT)}
+
+=======================================================
+Block Id: eb_0000000000000_0000_000004 [ROOT]
+=======================================================
+
+[Incoming]
+[q_0000000000000_0000] 3 => 4 (type=RANGE_SHUFFLE, key=default.n1.n_nationkey (INT4), num=32)
+
+[Enforcers]
+ 0: sorted input=eb_0000000000000_0000_000003
+
+SORT(3)
+  => Sort Keys: default.n1.n_nationkey (INT4) (asc)
+   SCAN(11) on eb_0000000000000_0000_000003
+     => out schema: {(3) default.n1.n_name (TEXT), default.n1.n_nationkey (INT4), default.n2.n_name (TEXT)}
+     => in schema: {(3) default.n1.n_name (TEXT), default.n1.n_nationkey (INT4), default.n2.n_name (TEXT)}
+
+=======================================================
+Block Id: eb_0000000000000_0000_000005 [TERMINAL]
+=======================================================
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/3e9a2dd2/tajo-core/src/test/resources/results/TestSelectQuery/testExplainSelectPhysical.3.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestSelectQuery/testExplainSelectPhysical.3.result b/tajo-core/src/test/resources/results/TestSelectQuery/testExplainSelectPhysical.3.result
new file mode 100644
index 0000000..c4e8c2c
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestSelectQuery/testExplainSelectPhysical.3.result
@@ -0,0 +1,89 @@
+explain
+-------------------------------
+-------------------------------------------------------------------------------
+Execution Block Graph (TERMINAL - eb_0000000000000_0000_000004)
+-------------------------------------------------------------------------------
+|-eb_0000000000000_0000_000004
+   |-eb_0000000000000_0000_000003
+      |-eb_0000000000000_0000_000002
+         |-eb_0000000000000_0000_000001
+-------------------------------------------------------------------------------
+Order of Execution
+-------------------------------------------------------------------------------
+1: eb_0000000000000_0000_000001
+2: eb_0000000000000_0000_000002
+3: eb_0000000000000_0000_000003
+4: eb_0000000000000_0000_000004
+-------------------------------------------------------------------------------
+
+=======================================================
+Block Id: eb_0000000000000_0000_000001 [LEAF]
+=======================================================
+
+[Outgoing]
+[q_0000000000000_0000] 1 => 2 (type=HASH_SHUFFLE, key=?distinctseq (INT2), default.lineitem.l_linenumber (INT4), default.lineitem.l_orderkey (INT4), num=32)
+
+[Enforcers]
+ 0: type=Distinct,alg=hash
+
+DISTINCT_GROUP_BY(9)(l_linenumber)
+  => exprs: (count( distinct default.lineitem.l_orderkey (INT4)),sum( distinct default.lineitem.l_orderkey (INT4)),count())
+  => target list: ?distinctseq (INT2), default.lineitem.l_linenumber (INT4), default.lineitem.l_orderkey (INT4), ?count (INT8)
+  => out schema:{(4) ?distinctseq (INT2), default.lineitem.l_linenumber (INT4), default.lineitem.l_orderkey (INT4), ?count (INT8)}
+  => in schema:{(2) default.lineitem.l_linenumber (INT4), default.lineitem.l_orderkey (INT4)}
+  => 	distinct: true, GROUP_BY(10)(l_orderkey), exprs: (count( distinct default.lineitem.l_orderkey (INT4)),sum( distinct default.lineitem.l_orderkey (INT4))), target list:{default.lineitem.l_orderkey (INT4), ?count_1 (INT8), ?sum_2 (INT8)}, out schema:{(3) default.lineitem.l_orderkey (INT4), ?count_1 (INT8), ?sum_2 (INT8)}, in schema:{(2) default.lineitem.l_linenumber (INT4), default.lineitem.l_orderkey (INT4)}
+  => 	distinct: false, GROUP_BY(11)(), exprs: (count()), target list:{?count (INT8)}, out schema:{(1) ?count (INT8)}, in schema:{(2) default.lineitem.l_linenumber (INT4), default.lineitem.l_orderkey (INT4)}
+   SCAN(0) on default.lineitem
+     => target list: default.lineitem.l_linenumber (INT4), default.lineitem.l_orderkey (INT4)
+     => out schema: {(2) default.lineitem.l_linenumber (INT4), default.lineitem.l_orderkey (INT4)}
+     => in schema: {(16) default.lineitem.l_comment (TEXT), default.lineitem.l_commitdate (TEXT), default.lineitem.l_discount (FLOAT8), default.lineitem.l_extendedprice (FLOAT8), default.lineitem.l_linenumber (INT4), default.lineitem.l_linestatus (TEXT), default.lineitem.l_orderkey (INT4), default.lineitem.l_partkey (INT4), default.lineitem.l_quantity (FLOAT8), default.lineitem.l_receiptdate (TEXT), default.lineitem.l_returnflag (TEXT), default.lineitem.l_shipdate (TEXT), default.lineitem.l_shipinstruct (TEXT), default.lineitem.l_shipmode (TEXT), default.lineitem.l_suppkey (INT4), default.lineitem.l_tax (FLOAT8)}
+
+=======================================================
+Block Id: eb_0000000000000_0000_000002 [INTERMEDIATE]
+=======================================================
+
+[Incoming]
+[q_0000000000000_0000] 1 => 2 (type=HASH_SHUFFLE, key=?distinctseq (INT2), default.lineitem.l_linenumber (INT4), default.lineitem.l_orderkey (INT4), num=32)
+
+[Outgoing]
+[q_0000000000000_0000] 2 => 3 (type=HASH_SHUFFLE, key=default.lineitem.l_linenumber (INT4), num=32)
+
+[Enforcers]
+ 0: type=Distinct,alg=hash
+
+DISTINCT_GROUP_BY(12)(l_linenumber)
+  => exprs: (count( distinct default.lineitem.l_orderkey (INT4)),sum( distinct default.lineitem.l_orderkey (INT4)),count(?count (INT8)))
+  => target list: ?distinctseq (INT2), default.lineitem.l_linenumber (INT4), default.lineitem.l_orderkey (INT4), ?count (INT8)
+  => out schema:{(4) ?distinctseq (INT2), default.lineitem.l_linenumber (INT4), default.lineitem.l_orderkey (INT4), ?count (INT8)}
+  => in schema:{(4) ?distinctseq (INT2), default.lineitem.l_linenumber (INT4), default.lineitem.l_orderkey (INT4), ?count (INT8)}
+  => 	distinct: true, GROUP_BY(13)(l_orderkey), exprs: (count( distinct default.lineitem.l_orderkey (INT4)),sum( distinct default.lineitem.l_orderkey (INT4))), target list:{default.lineitem.l_orderkey (INT4), ?count_1 (INT8), ?sum_2 (INT8)}, out schema:{(3) default.lineitem.l_orderkey (INT4), ?count_1 (INT8), ?sum_2 (INT8)}, in schema:{(2) default.lineitem.l_linenumber (INT4), default.lineitem.l_orderkey (INT4)}
+  => 	distinct: false, GROUP_BY(14)(), exprs: (count(?count (INT8))), target list:{?count (INT8)}, out schema:{(1) ?count (INT8)}, in schema:{(2) default.lineitem.l_linenumber (INT4), default.lineitem.l_orderkey (INT4)}
+   SCAN(18) on eb_0000000000000_0000_000001
+     => out schema: {(4) ?distinctseq (INT2), default.lineitem.l_linenumber (INT4), default.lineitem.l_orderkey (INT4), ?count (INT8)}
+     => in schema: {(4) ?distinctseq (INT2), default.lineitem.l_linenumber (INT4), default.lineitem.l_orderkey (INT4), ?count (INT8)}
+
+=======================================================
+Block Id: eb_0000000000000_0000_000003 [ROOT]
+=======================================================
+
+[Incoming]
+[q_0000000000000_0000] 2 => 3 (type=HASH_SHUFFLE, key=default.lineitem.l_linenumber (INT4), num=32)
+
+[Enforcers]
+ 0: type=Distinct,alg=sort,keys=default.lineitem.l_orderkey | 
+
+HAVING(2) (?sum_2 (INT8) = 6)
+   DISTINCT_GROUP_BY(15)(l_linenumber)
+     => exprs: (count( distinct default.lineitem.l_orderkey (INT4)),sum( distinct default.lineitem.l_orderkey (INT4)),count(?count (INT8)))
+     => target list: ?distinctseq (INT2), default.lineitem.l_linenumber (INT4), default.lineitem.l_orderkey (INT4), ?count (INT8)
+     => out schema:{(4) default.lineitem.l_linenumber (INT4), ?count (INT8), ?count_1 (INT8), ?sum_2 (INT8)}
+     => in schema:{(4) ?distinctseq (INT2), default.lineitem.l_linenumber (INT4), default.lineitem.l_orderkey (INT4), ?count (INT8)}
+     => 	distinct: true, GROUP_BY(16)(l_orderkey), exprs: (count( distinct default.lineitem.l_orderkey (INT4)),sum( distinct default.lineitem.l_orderkey (INT4))), target list:{default.lineitem.l_orderkey (INT4), ?count_1 (INT8), ?sum_2 (INT8)}, out schema:{(3) default.lineitem.l_orderkey (INT4), ?count_1 (INT8), ?sum_2 (INT8)}, in schema:{(2) default.lineitem.l_linenumber (INT4), default.lineitem.l_orderkey (INT4)}
+     => 	distinct: false, GROUP_BY(17)(), exprs: (count(?count (INT8))), target list:{?count (INT8)}, out schema:{(1) ?count (INT8)}, in schema:{(2) default.lineitem.l_linenumber (INT4), default.lineitem.l_orderkey (INT4)}
+      SCAN(19) on eb_0000000000000_0000_000002
+        => out schema: {(4) ?distinctseq (INT2), default.lineitem.l_linenumber (INT4), default.lineitem.l_orderkey (INT4), ?count (INT8)}
+        => in schema: {(4) ?distinctseq (INT2), default.lineitem.l_linenumber (INT4), default.lineitem.l_orderkey (INT4), ?count (INT8)}
+
+=======================================================
+Block Id: eb_0000000000000_0000_000004 [TERMINAL]
+=======================================================
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/3e9a2dd2/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlan.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlan.java
index 0425f2e..17f79da 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlan.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlan.java
@@ -68,7 +68,12 @@ public class LogicalPlan {
   /** planning and optimization log */
   private List<String> planingHistory = Lists.newArrayList();
 
-  private boolean isExplain;
+  private static enum ExplainType {
+    NOT_EXPLAIN,
+    EXPLAIN_LOGICAL,
+    EXPLAIN_GLOBAL
+  }
+  private ExplainType explainType = ExplainType.NOT_EXPLAIN;
 
   public LogicalPlan(LogicalPlanner planner) {
   }
@@ -104,12 +109,16 @@ public class LogicalPlan {
     }
   }
 
-  public void setExplain() {
-    isExplain = true;
+  public void setExplain(boolean isGlobal) {
+    explainType = isGlobal ? ExplainType.EXPLAIN_GLOBAL : ExplainType.EXPLAIN_LOGICAL;
   }
 
   public boolean isExplain() {
-    return isExplain;
+    return explainType != ExplainType.NOT_EXPLAIN;
+  }
+
+  public boolean isExplainGlobal() {
+    return explainType == ExplainType.EXPLAIN_GLOBAL;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tajo/blob/3e9a2dd2/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
index 8395c3d..ff3d6c2 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
@@ -200,7 +200,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
   }
 
   public LogicalNode visitExplain(PlanContext ctx, Stack<Expr> stack, Explain expr) throws PlanningException {
-    ctx.plan.setExplain();
+    ctx.plan.setExplain(expr.isGlobal());
     return visit(ctx, stack, expr.getChild());
   }