You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/08/14 08:48:06 UTC

[8/8] git commit: TAJO-91: Launch QueryMaster on NodeManager per query. (hyoungjunkim via hyunsik)

TAJO-91: Launch QueryMaster on NodeManager per query. (hyoungjunkim via hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/9d020883
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/9d020883
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/9d020883

Branch: refs/heads/master
Commit: 9d02088397aca145aa77799e24469a983cc60974
Parents: a269372
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Aug 14 13:37:18 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Aug 14 13:38:51 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../org/apache/tajo/catalog/CatalogService.java |   4 +-
 .../org/apache/tajo/catalog/FunctionDesc.java   |   2 +-
 .../src/main/proto/CatalogProtos.proto          | 160 ++--
 .../apache/tajo/catalog/TestFunctionDesc.java   |   5 +-
 .../org/apache/tajo/catalog/CatalogServer.java  |   6 +-
 .../src/main/java/org/apache/tajo/QueryId.java  |   4 +-
 .../java/org/apache/tajo/conf/TajoConf.java     |   1 +
 .../main/java/org/apache/tajo/util/Bytes.java   |  14 +-
 .../org/apache/tajo/util/CommonTestingUtil.java |   2 +-
 .../java/org/apache/tajo/util/TajoIdUtils.java  |  40 +-
 tajo-core/tajo-core-backend/pom.xml             |  23 +-
 .../org/apache/tajo/benchmark/BenchmarkSet.java |   9 +
 .../main/java/org/apache/tajo/cli/TajoCli.java  |  33 +-
 .../org/apache/tajo/client/QueryStatus.java     |  15 +-
 .../java/org/apache/tajo/client/TajoClient.java | 162 +++-
 .../apache/tajo/engine/eval/EvalTreeUtil.java   |   6 +-
 .../apache/tajo/engine/planner/FromTable.java   |   4 +
 .../tajo/engine/query/QueryUnitRequestImpl.java |   4 +-
 .../apache/tajo/engine/query/ResultSetImpl.java |  66 +-
 .../ipc/protocolrecords/QueryUnitRequest.java   |   7 +-
 .../org/apache/tajo/master/ClientService.java   | 411 ----------
 .../org/apache/tajo/master/ContainerProxy.java  | 429 ++++++++++
 .../org/apache/tajo/master/GlobalEngine.java    |  85 +-
 .../org/apache/tajo/master/GlobalPlanner.java   |   2 +-
 .../main/java/org/apache/tajo/master/Query.java | 409 ----------
 .../org/apache/tajo/master/QueryMaster.java     | 465 -----------
 .../java/org/apache/tajo/master/QueryUnit.java  | 499 -----------
 .../apache/tajo/master/QueryUnitAttempt.java    | 344 --------
 .../org/apache/tajo/master/Repartitioner.java   | 582 -------------
 .../java/org/apache/tajo/master/SubQuery.java   | 766 -----------------
 .../org/apache/tajo/master/SubQueryState.java   |  28 -
 .../java/org/apache/tajo/master/TajoMaster.java |  64 +-
 .../tajo/master/TajoMasterClientService.java    | 412 ++++++++++
 .../tajo/master/TaskRunnerLauncherImpl.java     | 487 ++---------
 .../apache/tajo/master/TaskRunnerListener.java  | 172 ----
 .../apache/tajo/master/TaskSchedulerImpl.java   |  25 +-
 .../tajo/master/cluster/WorkerListener.java     | 148 ----
 .../master/event/SubQueryCompletedEvent.java    |   2 +-
 .../tajo/master/event/SubQuerySucceeEvent.java  |   2 +-
 .../event/TaskAttemptStatusUpdateEvent.java     |   2 +-
 .../tajo/master/event/TaskCompletionEvent.java  |   2 +-
 .../tajo/master/event/TaskFatalErrorEvent.java  |   2 +-
 .../tajo/master/event/TaskRequestEvent.java     |   2 +-
 .../apache/tajo/master/querymaster/Query.java   | 413 ++++++++++
 .../tajo/master/querymaster/QueryMaster.java    | 817 +++++++++++++++++++
 .../querymaster/QueryMasterClientService.java   | 196 +++++
 .../master/querymaster/QueryMasterManager.java  | 353 ++++++++
 .../querymaster/QueryMasterManagerService.java  | 114 +++
 .../master/querymaster/QueryMasterRunner.java   | 152 ++++
 .../tajo/master/querymaster/QueryUnit.java      | 500 ++++++++++++
 .../master/querymaster/QueryUnitAttempt.java    | 344 ++++++++
 .../tajo/master/querymaster/Repartitioner.java  | 584 +++++++++++++
 .../tajo/master/querymaster/SubQuery.java       | 766 +++++++++++++++++
 .../tajo/master/querymaster/SubQueryState.java  |  28 +
 .../tajo/master/rm/RMContainerAllocator.java    |  31 +-
 .../main/java/org/apache/tajo/worker/Task.java  |   4 +-
 .../java/org/apache/tajo/worker/TaskRunner.java |  61 +-
 .../src/main/proto/CatalogProtos.proto          | 160 ++--
 .../src/main/proto/ClientProtocol.proto         |   7 +-
 .../src/main/proto/ClientProtos.proto           | 136 +++
 .../src/main/proto/MasterWorkerProtocol.proto   |  36 -
 .../src/main/proto/MasterWorkerProtos.proto     | 118 ---
 .../main/proto/QueryMasterClientProtocol.proto  |  36 +
 .../main/proto/QueryMasterManagerProtocol.proto |  50 ++
 .../src/main/proto/QueryMasterProtocol.proto    | 132 +++
 .../main/proto/TajoMasterClientProtocol.proto   |  64 ++
 .../src/main/proto/tajo_protos.proto            |  17 +-
 .../src/main/resources/tajo-default.xml         |  18 +
 .../org/apache/tajo/BackendTestingUtil.java     |   2 +-
 .../apache/tajo/LocalTajoTestingUtility.java    |  12 +
 .../org/apache/tajo/MiniTajoYarnCluster.java    |   5 +
 .../org/apache/tajo/TajoTestingCluster.java     |  63 +-
 .../test/java/org/apache/tajo/TpchTestBase.java |   5 +-
 .../org/apache/tajo/benchmark/TestTPCH.java     |  45 +-
 .../org/apache/tajo/client/TestTajoClient.java  |   4 +-
 .../engine/function/TestBuiltinFunctions.java   |  58 +-
 .../plan/global/TestGlobalQueryPlanner.java     |   2 +-
 .../global/TestGlobalQueryOptimizer.java        |  10 +-
 .../tajo/engine/query/TestGroupByQuery.java     |  43 +-
 .../apache/tajo/engine/query/TestJoinQuery.java | 134 +--
 .../tajo/engine/query/TestNullValues.java       |  34 +-
 .../tajo/engine/query/TestResultSetImpl.java    |   2 +-
 .../tajo/engine/query/TestSelectQuery.java      | 339 ++++----
 .../apache/tajo/engine/query/TestSortQuery.java | 148 ++--
 .../tajo/master/TestExecutionBlockCursor.java   |   2 +-
 .../apache/tajo/master/TestRepartitioner.java   |  10 +-
 .../org/apache/tajo/worker/TaskRunnerTest.java  |   6 +-
 .../src/test/resources/log4j.properties         |   5 +-
 tajo-core/tajo-core-pullserver/pom.xml          |   3 +-
 .../src/main/proto/CatalogProtos.proto          | 160 ++--
 tajo-project/pom.xml                            |   9 +-
 tajo-rpc/pom.xml                                |   3 +-
 .../org/apache/tajo/rpc/NettyServerBase.java    |  52 +-
 .../apache/tajo/rpc/ProtoBlockingRpcServer.java |   4 -
 .../org/apache/tajo/rpc/TestNettyAsyncRpc.java  | 115 ---
 .../apache/tajo/rpc/TestNettyBlockingRpc.java   | 110 ---
 .../apache/tajo/rpc/TestProtoBlockingRpc.java   |  65 +-
 .../test/impl/DummyProtocolBlockingImpl.java    |   2 +-
 99 files changed, 6969 insertions(+), 5530 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 808d9cb..0e84b29 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,9 @@ Release 0.2.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-91: Launch QueryMaster on NodeManager per query. 
+    (hyoungjunkim via hyunsik)
+
     TAJO-100: Port the parse error handling to the new parser. (jinho)
 
     TAJO-121: Add LogicalPlanVisitor and Refactor LogicalOptimizer to use the

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogService.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogService.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogService.java
index c4eb535..d0b5f50 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogService.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogService.java
@@ -28,7 +28,7 @@ public interface CatalogService {
    * Get a table description by name
    * @param name table name
    * @return a table description
-   * @see TableDescImpl
+   * @see org.apache.tajo.catalog.TableDescImpl
    * @throws Throwable
    */
   TableDesc getTableDesc(String name);
@@ -49,7 +49,7 @@ public interface CatalogService {
 
   /**
    * Add a table via table description
-   * @see TableDescImpl
+   * @see org.apache.tajo.catalog.TableDescImpl
    * @throws Throwable
    */
   boolean addTable(TableDesc desc);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java
index 8f678b9..73f861a 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java
@@ -67,7 +67,7 @@ public class FunctionDesc implements ProtoObject<FunctionDescProto>, Cloneable,
   /**
    * 
    * @return 함수 인스턴스
-   * @throws InternalException
+   * @throws org.apache.tajo.exception.InternalException
    */
   public Function newInstance() throws InternalException {
     try {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index c171c2b..6164553 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -25,9 +25,9 @@ option java_generate_equals_and_hash = true;
 import "DataTypes.proto";
 
 enum StoreType {
-  MEM = 0;
-  CSV = 1;
-  RAW = 2;
+	MEM = 0;
+	CSV = 1;
+	RAW = 2;
   RCFILE = 3;
   ROWFILE = 4;
   HCFILE = 5;
@@ -35,147 +35,147 @@ enum StoreType {
 }
 
 enum OrderType {
-  ORDER_NONE = 0;
-  ASC = 1;
-  DSC = 2;
+    ORDER_NONE = 0;
+    ASC = 1;
+    DSC = 2;
 }
 
 enum CompressType {
-  COMP_NONE = 0;
-  NULL_SUPPRESS = 1;
-  RUN_LENGTH = 2;
-  BIT_VECTOR = 3;
-  DICTIONARY = 4;
-  SNAPPY = 5;
-  LZ = 6;
+    COMP_NONE = 0;
+    NULL_SUPPRESS = 1;
+    RUN_LENGTH = 2;
+    BIT_VECTOR = 3;
+    DICTIONARY = 4;
+    SNAPPY = 5;
+    LZ = 6;
 }
 
 message ColumnMetaProto {
-  required DataType dataType = 1;
-  required bool compressed = 2;
-  required bool sorted = 3;
-  required bool contiguous = 4;
-  required StoreType storeType = 5;
-  required CompressType compType = 6;
-  required int64 startRid = 7;
-  required int32 recordNum = 8;
-  required int32 offsetToIndex = 9;
+    required DataType dataType = 1;
+    required bool compressed = 2;
+    required bool sorted = 3;
+    required bool contiguous = 4;
+    required StoreType storeType = 5;
+    required CompressType compType = 6;
+    required int64 startRid = 7;
+    required int32 recordNum = 8;
+    required int32 offsetToIndex = 9;
 }
 
 message ColumnProto {
-  required string columnName = 1;
-  required DataType dataType = 2;
+	required string columnName = 1;
+	required DataType dataType = 2;
 }
 
 message SchemaProto {
-  repeated ColumnProto fields = 1;
+	repeated ColumnProto fields = 1;
 }
 
 message KeyValueProto {
-  required string key = 1;
-  required string value = 2;
+	required string key = 1;
+	required string value = 2;
 }
 
 message KeyValueSetProto {
-  repeated KeyValueProto keyval = 1;
+	repeated KeyValueProto keyval = 1;
 }
 
 message FragmentProto {
-  required string id = 1;
-  required string path = 2;
-  required int64 startOffset = 3;
-  required int64 length = 4;
-  required TableProto meta = 5;
-  optional TableStatProto stat = 6;
+	required string id = 1;
+	required string path = 2;
+	required int64 startOffset = 3;
+	required int64 length = 4;
+	required TableProto meta = 5;
+	optional TableStatProto stat = 6;
   optional bool distCached = 7 [default = false];
 }
 
 message TableProto {
-  required SchemaProto schema = 1;
-  required StoreType storeType = 2;
-  required KeyValueSetProto params = 3;
-  optional TableStatProto stat = 4;
+    required SchemaProto schema = 1;
+    required StoreType storeType = 2;
+    required KeyValueSetProto params = 3;
+    optional TableStatProto stat = 4;
 }
 
 message TableDescProto {
-  required string id = 1;
-  required string path = 2;
-  required TableProto meta = 3;
+	required string id = 1;
+	required string path = 2;
+	required TableProto meta = 3;
 }
 
 enum FunctionType {
-  GENERAL = 0;
-  AGGREGATION = 1;
+	GENERAL = 0;
+	AGGREGATION = 1;
 }
 
 message FunctionDescProto {
-  required string signature = 1;
-  required string className = 2;
-  required FunctionType type = 3;
-  repeated DataType parameterTypes = 4;
-  required DataType returnType = 5;
+	required string signature = 1;
+	required string className = 2;
+	required FunctionType type = 3;
+	repeated DataType parameterTypes = 4;
+	required DataType returnType = 5;
 }
 
 message IndexDescProto {
-  required string name = 1;
-  required string tableId = 2;
-  required ColumnProto column = 3;
-  required IndexMethod indexMethod = 4;
-  optional bool isUnique = 5 [default = false];
-  optional bool isClustered = 6 [default = false];
-  optional bool isAscending = 7 [default = false];
+    required string name = 1;
+    required string tableId = 2;
+    required ColumnProto column = 3;
+    required IndexMethod indexMethod = 4;
+    optional bool isUnique = 5 [default = false];
+    optional bool isClustered = 6 [default = false];
+    optional bool isAscending = 7 [default = false];
 }
 
 enum IndexMethod {
-  TWO_LEVEL_BIN_TREE = 0;
-  BTREE = 1;
-  HASH = 2;
-  BITMAP = 3;
+    TWO_LEVEL_BIN_TREE = 0;
+    BTREE = 1;
+    HASH = 2;
+    BITMAP = 3;
 }
 
 message GetAllTableNamesResponse {
-  repeated string tableName = 1;
+    repeated string tableName = 1;
 }
 
 message GetIndexRequest {
-  required string tableName = 1;
-  required string columnName = 2;
+    required string tableName = 1;
+    required string columnName = 2;
 }
 
 message GetFunctionsResponse {
-  repeated FunctionDescProto functionDesc = 1;
+	repeated FunctionDescProto functionDesc = 1;
 }
 
 message UnregisterFunctionRequest {
-  required string signature = 1;
-  repeated DataType parameterTypes = 2;
+	required string signature = 1;
+	repeated DataType parameterTypes = 2;
 }
 
 message GetFunctionMetaRequest {
-  required string signature = 1;
-  repeated DataType parameterTypes = 2;
+	required string signature = 1;
+	repeated DataType parameterTypes = 2;
 }
 
 message ContainFunctionRequest {
-  required string signature = 1;
-  repeated DataType parameterTypes = 2;
+	required string signature = 1;
+	repeated DataType parameterTypes = 2;
 }
 
 message TableStatProto {
-  required int64 numRows = 1;
-  required int64 numBytes = 2;
-  optional int32 numBlocks = 3;
-  optional int32 numPartitions = 4;
-  optional int64 avgRows = 5;
-  repeated ColumnStatProto colStat = 6;
+	required int64 numRows = 1;
+	required int64 numBytes = 2;
+	optional int32 numBlocks = 3;
+	optional int32 numPartitions = 4;
+	optional int64 avgRows = 5;
+	repeated ColumnStatProto colStat = 6;
 }
 
 message ColumnStatProto {
-  required ColumnProto column = 1;
-  optional int64 numDistVal = 2;
-  optional int64 numNulls = 3;
-  optional bytes minValue = 4;
-  optional bytes maxValue = 5;
+    required ColumnProto column = 1;
+    optional int64 numDistVal = 2;
+    optional int64 numNulls = 3;
+    optional bytes minValue = 4;
+    optional bytes maxValue = 5;
 }
 
 enum StatType {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestFunctionDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestFunctionDesc.java b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestFunctionDesc.java
index 936e8c1..1ede050 100644
--- a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestFunctionDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestFunctionDesc.java
@@ -22,7 +22,6 @@ import org.apache.tajo.catalog.function.GeneralFunction;
 import org.apache.tajo.catalog.json.CatalogGsonHelper;
 import org.apache.tajo.catalog.proto.CatalogProtos.FunctionDescProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
-import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
@@ -45,8 +44,8 @@ public class TestFunctionDesc {
     private Integer y;
 
     public TestSum() {
-      super(new Column[] { new Column("arg1", TajoDataTypes.Type.INT4),
-          new Column("arg2", TajoDataTypes.Type.INT4) });
+      super(new Column[] { new Column("arg1", org.apache.tajo.common.TajoDataTypes.Type.INT4),
+          new Column("arg2", org.apache.tajo.common.TajoDataTypes.Type.INT4) });
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index c3d0d4a..e82afc8 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -134,7 +134,7 @@ public class CatalogServer extends AbstractService {
       this.rpcServer.start();
 
       this.bindAddress = this.rpcServer.getBindAddress();
-      this.serverName = org.apache.tajo.util.NetUtils.getIpPortString(bindAddress);
+      this.serverName = NetUtils.getIpPortString(bindAddress);
       conf.setVar(ConfVars.CATALOG_ADDRESS, serverName);
     } catch (Exception e) {
       LOG.error("Cannot start RPC Server of CatalogServer", e);
@@ -145,7 +145,9 @@ public class CatalogServer extends AbstractService {
   }
 
   public void stop() {
-    this.rpcServer.shutdown();
+    if (rpcServer != null) {
+      this.rpcServer.shutdown();
+    }
     LOG.info("Catalog Server (" + serverName + ") shutdown");
     super.stop();
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-common/src/main/java/org/apache/tajo/QueryId.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/QueryId.java b/tajo-common/src/main/java/org/apache/tajo/QueryId.java
index 4394d74..5dbbaca 100644
--- a/tajo-common/src/main/java/org/apache/tajo/QueryId.java
+++ b/tajo-common/src/main/java/org/apache/tajo/QueryId.java
@@ -18,7 +18,9 @@
 
 package org.apache.tajo;
 
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProtoOrBuilder;
@@ -112,7 +114,7 @@ public class QueryId implements Comparable<QueryId> {
     return new ApplicationIdPBImpl(p);
   }
 
-  private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+  public static ApplicationIdProto convertToProtoFormat(ApplicationId t) {
     return ((ApplicationIdPBImpl)t).getProto();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 5ad16a2..fb7c268 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -66,6 +66,7 @@ public class TajoConf extends YarnConfiguration {
     // Service Addresses
     TASKRUNNER_LISTENER_ADDRESS("tajo.master.taskrunnerlistener.addr", "0.0.0.0:0"), // used internally
     CLIENT_SERVICE_ADDRESS("tajo.master.clientservice.addr", "127.0.0.1:9004"),
+    QUERY_MASTER_MANAGER_SERVICE_ADDRESS("tajo.master.querymastermanager.addr", "127.0.0.1:9005"),
 
     //////////////////////////////////
     // Catalog Configuration

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java b/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java
index 64ba1b8..33dd9d9 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java
@@ -133,7 +133,7 @@ public class Bytes {
    * Read byte-array written with a WritableableUtils.vint prefix.
    * @param in Input to read from.
    * @return byte array read off <code>in</code>
-   * @throws IOException e
+   * @throws java.io.IOException e
    */
   public static byte [] readByteArray(final DataInput in)
   throws IOException {
@@ -164,7 +164,7 @@ public class Bytes {
    * Write byte-array with a WritableableUtils.vint prefix.
    * @param out output stream to be written to
    * @param b array to write
-   * @throws IOException e
+   * @throws java.io.IOException e
    */
   public static void writeByteArray(final DataOutput out, final byte [] b)
   throws IOException {
@@ -181,7 +181,7 @@ public class Bytes {
    * @param b array
    * @param offset offset into array
    * @param length length past offset
-   * @throws IOException e
+   * @throws java.io.IOException e
    */
   public static void writeByteArray(final DataOutput out, final byte [] b,
       final int offset, final int length)
@@ -975,7 +975,7 @@ public class Bytes {
  
   interface Comparer<T> {
     abstract public int compareTo(T buffer1, int offset1, int length1,
-        T buffer2, int offset2, int length2);
+                                  T buffer2, int offset2, int length2);
   }
 
   @VisibleForTesting
@@ -985,7 +985,7 @@ public class Bytes {
 
   /**
    * Provides a lexicographical comparer implementation; either a Java
-   * implementation or a faster implementation based on {@link Unsafe}.
+   * implementation or a faster implementation based on {@link sun.misc.Unsafe}.
    *
    * <p>Uses reflection to gracefully fall back to the Java implementation if
    * {@code Unsafe} isn't available.
@@ -1224,7 +1224,7 @@ public class Bytes {
 
   /**
    * @param b bytes to hash
-   * @return Runs {@link WritableComparator#hashBytes(byte[], int)} on the
+   * @return Runs {@link org.apache.hadoop.io.WritableComparator#hashBytes(byte[], int)} on the
    * passed in array.  This method is what {@link org.apache.hadoop.io.Text} and
    * {@link ImmutableBytesWritable} use calculating hash code.
    */
@@ -1235,7 +1235,7 @@ public class Bytes {
   /**
    * @param b value
    * @param length length of the value
-   * @return Runs {@link WritableComparator#hashBytes(byte[], int)} on the
+   * @return Runs {@link org.apache.hadoop.io.WritableComparator#hashBytes(byte[], int)} on the
    * passed in array.  This method is what {@link org.apache.hadoop.io.Text} and
    * {@link ImmutableBytesWritable} use calculating hash code.
    */

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-common/src/main/java/org/apache/tajo/util/CommonTestingUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/CommonTestingUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/CommonTestingUtil.java
index 7ff1c1f..f045197 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/CommonTestingUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/CommonTestingUtil.java
@@ -30,7 +30,7 @@ public class CommonTestingUtil {
    *
    * @param dir a local directory to be created
    * @return  the created path
-   * @throws IOException
+   * @throws java.io.IOException
    */
   public static Path getTestDir(String dir) throws IOException {
     Path path = new Path(dir);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-common/src/main/java/org/apache/tajo/util/TajoIdUtils.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/TajoIdUtils.java b/tajo-common/src/main/java/org/apache/tajo/util/TajoIdUtils.java
index 2164d27..9dfbfbc 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/TajoIdUtils.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/TajoIdUtils.java
@@ -21,11 +21,20 @@ package org.apache.tajo.util;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.SubQueryId;
 import org.apache.tajo.TajoIdProtos.SubQueryIdProto;
 
+import java.util.Iterator;
+
+import static org.apache.hadoop.yarn.util.StringHelper._split;
+
 public class TajoIdUtils {
+  public static final String YARN_APPLICATION_PREFIX = "application";
+  public static final String YARN_CONTAINER_PREFIX = "container";
+  public static final String YARN_APPLICATION_ATTEMPT_PREFIX = "appattempt";
+
   /** It is mainly for DDL statements which don's have any query id. */
   public static final QueryId NullQueryId =
       TajoIdUtils.createQueryId(BuilderUtils.newApplicationId(0, 0), 0);
@@ -45,7 +54,7 @@ public class TajoIdUtils {
     String[] split = queryId.split(QueryId.SEPARATOR);
     ApplicationId appId = BuilderUtils.newApplicationId(Long.valueOf(split[1]),
         Integer.parseInt(split[2]));
-    int idInt = Integer.parseInt(split[2]);
+    int idInt = Integer.parseInt(split[3]);
     return newQueryId(appId, idInt);
   }
 
@@ -80,4 +89,33 @@ public class TajoIdUtils {
     SubQueryId subId = new SubQueryId(proto);
     return subId;
   }
+
+  public static ApplicationAttemptId toApplicationAttemptId(
+          String applicationAttmeptIdStr) {
+    // This method is adapted from YARN's ConverterUtils.
+    Iterator<String> it = _split(applicationAttmeptIdStr).iterator();
+    if (!it.next().equals(YARN_APPLICATION_ATTEMPT_PREFIX)) {
+      throw new IllegalArgumentException("Invalid AppAttemptId prefix: "
+              + applicationAttmeptIdStr);
+    }
+    try {
+      return toApplicationAttemptId(it);
+    } catch (NumberFormatException n) {
+      throw new IllegalArgumentException("Invalid AppAttemptId: "
+              + applicationAttmeptIdStr, n);
+    }
+  }
+
+  private static ApplicationAttemptId toApplicationAttemptId(
+          Iterator<String> it) throws NumberFormatException {
+    // This method is adapted from YARN's ConverterUtils.
+    ApplicationId appId = Records.newRecord(ApplicationId.class);
+    appId.setClusterTimestamp(Long.parseLong(it.next()));
+    appId.setId(Integer.parseInt(it.next()));
+    ApplicationAttemptId appAttemptId = Records
+            .newRecord(ApplicationAttemptId.class);
+    appAttemptId.setApplicationId(appId);
+    appAttemptId.setAttemptId(Integer.parseInt(it.next()));
+    return appAttemptId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/pom.xml b/tajo-core/tajo-core-backend/pom.xml
index 99bb666..541b4d4 100644
--- a/tajo-core/tajo-core-backend/pom.xml
+++ b/tajo-core/tajo-core-backend/pom.xml
@@ -173,9 +173,11 @@
                 <argument>-Isrc/main/proto/</argument>
                 <argument>--java_out=target/generated-sources/proto</argument>
                 <argument>src/main/proto/tajo_protos.proto</argument>
-                <argument>src/main/proto/MasterWorkerProtos.proto</argument>
-                <argument>src/main/proto/MasterWorkerProtocol.proto</argument>
-                <argument>src/main/proto/ClientProtocol.proto</argument>
+                <argument>src/main/proto/ClientProtos.proto</argument>
+                <argument>src/main/proto/QueryMasterClientProtocol.proto</argument>
+                <argument>src/main/proto/QueryMasterManagerProtocol.proto</argument>
+                <argument>src/main/proto/QueryMasterProtocol.proto</argument>
+                <argument>src/main/proto/TajoMasterClientProtocol.proto</argument>
               </arguments>
             </configuration>
             <goals>
@@ -365,10 +367,17 @@
       <artifactId>commons-lang</artifactId>
       <version>2.6</version>
     </dependency>
+    <!--
     <dependency>
       <groupId>org.jboss.netty</groupId>
       <artifactId>netty</artifactId>
     </dependency>
+    -->
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>jline</groupId>
       <artifactId>jline</artifactId>
@@ -407,6 +416,14 @@
               </execution>
             </executions>
           </plugin>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-report-plugin</artifactId>
+            <version>2.15</version>
+            <configuration>
+              <aggregate>true</aggregate>
+            </configuration>
+          </plugin>
         </plugins>
       </build>
     </profile>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
index 42717d0..1664b1c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
@@ -52,8 +52,17 @@ public abstract class BenchmarkSet {
   }
 
   protected void loadQueries(String dir) throws IOException {
+    // TODO - this code dead??
     File queryDir = new File(dir);
 
+    if(!queryDir.exists()) {
+      queryDir = new File(System.getProperty("user.dir") + "/tajo-core/tajo-core-backend/" + dir);
+    }
+
+    if(!queryDir.exists())
+    {
+      return;
+    }
     int last;
     String name, query;
     for (String file : queryDir.list()) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
index ce4c72f..86d47f9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
@@ -28,12 +28,11 @@ import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoProtos.QueryState;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.client.ClientProtocol;
 import org.apache.tajo.client.QueryStatus;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.master.cluster.ServerName;
+import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.util.FileUtil;
 import org.apache.tajo.util.TajoIdUtils;
 
@@ -265,7 +264,6 @@ public class TajoCli {
   }
 
   public int executeStatements(String line) throws Exception {
-
     String stripped;
     for (String statement : line.split(";")) {
       stripped = StringUtils.chomp(statement);
@@ -284,14 +282,20 @@ public class TajoCli {
         invokeCommand(cmds);
 
       } else { // submit a query to TajoMaster
-        ClientProtocol.SubmitQueryRespose response = client.executeQuery(stripped);
-
-        if (response.getResultCode() == ClientProtocol.ResultCode.OK) {
-          QueryId queryId = new QueryId(response.getQueryId());
-          if (queryId.equals(TajoIdUtils.NullQueryId)) {
-            sout.println("OK");
-          } else {
-            getQueryResult(queryId);
+        ClientProtos.SubmitQueryResponse response = client.executeQuery(stripped);
+        if (response.getResultCode() == ClientProtos.ResultCode.OK) {
+          QueryId queryId = null;
+          try {
+            queryId = new QueryId(response.getQueryId());
+            if (queryId.equals(TajoIdUtils.NullQueryId)) {
+              sout.println("OK");
+            } else {
+              getQueryResult(queryId);
+            }
+          } finally {
+            if(queryId != null) {
+              client.closeQuery(queryId);
+            }
           }
         } else {
         if (response.hasErrorMessage()) {
@@ -318,8 +322,13 @@ public class TajoCli {
 
       QueryStatus status;
       while (true) {
+        // TODO - make this polling interval configurable
         Thread.sleep(1000);
         status = client.getQueryStatus(queryId);
+        if(status.getState() == QueryState.QUERY_MASTER_INIT || status.getState() == QueryState.QUERY_MASTER_LAUNCHED) {
+          continue;
+        }
+
         if (status.getState() == QueryState.QUERY_RUNNING ||
             status.getState() == QueryState.QUERY_SUCCEEDED) {
           sout.println("Progress: " + (int)(status.getProgress() * 100.0f)
@@ -328,7 +337,7 @@ public class TajoCli {
           sout.flush();
         }
 
-        if (status.getState() != QueryState.QUERY_RUNNING) {
+        if (status.getState() != QueryState.QUERY_RUNNING && status.getState() != QueryState.QUERY_NOT_ASSIGNED) {
           break;
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/QueryStatus.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/QueryStatus.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/QueryStatus.java
index cdde9de..c7122b3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/QueryStatus.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/QueryStatus.java
@@ -20,7 +20,7 @@ package org.apache.tajo.client;
 
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoProtos.QueryState;
-import org.apache.tajo.client.ClientProtocol.GetQueryStatusResponse;
+import org.apache.tajo.ipc.ClientProtos.GetQueryStatusResponse;
 
 public class QueryStatus {
   private QueryId queryId;
@@ -31,6 +31,8 @@ public class QueryStatus {
   private long finishTime;
   private boolean hasResult;
   private String errorText;
+  private String queryMasterHost;
+  private int queryMasterPort;
 
   public QueryStatus(GetQueryStatusResponse proto) {
     queryId = new QueryId(proto.getQueryId());
@@ -43,6 +45,17 @@ public class QueryStatus {
     if (proto.hasErrorMessage()) {
       errorText = proto.getErrorMessage();
     }
+
+    queryMasterHost = proto.getQueryMasterHost();
+    queryMasterPort = proto.getQueryMasterPort();
+  }
+
+  public String getQueryMasterHost() {
+    return queryMasterHost;
+  }
+
+  public int getQueryMasterPort() {
+    return queryMasterPort;
   }
 
   public QueryId getQueryId() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
index 8317065..5b4b064 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -28,10 +28,15 @@ import org.apache.tajo.TajoProtos.QueryState;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.client.ClientProtocol.*;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.query.ResultSetImpl;
+import org.apache.tajo.ipc.TajoMasterClientProtocol;
+import org.apache.tajo.ipc.TajoMasterClientProtocol.*;
+import org.apache.tajo.ipc.QueryMasterClientProtocol;
+import org.apache.tajo.ipc.QueryMasterClientProtocol.*;
+import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.ClientProtos.*;
 import org.apache.tajo.rpc.ProtoBlockingRpcClient;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
 import org.apache.tajo.util.TajoIdUtils;
@@ -39,14 +44,25 @@ import org.apache.tajo.util.TajoIdUtils;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.sql.ResultSet;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
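+/**
+ * TajoClient is meant to be thread-safe. NOTE(review): close() and closeQuery() use the plain HashMap caches without synchronization - confirm.
+ */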
 public class TajoClient {
   private final Log LOG = LogFactory.getLog(TajoClient.class);
 
   private final TajoConf conf;
-  private ProtoBlockingRpcClient client;
-  private ClientProtocolService.BlockingInterface service;
+  private ProtoBlockingRpcClient tasjoMasterClient;
+  private TajoMasterClientProtocolService.BlockingInterface tajoMasterService;
+
+  private Map<QueryId, QueryMasterClientProtocolService.BlockingInterface> queryMasterConnectionMap =
+          new HashMap<QueryId, QueryMasterClientProtocolService.BlockingInterface>();
+
+  private Map<QueryId, ProtoBlockingRpcClient> queryMasterClientMap =
+          new HashMap<QueryId, ProtoBlockingRpcClient>();
 
   public TajoClient(TajoConf conf) throws IOException {
     this.conf = conf;
@@ -67,8 +83,8 @@ public class TajoClient {
 
   private void connect(InetSocketAddress addr) throws IOException {
     try {
-      client = new ProtoBlockingRpcClient(ClientProtocol.class, addr);
-      service = client.getStub();
+      tasjoMasterClient = new ProtoBlockingRpcClient(TajoMasterClientProtocol.class, addr);
+      tajoMasterService = tasjoMasterClient.getStub();
     } catch (Exception e) {
       throw new IOException(e);
     }
@@ -78,11 +94,31 @@ public class TajoClient {
   }
 
   public void close() {
-    client.close();
+    tasjoMasterClient.close();
+
+    for(ProtoBlockingRpcClient eachClient: queryMasterClientMap.values()) {
+      eachClient.close();
+    }
+    queryMasterClientMap.clear();
+    queryMasterConnectionMap.clear();
+  }
+
+  public void closeQuery(QueryId queryId) {
+    if(queryMasterClientMap.containsKey(queryId)) {
+      try {
+        queryMasterConnectionMap.get(queryId).killQuery(null, queryId.getProto());
+      } catch (Exception e) {
+        LOG.warn("Fail to close query:" + queryId + "," + e.getMessage(), e);
+      }
+      queryMasterClientMap.get(queryId).close();
+      LOG.info("Closed QueryMaster connection(" + queryId + "," + queryMasterClientMap.get(queryId).getRemoteAddress() + ")");
+      queryMasterClientMap.remove(queryId);
+      queryMasterConnectionMap.remove(queryId);
+    }
   }
 
   public boolean isConnected() {
-    return client.isConnected();
+    return tasjoMasterClient.isConnected();
   }
 
   /**
@@ -91,11 +127,11 @@ public class TajoClient {
    * In order to get the result, you should use {@link #getQueryResult(org.apache.tajo.QueryId)}
    * or {@link #getQueryResultAndWait(org.apache.tajo.QueryId)}.
    */
-  public SubmitQueryRespose executeQuery(String tql) throws ServiceException {
+  public ClientProtos.SubmitQueryResponse executeQuery(String tql) throws ServiceException {
     QueryRequest.Builder builder = QueryRequest.newBuilder();
     builder.setQuery(tql);
 
-    return service.submitQuery(null, builder.build());
+    return tajoMasterService.submitQuery(null, builder.build());
   }
 
   /**
@@ -110,15 +146,12 @@ public class TajoClient {
       throws ServiceException, IOException {
     QueryRequest.Builder builder = QueryRequest.newBuilder();
     builder.setQuery(sql);
-    SubmitQueryRespose response = service.submitQuery(null, builder.build());
-    if (response.getResultCode() == ResultCode.OK) {
-      QueryId queryId = new QueryId(response.getQueryId());
-      if (queryId.equals(TajoIdUtils.NullQueryId)) {
-        return null;
-      }
-      return getQueryResultAndWait(queryId);
+    SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build());
+    QueryId queryId = new QueryId(response.getQueryId());
+    if (queryId.equals(TajoIdUtils.NullQueryId)) {
+      return this.createNullResultSet(queryId);
     } else {
-      throw new ServiceException(response.getErrorMessage());
+      return this.getQueryResultAndWait(queryId);
     }
   }
 
@@ -127,38 +160,69 @@ public class TajoClient {
         = GetQueryStatusRequest.newBuilder();
     builder.setQueryId(queryId.getProto());
 
-    GetQueryStatusResponse res = service.getQueryStatus(null,
-        builder.build());
+    GetQueryStatusResponse res = null;
+    if(queryMasterConnectionMap.containsKey(queryId)) {
+      QueryMasterClientProtocolService.BlockingInterface queryMasterService = queryMasterConnectionMap.get(queryId);
+      res = queryMasterService.getQueryStatus(null, builder.build());
+    } else {
+      res = tajoMasterService.getQueryStatus(null, builder.build());
 
+      String queryMasterHost = res.getQueryMasterHost();
+      if(queryMasterHost != null && !queryMasterHost.isEmpty()) {
+        LOG.info("=========> connect to querymaster:" + queryMasterHost);
+        connectionToQueryMaster(queryId, queryMasterHost, res.getQueryMasterPort());
+      }
+    }
     return new QueryStatus(res);
   }
 
+  /** Connects to the QueryMaster at host:port and caches its RPC client and service stub under queryId. */
+  private void connectionToQueryMaster(QueryId queryId, String queryMasterHost, int queryMasterPort) {
+    try {
+      InetSocketAddress addr = NetUtils.createSocketAddr(queryMasterHost, queryMasterPort);
+      ProtoBlockingRpcClient client = new ProtoBlockingRpcClient(QueryMasterClientProtocol.class, addr);
+      QueryMasterClientProtocolService.BlockingInterface service = client.getStub();
+
+      queryMasterConnectionMap.put(queryId, service);
+      queryMasterClientMap.put(queryId, client);
+      LOG.debug("connected to Query Master (" + org.apache.tajo.util.NetUtils.getIpPortString(addr) + ")");
+    } catch (Exception e) {
+      // Log the throwable too (keeps the stack trace); callers receive the failure as a RuntimeException.
+      LOG.error(e.getMessage(), e);
+      throw new RuntimeException(e);
+    }
+  }
+
   private static boolean isQueryRunnning(QueryState state) {
     return state == QueryState.QUERY_NEW ||
         state == QueryState.QUERY_INIT ||
-        state == QueryState.QUERY_RUNNING;
+        state == QueryState.QUERY_RUNNING ||
+        state == QueryState.QUERY_MASTER_LAUNCHED ||
+        state == QueryState.QUERY_MASTER_INIT ||
+        state == QueryState.QUERY_NOT_ASSIGNED;
   }
 
   public ResultSet getQueryResult(QueryId queryId)
       throws ServiceException, IOException {
-    if (queryId.equals(TajoIdUtils.NullQueryId)) {
-      return null;
-    }
+      if (queryId.equals(TajoIdUtils.NullQueryId)) {
+        return createNullResultSet(queryId);
+      }
 
     TableDesc tableDesc = getResultDesc(queryId);
-    return new ResultSetImpl(conf, tableDesc.getPath());
+    return new ResultSetImpl(this, queryId, conf, tableDesc.getPath());
   }
 
   public ResultSet getQueryResultAndWait(QueryId queryId)
       throws ServiceException, IOException {
     if (queryId.equals(TajoIdUtils.NullQueryId)) {
-      return null;
+      return createNullResultSet(queryId);
     }
     QueryStatus status = getQueryStatus(queryId);
 
     while(status != null && isQueryRunnning(status.getState())) {
       try {
-        Thread.sleep(500);
+//        Thread.sleep(500);
+        Thread.sleep(2000);
       } catch (InterruptedException e) {
         e.printStackTrace();
       }
@@ -170,24 +234,34 @@ public class TajoClient {
       if (status.hasResult()) {
         return getQueryResult(queryId);
       } else {
-        return null;
+        return createNullResultSet(queryId);
       }
 
     } else {
-      LOG.error(status.getErrorMessage());
+      LOG.warn("=====>Query failed:" + status.getState());
 
-      return null;
+      //TODO throw SQLException(?)
+      return createNullResultSet(queryId);
     }
   }
 
+  public ResultSet createNullResultSet(QueryId queryId) throws IOException {
+    return new ResultSetImpl(this, queryId);
+  }
+
   public TableDesc getResultDesc(QueryId queryId) throws ServiceException {
     if (queryId.equals(TajoIdUtils.NullQueryId)) {
       return null;
     }
 
+    QueryMasterClientProtocolService.BlockingInterface queryMasterService = queryMasterConnectionMap.get(queryId);
+    if(queryMasterService == null) {
+      LOG.warn("No Connection to QueryMaster for " + queryId);
+      return null;
+    }
     GetQueryResultRequest.Builder builder = GetQueryResultRequest.newBuilder();
     builder.setQueryId(queryId.getProto());
-    GetQueryResultResponse response = service.getQueryResult(null,
+    GetQueryResultResponse response = queryMasterService.getQueryResult(null,
         builder.build());
 
     return CatalogUtil.newTableDesc(response.getTableDesc());
@@ -198,14 +272,14 @@ public class TajoClient {
     builder.setQuery(tql);
 
     ResultCode resultCode =
-        service.updateQuery(null, builder.build()).getResultCode();
+        tajoMasterService.updateQuery(null, builder.build()).getResultCode();
     return resultCode == ResultCode.OK;
   }
 
   public boolean existTable(String name) throws ServiceException {
     StringProto.Builder builder = StringProto.newBuilder();
     builder.setValue(name);
-    return service.existTable(null, builder.build()).getValue();
+    return tajoMasterService.existTable(null, builder.build()).getValue();
   }
 
   public TableDesc attachTable(String name, String path)
@@ -213,7 +287,7 @@ public class TajoClient {
     AttachTableRequest.Builder builder = AttachTableRequest.newBuilder();
     builder.setName(name);
     builder.setPath(path);
-    TableResponse res = service.attachTable(null, builder.build());
+    TableResponse res = tajoMasterService.attachTable(null, builder.build());
     return CatalogUtil.newTableDesc(res.getTableDesc());
   }
 
@@ -225,7 +299,7 @@ public class TajoClient {
   public boolean detachTable(String name) throws ServiceException {
     StringProto.Builder builder = StringProto.newBuilder();
     builder.setValue(name);
-    return service.detachTable(null, builder.build()).getValue();
+    return tajoMasterService.detachTable(null, builder.build()).getValue();
   }
 
   public TableDesc createTable(String name, Path path, TableMeta meta)
@@ -234,14 +308,14 @@ public class TajoClient {
     builder.setName(name);
     builder.setPath(path.toString());
     builder.setMeta(meta.getProto());
-    TableResponse res = service.createTable(null, builder.build());
+    TableResponse res = tajoMasterService.createTable(null, builder.build());
     return CatalogUtil.newTableDesc(res.getTableDesc());
   }
 
   public boolean dropTable(String name) throws ServiceException {
     StringProto.Builder builder = StringProto.newBuilder();
     builder.setValue(name);
-    return service.dropTable(null, builder.build()).getValue();
+    return tajoMasterService.dropTable(null, builder.build()).getValue();
   }
 
   /**
@@ -250,14 +324,14 @@ public class TajoClient {
    */
   public List<String> getTableList() throws ServiceException {
     GetTableListRequest.Builder builder = GetTableListRequest.newBuilder();
-    GetTableListResponse res = service.getTableList(null, builder.build());
+    GetTableListResponse res = tajoMasterService.getTableList(null, builder.build());
     return res.getTablesList();
   }
 
   public TableDesc getTableDesc(String tableName) throws ServiceException {
     GetTableDescRequest.Builder build = GetTableDescRequest.newBuilder();
     build.setTableName(tableName);
-    TableResponse res = service.getTableDesc(null, build.build());
+    TableResponse res = tajoMasterService.getTableDesc(null, build.build());
     if (res == null) {
       return null;
     } else {
@@ -272,7 +346,7 @@ public class TajoClient {
 
     try {
       /* send a kill to the TM */
-      service.killQuery(null, queryId.getProto());
+      tajoMasterService.killQuery(null, queryId.getProto());
       long currentTimeMillis = System.currentTimeMillis();
       long timeKillIssued = currentTimeMillis;
       while ((currentTimeMillis < timeKillIssued + 10000L) && (status.getState()
@@ -293,4 +367,14 @@ public class TajoClient {
 
     return true;
   }
+
+  public static void main(String[] args) throws Exception {
+    TajoClient client = new TajoClient(new TajoConf());
+    // Connect using a default TajoConf, then close the client immediately.
+    client.close();
+    // NOTE(review): blocks in wait() and nothing in this method calls notify(); looks like leftover debugging code - confirm.
+    synchronized(client) {
+      client.wait();
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
index bde2df5..973de99 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
@@ -98,7 +98,7 @@ public class EvalTreeUtil {
       found.add(node);
     }
   }
-  
+
   public static Schema getSchemaByTargets(Schema inputSchema, Target [] targets) 
       throws InternalException {
     Schema schema = new Schema();
@@ -282,7 +282,7 @@ public class EvalTreeUtil {
   }
   
   public static class VariableCounter implements EvalNodeVisitor {
-    private final Map<EvalNode.Type, Integer> counter;
+    private final Map<Type, Integer> counter;
     
     public VariableCounter() {
       counter = Maps.newHashMap();
@@ -299,7 +299,7 @@ public class EvalTreeUtil {
       }
     }
     
-    public Map<EvalNode.Type, Integer> getCounter() {
+    public Map<Type, Integer> getCounter() {
       return counter;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/FromTable.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/FromTable.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/FromTable.java
index 04c1bb9..0e3a174 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/FromTable.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/FromTable.java
@@ -41,6 +41,10 @@ public class FromTable implements Cloneable, GsonObject {
     this.alias = alias;
   }
 
+  public TableDesc getTableDesc() {
+    return this.desc;
+  }
+
   public final String getTableName() {
     return desc.getId();
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
index bb7caa1..7d430c5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
@@ -19,9 +19,7 @@
 package org.apache.tajo.engine.query;
 
 import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.engine.MasterWorkerProtos.Fetch;
-import org.apache.tajo.engine.MasterWorkerProtos.QueryUnitRequestProto;
-import org.apache.tajo.engine.MasterWorkerProtos.QueryUnitRequestProtoOrBuilder;
+import org.apache.tajo.ipc.QueryMasterProtocol.*;
 import org.apache.tajo.ipc.protocolrecords.QueryUnitRequest;
 import org.apache.tajo.storage.Fragment;
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
index db049a0..e0db8e1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
@@ -24,9 +24,11 @@ package org.apache.tajo.engine.query;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
+import org.apache.tajo.QueryId;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.TableMetaImpl;
 import org.apache.tajo.catalog.proto.CatalogProtos.TableProto;
+import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.exception.UnsupportedException;
@@ -56,27 +58,36 @@ public class ResultSetImpl implements ResultSet {
   private int curRow;
   private long totalRow;
   private boolean wasNull;
+  private TajoClient tajoClient;
+  QueryId queryId;
 
-  public ResultSetImpl(Configuration conf, String path) throws IOException {
-    this(conf, new Path(path));
+  public ResultSetImpl(TajoClient tajoClient, QueryId queryId) throws IOException {
+    this(tajoClient, queryId, null, null);
   }
 
-  public ResultSetImpl(Configuration conf, Path path) throws IOException {
+//  public ResultSetImpl(TajoClient tajoClient, QueryId queryId, Configuration conf, String path) throws IOException {
+//    this(tajoClient, queryId, conf, new Path(path));
+//  }
+
+  public ResultSetImpl(TajoClient tajoClient, QueryId queryId, Configuration conf, Path path) throws IOException {
+    this.tajoClient = tajoClient;
+    this.queryId = queryId;
     this.conf = conf;
-    this.fs = path.getFileSystem(this.conf);
-    // TODO - to be improved. It can be solved to get the query finish status
-    // from master.
-    try {
-      this.meta = getMeta(this.conf, path);
-    } catch (FileNotFoundException fnf) {
-      this.totalRow = 0;
-      init();
-      return;
+    if(path != null) {
+      this.fs = path.getFileSystem(this.conf);
+      // TODO - to be improved. It can be solved to get the query finish status
+      // from master.
+      try {
+        this.meta = getMeta(this.conf, path);
+      } catch (FileNotFoundException fnf) {
+        this.totalRow = 0;
+        init();
+        return;
+      }
+      this.totalRow = meta.getStat() != null ? meta.getStat().getNumRows() : 0;
+      Collection<Fragment> frags = getFragmentsNG(meta, path);
+      scanner = new MergeScanner(conf, meta, frags);
     }
-    this.totalRow = meta.getStat() != null ? meta.getStat().getNumRows() : 0;
-    Collection<Fragment> frags = getFragmentsNG(meta, path);
-    scanner = new MergeScanner(conf, meta, frags);
-    scanner.init();
     init();
   }
 
@@ -194,7 +205,9 @@ public class ResultSetImpl implements ResultSet {
   @Override
   public void beforeFirst() throws SQLException {
     try {
-      scanner.reset();
+      if(scanner != null) {
+        scanner.reset();
+      }
       init();
     } catch (IOException e) {
       e.printStackTrace();
@@ -230,7 +243,17 @@ public class ResultSetImpl implements ResultSet {
   @Override
   public void close() throws SQLException {
     try {
-      this.scanner.close();
+      if(tajoClient != null) {
+        this.tajoClient.closeQuery(queryId);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    try {
+      if(scanner != null) {
+        this.scanner.close();
+      }
+      //TODO clean temp result file
       cur = null;
       curRow = -1;
     } catch (IOException e) {
@@ -1232,6 +1255,9 @@ public class ResultSetImpl implements ResultSet {
    */
   @Override
   public boolean next() throws SQLException {
+    if(scanner == null) {
+      return false;
+    }
     try {
       if (totalRow <= 0)
         return false;
@@ -2220,4 +2246,8 @@ public class ResultSetImpl implements ResultSet {
   private void handleNull(Datum d) {
     wasNull = (d instanceof NullDatum);
   }
+
+  public boolean hasResult() {
+    return scanner != null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
index 880110d..bb4008f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
@@ -23,14 +23,13 @@ package org.apache.tajo.ipc.protocolrecords;
 
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.common.ProtoObject;
-import org.apache.tajo.engine.MasterWorkerProtos.Fetch;
-import org.apache.tajo.engine.MasterWorkerProtos.QueryUnitRequestProto;
+import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.storage.Fragment;
 
 import java.net.URI;
 import java.util.List;
 
-public interface QueryUnitRequest extends ProtoObject<QueryUnitRequestProto> {
+public interface QueryUnitRequest extends ProtoObject<QueryMasterProtocol.QueryUnitRequestProto> {
 
 	public QueryUnitAttemptId getId();
 	public List<Fragment> getFragments();
@@ -40,7 +39,7 @@ public interface QueryUnitRequest extends ProtoObject<QueryUnitRequestProto> {
 	public boolean isInterQuery();
 	public void setInterQuery();
 	public void addFetch(String name, URI uri);
-	public List<Fetch> getFetches();
+	public List<QueryMasterProtocol.Fetch> getFetches();
   public boolean shouldDie();
   public void setShouldDie();
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ClientService.java
deleted file mode 100644
index c980dcf..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ClientService.java
+++ /dev/null
@@ -1,411 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
-import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.exception.AlreadyExistsTableException;
-import org.apache.tajo.catalog.exception.NoSuchTableException;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableDescProto;
-import org.apache.tajo.catalog.statistics.TableStat;
-import org.apache.tajo.client.ClientProtocol;
-import org.apache.tajo.client.ClientProtocol.*;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.engine.query.exception.SQLSyntaxError;
-import org.apache.tajo.master.TajoMaster.MasterContext;
-import org.apache.tajo.master.event.QueryEvent;
-import org.apache.tajo.master.event.QueryEventType;
-import org.apache.tajo.rpc.ProtoBlockingRpcServer;
-import org.apache.tajo.rpc.RemoteException;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
-import org.apache.tajo.util.TajoIdUtils;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Collection;
-
-public class ClientService extends AbstractService {
-  private final static Log LOG = LogFactory.getLog(ClientService.class);
-  private final MasterContext context;
-  private final TajoConf conf;
-  private final CatalogService catalog;
-  private final ClientProtocolHandler clientHandler;
-  private ProtoBlockingRpcServer server;
-  private InetSocketAddress bindAddress;
-
-  private final BoolProto BOOL_TRUE =
-      BoolProto.newBuilder().setValue(true).build();
-  private final BoolProto BOOL_FALSE =
-      BoolProto.newBuilder().setValue(false).build();
-
-  public ClientService(MasterContext context) {
-    super(ClientService.class.getName());
-    this.context = context;
-    this.conf = context.getConf();
-    this.catalog = context.getCatalog();
-    this.clientHandler = new ClientProtocolHandler();
-  }
-
-  @Override
-  public void start() {
-
-    // start the rpc server
-    String confClientServiceAddr = conf.getVar(ConfVars.CLIENT_SERVICE_ADDRESS);
-    InetSocketAddress initIsa = NetUtils.createSocketAddr(confClientServiceAddr);
-    try {
-      server = new ProtoBlockingRpcServer(ClientProtocol.class, clientHandler, initIsa);
-    } catch (Exception e) {
-      LOG.error(e);
-    }
-    server.start();
-    bindAddress = server.getBindAddress();
-    this.conf.setVar(ConfVars.CLIENT_SERVICE_ADDRESS,
-        org.apache.tajo.util.NetUtils.getIpPortString(bindAddress));
-    LOG.info("Instantiated ClientService at " + this.bindAddress);
-    super.start();
-  }
-
-  @Override
-  public void stop() {
-    server.shutdown();
-    super.stop();
-  }
-
-  public InetSocketAddress getBindAddress() {
-    return this.bindAddress;
-  }
-
-  public int getHttpPort() {
-    return 0;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-  // ClientService
-  /////////////////////////////////////////////////////////////////////////////
-
-  public class ClientProtocolHandler implements ClientProtocolService.BlockingInterface {
-    @Override
-    public BoolProto updateSessionVariables(RpcController controller,
-                                            UpdateSessionVariableRequest request)
-        throws ServiceException {
-      return null;
-    }
-
-    @Override
-    public SubmitQueryRespose submitQuery(RpcController controller,
-                                          QueryRequest request)
-        throws ServiceException {
-
-      QueryId queryId;
-      SubmitQueryRespose.Builder build = SubmitQueryRespose.newBuilder();
-      try {
-        queryId = context.getGlobalEngine().executeQuery(request.getQuery());
-      } catch (SQLSyntaxError e) {
-        build.setResultCode(ResultCode.ERROR);
-        build.setErrorMessage(e.getMessage());
-        return build.build();
-
-      } catch (Exception e) {
-        build.setResultCode(ResultCode.ERROR);
-        String msg = e.getMessage();
-        if (msg == null) {
-          msg = "Internal Error";
-        }
-
-        if (LOG.isDebugEnabled()) {
-          LOG.error(msg, e);
-        } else {
-          LOG.error(msg);
-        }
-        build.setErrorMessage(msg);
-        return build.build();
-      }
-
-      LOG.info("Query " + queryId + " is submitted");
-      build.setResultCode(ResultCode.OK);
-      build.setQueryId(queryId.getProto());
-
-      return build.build();
-    }
-
-    @Override
-    public UpdateQueryResponse updateQuery(RpcController controller,
-                                           QueryRequest request)
-        throws ServiceException {
-
-      UpdateQueryResponse.Builder builder = UpdateQueryResponse.newBuilder();
-      try {
-        context.getGlobalEngine().updateQuery(request.getQuery());
-        builder.setResultCode(ResultCode.OK);
-        return builder.build();
-      } catch (Exception e) {
-        builder.setResultCode(ResultCode.ERROR);
-        if (e.getMessage() == null) {
-          builder.setErrorMessage(ExceptionUtils.getStackTrace(e));
-        }
-        return builder.build();
-      }
-    }
-
-    @Override
-    public GetQueryResultResponse getQueryResult(RpcController controller,
-                                                 GetQueryResultRequest request)
-        throws ServiceException {
-      QueryId queryId = new QueryId(request.getQueryId());
-      if (queryId.equals(TajoIdUtils.NullQueryId)) {
-
-      }
-      Query query = context.getQuery(queryId).getContext().getQuery();
-
-      GetQueryResultResponse.Builder builder
-          = GetQueryResultResponse.newBuilder();
-      switch (query.getState()) {
-        case QUERY_SUCCEEDED:
-          builder.setTableDesc((TableDescProto) query.getResultDesc().getProto());
-          break;
-        case QUERY_FAILED:
-        case QUERY_ERROR:
-          builder.setErrorMessage("Query " + queryId + " is failed");
-        default:
-          builder.setErrorMessage("Query " + queryId + " is still running");
-      }
-
-      return builder.build();
-    }
-
-    @Override
-    public GetQueryListResponse getQueryList(RpcController controller,
-                                             GetQueryListRequest request)
-        throws ServiceException {
-      return null;
-    }
-
-    @Override
-    public GetQueryStatusResponse getQueryStatus(RpcController controller,
-                                                 GetQueryStatusRequest request)
-        throws ServiceException {
-
-      GetQueryStatusResponse.Builder builder
-          = GetQueryStatusResponse.newBuilder();
-      QueryId queryId = new QueryId(request.getQueryId());
-      builder.setQueryId(request.getQueryId());
-
-      if (queryId.equals(TajoIdUtils.NullQueryId)) {
-        builder.setResultCode(ResultCode.OK);
-        builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
-      } else {
-        Query query = context.getQuery(queryId).getContext().getQuery();
-        if (query != null) {
-          builder.setResultCode(ResultCode.OK);
-          builder.setState(query.getState());
-          builder.setProgress(query.getProgress());
-          builder.setSubmitTime(query.getAppSubmitTime());
-          builder.setInitTime(query.getInitializationTime());
-          builder.setHasResult(!query.isCreateTableStmt());
-          if (query.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
-            builder.setFinishTime(query.getFinishTime());
-          } else {
-            builder.setFinishTime(System.currentTimeMillis());
-          }
-        } else {
-          builder.setResultCode(ResultCode.ERROR);
-          builder.setErrorMessage("No such query: " + queryId.toString());
-        }
-      }
-
-      return builder.build();
-    }
-
-    @Override
-    public BoolProto killQuery(RpcController controller,
-                               ApplicationAttemptIdProto request)
-        throws ServiceException {
-      QueryId queryId = new QueryId(request);
-      QueryMaster query = context.getQuery(queryId);
-      query.handle(new QueryEvent(queryId, QueryEventType.KILL));
-
-      return BOOL_TRUE;
-    }
-
-    @Override
-    public GetClusterInfoResponse getClusterInfo(RpcController controller,
-                                                 GetClusterInfoRequest request)
-        throws ServiceException {
-      return null;
-    }
-
-    @Override
-    public BoolProto existTable(RpcController controller,
-                                StringProto tableNameProto)
-        throws ServiceException {
-      String tableName = tableNameProto.getValue();
-      if (catalog.existsTable(tableName)) {
-        return BOOL_TRUE;
-      } else {
-        return BOOL_FALSE;
-      }
-    }
-
-    @Override
-    public GetTableListResponse getTableList(RpcController controller,
-                                             GetTableListRequest request)
-        throws ServiceException {
-      Collection<String> tableNames = catalog.getAllTableNames();
-      GetTableListResponse.Builder builder = GetTableListResponse.newBuilder();
-      builder.addAllTables(tableNames);
-      return builder.build();
-    }
-
-    @Override
-    public TableResponse getTableDesc(RpcController controller,
-                                      GetTableDescRequest request)
-        throws ServiceException {
-      String name = request.getTableName();
-      if (catalog.existsTable(name)) {
-        return TableResponse.newBuilder()
-            .setTableDesc((TableDescProto) catalog.getTableDesc(name).getProto())
-            .build();
-      } else {
-        return null;
-      }
-    }
-
-    @Override
-    public TableResponse createTable(RpcController controller, CreateTableRequest request)
-        throws ServiceException {
-      Path path = new Path(request.getPath());
-      TableMeta meta = new TableMetaImpl(request.getMeta());
-      TableDesc desc;
-      try {
-        desc = context.getGlobalEngine().createTable(request.getName(), meta, path);
-      } catch (Exception e) {
-        return TableResponse.newBuilder().setErrorMessage(e.getMessage()).build();
-      }
-
-      return TableResponse.newBuilder().setTableDesc((TableDescProto) desc.getProto()).build();
-    }
-
-    @Override
-    public BoolProto dropTable(RpcController controller,
-                               StringProto tableNameProto)
-        throws ServiceException {
-      context.getGlobalEngine().dropTable(tableNameProto.getValue());
-      return BOOL_TRUE;
-    }
-
-    @Override
-    public TableResponse attachTable(RpcController controller,
-                                     AttachTableRequest request)
-        throws ServiceException {
-
-      TableDesc desc;
-      if (catalog.existsTable(request.getName())) {
-        throw new AlreadyExistsTableException(request.getName());
-      }
-
-      Path path = new Path(request.getPath());
-
-      LOG.info(path.toUri());
-
-      TableMeta meta;
-      try {
-        meta = TableUtil.getTableMeta(conf, path);
-      } catch (IOException e) {
-        throw new RemoteException(e);
-      }
-
-      FileSystem fs;
-
-      // for legacy table structure
-      Path tablePath = new Path(path, "data");
-      try {
-        fs = path.getFileSystem(conf);
-        if (!fs.exists(tablePath)) {
-          tablePath = path;
-        }
-      } catch (IOException e) {
-        LOG.error(e);
-        return null;
-      }
-
-      if (meta.getStat() == null) {
-        long totalSize = 0;
-        try {
-          totalSize = calculateSize(tablePath);
-        } catch (IOException e) {
-          LOG.error("Cannot calculate the size of the relation", e);
-          return null;
-        }
-
-        meta = new TableMetaImpl(meta.getProto());
-        TableStat stat = new TableStat();
-        stat.setNumBytes(totalSize);
-        meta.setStat(stat);
-      }
-
-      desc = new TableDescImpl(request.getName(), meta, path);
-      catalog.addTable(desc);
-      LOG.info("Table " + desc.getId() + " is attached ("
-          + meta.getStat().getNumBytes() + ")");
-
-      return TableResponse.newBuilder().
-          setTableDesc((TableDescProto) desc.getProto())
-          .build();
-    }
-
-    @Override
-    public BoolProto detachTable(RpcController controller,
-                                 StringProto tableNameProto)
-        throws ServiceException {
-      String tableName = tableNameProto.getValue();
-      if (!catalog.existsTable(tableName)) {
-        throw new NoSuchTableException(tableName);
-      }
-
-      catalog.deleteTable(tableName);
-
-      LOG.info("Table " + tableName + " is detached");
-      return BOOL_TRUE;
-    }
-  }
-
-  private long calculateSize(Path path) throws IOException {
-    FileSystem fs = path.getFileSystem(conf);
-    long totalSize = 0;
-    for (FileStatus status : fs.listStatus(path)) {
-      totalSize += status.getLen();
-    }
-
-    return totalSize;
-  }
-}