You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/01/09 15:34:17 UTC

[1/8] tajo git commit: TAJO-1258: Close() for classes derived from FileAppender should be robust. (Jongyoung Park via jinho)

Repository: tajo
Updated Branches:
  refs/heads/index_support e04c65fdd -> 767b9a4b7


TAJO-1258: Close() for classes derived from FileAppender should be robust. (Jongyoung Park via jinho)

Closes #340


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/809cba37
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/809cba37
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/809cba37

Branch: refs/heads/index_support
Commit: 809cba3758564acd4def17928f95de7b4c913c45
Parents: 1c29c1c
Author: jhkim <jh...@apache.org>
Authored: Fri Jan 9 11:00:27 2015 +0900
Committer: jhkim <jh...@apache.org>
Committed: Fri Jan 9 11:00:27 2015 +0900

----------------------------------------------------------------------
 CHANGES                                                          | 3 +++
 .../src/main/java/org/apache/tajo/storage/CSVFile.java           | 4 +---
 .../src/main/java/org/apache/tajo/storage/RowFile.java           | 3 ++-
 .../src/main/java/org/apache/tajo/storage/avro/AvroAppender.java | 3 ++-
 .../java/org/apache/tajo/storage/parquet/ParquetAppender.java    | 3 ++-
 .../apache/tajo/storage/sequencefile/SequenceFileAppender.java   | 4 +---
 .../java/org/apache/tajo/storage/text/DelimitedTextFile.java     | 4 +---
 7 files changed, 12 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/809cba37/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 96b63ea..369dbda 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,6 +27,9 @@ Release 0.9.1 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1258: Close() for classes derived from FileAppender should be robust.
+    (Jongyoung Park via jinho)
+
     TAJO-1288: Refactoring org.apache.tajo.master package. (hyunsik)
 
     TAJO-1279: Cleanup TajoAsyncDispatcher and interrupt stop events. 

http://git-wip-us.apache.org/repos/asf/tajo/blob/809cba37/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
index 28c263c..d4dde28 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -225,10 +225,8 @@ public class CSVFile {
           deflateFilter.resetState();
           deflateFilter = null;
         }
-
-        os.close();
       } finally {
-        IOUtils.cleanup(LOG, fos);
+        IOUtils.cleanup(LOG, os, fos, outputStream);
         if (compressor != null) {
           CodecPool.returnCompressor(compressor);
           compressor = null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/809cba37/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
index 5510cbf..1ff6c4f 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
@@ -468,7 +469,7 @@ public class RowFile {
         }
         sync();
         out.flush();
-        out.close();
+        IOUtils.cleanup(LOG, out);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/809cba37/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
index f617099..da426ea 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.TableMeta;
@@ -201,7 +202,7 @@ public class AvroAppender extends FileAppender {
    */
   @Override
   public void close() throws IOException {
-    dataFileWriter.close();
+    IOUtils.cleanup(null, dataFileWriter);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tajo/blob/809cba37/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
index 4557287..415c338 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.storage.parquet;
 
+import org.apache.hadoop.io.IOUtils;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.storage.StorageConstants;
 import parquet.hadoop.ParquetOutputFormat;
@@ -128,7 +129,7 @@ public class ParquetAppender extends FileAppender {
    */
   @Override
   public void close() throws IOException {
-    writer.close();
+    IOUtils.cleanup(null, writer);
   }
 
   public long getEstimatedOutputSize() throws IOException {

http://git-wip-us.apache.org/repos/asf/tajo/blob/809cba37/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
index 8b5d677..404352c 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
@@ -248,7 +248,6 @@ public class SequenceFileAppender extends FileAppender {
   @Override
   public void flush() throws IOException {
     os.flush();
-    writer.close();
   }
 
   @Override
@@ -258,8 +257,7 @@ public class SequenceFileAppender extends FileAppender {
       stats.setNumBytes(getOffset());
     }
 
-    os.close();
-    writer.close();
+    IOUtils.cleanup(LOG, os, writer);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/809cba37/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index 15db4c3..ac7c549 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -241,10 +241,8 @@ public class DelimitedTextFile {
           deflateFilter.resetState();
           deflateFilter = null;
         }
-
-        os.close();
       } finally {
-        IOUtils.cleanup(LOG, fos);
+        IOUtils.cleanup(LOG, fos, os, outputStream);
         if (compressor != null) {
           CodecPool.returnCompressor(compressor);
           compressor = null;


[2/8] tajo git commit: TAJO-1282: Cleanup the relationship of QueryInProgress and QueryJobManager.

Posted by ji...@apache.org.
TAJO-1282: Cleanup the relationship of QueryInProgress and QueryJobManager.

Closes #334


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/307c6c9f
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/307c6c9f
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/307c6c9f

Branch: refs/heads/index_support
Commit: 307c6c9f558ae96ffa2f7351ffa3dc5788ba4dbb
Parents: 809cba3
Author: Hyunsik Choi <hy...@apache.org>
Authored: Fri Jan 9 14:47:47 2015 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Fri Jan 9 14:47:47 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 +
 .../org/apache/tajo/master/QueryJobManager.java | 59 +++++++------
 .../apache/tajo/master/TajoMasterService.java   |  7 --
 .../master/rm/TajoWorkerResourceManager.java    |  2 +-
 .../tajo/master/rm/WorkerResourceManager.java   |  2 +-
 .../master/scheduler/SimpleFifoScheduler.java   |  3 +-
 .../tajo/querymaster/QueryInProgress.java       | 87 ++------------------
 .../apache/tajo/querymaster/QueryJobEvent.java  |  5 +-
 .../apache/tajo/querymaster/QueryMaster.java    |  2 +-
 .../tajo/querymaster/QueryMasterTask.java       | 47 +----------
 .../src/main/proto/TajoMasterProtocol.proto     |  1 -
 .../apache/tajo/querymaster/TestKillQuery.java  |  2 +-
 12 files changed, 53 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 369dbda..35670d7 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,6 +27,9 @@ Release 0.9.1 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1282: Cleanup the relationship of QueryInProgress and 
+    QueryJobManager. (hyunsik)
+
     TAJO-1258: Close() for classes derived from FileAppender should be robust.
     (Jongyoung Park via jinho)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
index c9b8711..6a8da27 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
@@ -28,22 +28,25 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.TajoProtos;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.scheduler.SimpleFifoScheduler;
+import org.apache.tajo.plan.logical.LogicalRootNode;
 import org.apache.tajo.querymaster.QueryInProgress;
 import org.apache.tajo.querymaster.QueryJobEvent;
 import org.apache.tajo.session.Session;
-import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.master.scheduler.SimpleFifoScheduler;
 
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
+/**
+ * QueryJobManager manages all scheduled and running queries.
+ * It receives all Query related events and routes them to each QueryInProgress.
+ */
 public class QueryJobManager extends CompositeService {
   private static final Log LOG = LogFactory.getLog(QueryJobManager.class.getName());
 
@@ -69,7 +72,7 @@ public class QueryJobManager extends CompositeService {
   }
 
   @Override
-  public void init(Configuration conf) {
+  public void serviceInit(Configuration conf) throws Exception {
     try {
       this.dispatcher = new AsyncDispatcher();
       addService(this.dispatcher);
@@ -81,24 +84,24 @@ public class QueryJobManager extends CompositeService {
       catchException(null, e);
     }
 
-    super.init(conf);
+    super.serviceInit(conf);
   }
 
   @Override
-  public void stop() {
+  public void serviceStop() throws Exception {
     synchronized(runningQueries) {
       for(QueryInProgress eachQueryInProgress: runningQueries.values()) {
-        eachQueryInProgress.stop();
+        eachQueryInProgress.stopProgress();
       }
     }
     this.scheduler.stop();
-    super.stop();
+    super.serviceStop();
   }
 
   @Override
-  public void start() {
+  public void serviceStart() throws Exception {
     this.scheduler.start();
-    super.start();
+    super.serviceStart();
   }
 
   public EventHandler getEventHandler() {
@@ -164,39 +167,42 @@ public class QueryJobManager extends CompositeService {
       runningQueries.put(queryInProgress.getQueryId(), queryInProgress);
     }
 
-    addService(queryInProgress);
-    queryInProgress.init(getConfig());
-    queryInProgress.start();
-
-    if (!queryInProgress.startQueryMaster()) {
+    if (queryInProgress.startQueryMaster()) {
+      dispatcher.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START,
+          queryInProgress.getQueryInfo()));
+    } else {
       stopQuery(queryId);
     }
 
     return queryInProgress.getQueryInfo();
   }
 
-  public TajoMaster.MasterContext getMasterContext() {
-    return masterContext;
-  }
-
   class QueryJobManagerEventHandler implements EventHandler<QueryJobEvent> {
+
     @Override
     public void handle(QueryJobEvent event) {
       QueryInProgress queryInProgress = getQueryInProgress(event.getQueryInfo().getQueryId());
-      if(queryInProgress == null) {
+
+
+      if (queryInProgress == null) {
         LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]");
         return;
       }
 
-      if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) {
+
+      if (event.getType() == QueryJobEvent.Type.QUERY_MASTER_START) {
+        queryInProgress.submmitQueryToMaster();
+
+      } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) {
         stopQuery(event.getQueryInfo().getQueryId());
-      } else if (queryInProgress.isStarted()) {
-        queryInProgress.getEventHandler().handle(event);
+
       } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
         scheduler.removeQuery(queryInProgress.getQueryId());
-        queryInProgress.getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
-
+        queryInProgress.kill();
         stopQuery(queryInProgress.getQueryId());
+
+      } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) {
+        queryInProgress.heartbeat(event.getQueryInfo());
       }
     }
   }
@@ -219,7 +225,7 @@ public class QueryJobManager extends CompositeService {
     LOG.info("Stop QueryInProgress:" + queryId);
     QueryInProgress queryInProgress = getQueryInProgress(queryId);
     if(queryInProgress != null) {
-      queryInProgress.stop();
+      queryInProgress.stopProgress();
       synchronized(submittedQueries) {
         submittedQueries.remove(queryId);
       }
@@ -245,7 +251,6 @@ public class QueryJobManager extends CompositeService {
         avgExecutionTime.set(executionTime);
       }
       executedQuerySize.incrementAndGet();
-      removeService(queryInProgress);
     } else {
       LOG.warn("No QueryInProgress while query stopping: " + queryId);
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
index a7df206..02bdfa1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
@@ -136,13 +136,6 @@ public class TajoMasterService extends AbstractService {
     }
 
     @Override
-    public void stopQueryMaster(RpcController controller, TajoIdProtos.QueryIdProto request,
-                                RpcCallback<BoolProto> done) {
-      context.getQueryJobManager().stopQuery(new QueryId(request));
-      done.run(BOOL_TRUE);
-    }
-
-    @Override
     public void getAllWorkerResource(RpcController controller, PrimitiveProtos.NullProto request,
                                      RpcCallback<TajoMasterProtocol.WorkerResourcesRequest> done) {
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index c4200d5..9f2a3d5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -526,7 +526,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
   }
 
   @Override
-  public void stopQueryMaster(QueryId queryId) {
+  public void releaseQueryMaster(QueryId queryId) {
     if(!rmContext.getQueryMasterContainer().containsKey(queryId)) {
       LOG.warn("No QueryMaster resource info for " + queryId);
       return;

http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
index b237cc5..79ec0ac 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
@@ -80,7 +80,7 @@ public interface WorkerResourceManager extends Service {
    *
    * @param queryId QueryId to be stopped
    */
-  public void stopQueryMaster(QueryId queryId);
+  public void releaseQueryMaster(QueryId queryId);
 
   /**
    *

http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
index bd8ca28..a091ed5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
@@ -58,7 +58,8 @@ public class SimpleFifoScheduler implements Scheduler {
       LOG.info("Size of Fifo queue is " + qSize);
     }
 
-    QuerySchedulingInfo querySchedulingInfo = new QuerySchedulingInfo(queryInProgress.getQueryId(), 1, queryInProgress.getStartTime());
+    QuerySchedulingInfo querySchedulingInfo = new QuerySchedulingInfo(queryInProgress.getQueryId(), 1,
+        queryInProgress.getQueryInfo().getStartTime());
     boolean result = pool.add(querySchedulingInfo);
     if (getRunningQueries().size() == 0) wakeupProcessor();
     return result;

http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
index bda2ec1..f83f244 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
@@ -20,11 +20,7 @@ package org.apache.tajo.querymaster;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.engine.query.QueryContext;
@@ -35,12 +31,12 @@ import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto;
 import org.apache.tajo.master.QueryInfo;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.rm.WorkerResourceManager;
-import org.apache.tajo.session.Session;
 import org.apache.tajo.plan.logical.LogicalRootNode;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.session.Session;
 import org.apache.tajo.util.NetUtils;
 
 import java.net.InetSocketAddress;
@@ -48,15 +44,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource;
 
-public class QueryInProgress extends CompositeService {
+public class QueryInProgress {
   private static final Log LOG = LogFactory.getLog(QueryInProgress.class.getName());
 
   private QueryId queryId;
 
   private Session session;
 
-  private AsyncDispatcher dispatcher;
-
   private LogicalRootNode plan;
 
   private AtomicBoolean querySubmitted = new AtomicBoolean(false);
@@ -76,7 +70,7 @@ public class QueryInProgress extends CompositeService {
       Session session,
       QueryContext queryContext,
       QueryId queryId, String sql, String jsonExpr, LogicalRootNode plan) {
-    super(QueryInProgress.class.getName());
+
     this.masterContext = masterContext;
     this.session = session;
     this.queryId = queryId;
@@ -86,23 +80,14 @@ public class QueryInProgress extends CompositeService {
     queryInfo.setStartTime(System.currentTimeMillis());
   }
 
-  @Override
-  public void init(Configuration conf) {
-    dispatcher = new AsyncDispatcher();
-    this.addService(dispatcher);
-
-    dispatcher.register(QueryJobEvent.Type.class, new QueryInProgressEventHandler());
-    super.init(conf);
-  }
-
   public synchronized void kill() {
+    getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
     if(queryMasterRpcClient != null){
       queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
     }
   }
 
-  @Override
-  public void stop() {
+  public void stopProgress() {
     if(stopped.getAndSet(true)) {
       return;
     }
@@ -110,52 +95,15 @@ public class QueryInProgress extends CompositeService {
     LOG.info("=========================================================");
     LOG.info("Stop query:" + queryId);
 
-    masterContext.getResourceManager().stopQueryMaster(queryId);
-
-    long startTime = System.currentTimeMillis();
-    while(true) {
-      try {
-        if(masterContext.getResourceManager().isQueryMasterStopped(queryId)) {
-          LOG.info(queryId + " QueryMaster stopped");
-          break;
-        }
-      } catch (Exception e) {
-        LOG.error(e.getMessage(), e);
-        break;
-      }
-
-      try {
-        synchronized (this){
-          wait(100);
-        }
-      } catch (InterruptedException e) {
-        break;
-      }
-      if(System.currentTimeMillis() - startTime > 60 * 1000) {
-        LOG.warn("Failed to stop QueryMaster:" + queryId);
-        break;
-      }
-    }
+    masterContext.getResourceManager().releaseQueryMaster(queryId);
 
     if(queryMasterRpc != null) {
       RpcConnectionPool.getPool(masterContext.getConf()).closeConnection(queryMasterRpc);
     }
 
     masterContext.getHistoryWriter().appendHistory(queryInfo);
-    super.stop();
-  }
-
-  @Override
-  public void start() {
-    super.start();
   }
 
-  public EventHandler getEventHandler() {
-    return dispatcher.getEventHandler();
-  }
-
-
-
   public boolean startQueryMaster() {
     try {
       LOG.info("Initializing QueryInProgress for QueryID=" + queryId);
@@ -173,8 +121,6 @@ public class QueryInProgress extends CompositeService {
       queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort());
       queryInfo.setQueryMasterInfoPort(resource.getConnectionInfo().getHttpInfoPort());
 
-      getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, queryInfo));
-
       return true;
     } catch (Exception e) {
       catchException(e);
@@ -182,23 +128,6 @@ public class QueryInProgress extends CompositeService {
     }
   }
 
-  class QueryInProgressEventHandler implements EventHandler<QueryJobEvent> {
-    @Override
-    public void handle(QueryJobEvent queryJobEvent) {
-      if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) {
-        heartbeat(queryJobEvent.getQueryInfo());
-      } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_MASTER_START) {
-        QueryInProgress queryInProgress = masterContext.getQueryJobManager().getQueryInProgress(queryId);
-        queryInProgress.getEventHandler().handle(
-            new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, queryInProgress.getQueryInfo()));
-      } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_START) {
-        submmitQueryToMaster();
-      } else if (queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
-        kill();
-      }
-    }
-  }
-
   private void connectQueryMaster() throws Exception {
     InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
     LOG.info("Connect to QueryMaster:" + addr);
@@ -207,7 +136,7 @@ public class QueryInProgress extends CompositeService {
     queryMasterRpcClient = queryMasterRpc.getStub();
   }
 
-  private synchronized void submmitQueryToMaster() {
+  public synchronized void submmitQueryToMaster() {
     if(querySubmitted.get()) {
       return;
     }
@@ -256,7 +185,7 @@ public class QueryInProgress extends CompositeService {
     return !stopped.get() && this.querySubmitted.get();
   }
 
-  private void heartbeat(QueryInfo queryInfo) {
+  public void heartbeat(QueryInfo queryInfo) {
     LOG.info("Received QueryMaster heartbeat:" + queryInfo);
 
     // to avoid partial update by different heartbeats

http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java
index 1a1f2ff..27eb2b6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java
@@ -35,12 +35,9 @@ public class QueryJobEvent extends AbstractEvent<QueryJobEvent.Type> {
   }
 
   public enum Type {
-    QUERY_JOB_START,
+    QUERY_MASTER_START,
     QUERY_JOB_HEARTBEAT,
-    QUERY_JOB_FINISH,
     QUERY_JOB_STOP,
-    QUERY_MASTER_START,
-    QUERY_MASTER_STOP,
     QUERY_JOB_KILL
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
index 76df397..02760a3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
@@ -473,7 +473,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
     public void handle(QueryStartEvent event) {
       LOG.info("Start QueryStartEventHandler:" + event.getQueryId());
       QueryMasterTask queryMasterTask = new QueryMasterTask(queryMasterContext,
-          event.getQueryId(), event.getSession(), event.getQueryContext(), event.getJsonExpr(), event.getLogicalPlanJson());
+          event.getQueryId(), event.getSession(), event.getQueryContext(), event.getJsonExpr());
 
       synchronized(queryMasterTasks) {
         queryMasterTasks.put(event.getQueryId(), queryMasterTask);

http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
index bab5903..fd52488 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
@@ -41,13 +41,10 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.exception.UnimplementedException;
-import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.TajoContainerProxy;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
-import org.apache.tajo.session.Session;
 import org.apache.tajo.plan.LogicalOptimizer;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.LogicalPlanner;
@@ -58,8 +55,7 @@ import org.apache.tajo.plan.logical.ScanNode;
 import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.verifier.VerifyException;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.session.Session;
 import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.StorageProperty;
 import org.apache.tajo.storage.StorageUtil;
@@ -96,12 +92,8 @@ public class QueryMasterTask extends CompositeService {
 
   private Query query;
 
-  private MasterPlan masterPlan;
-
   private String jsonExpr;
 
-  private String logicalPlanJson;
-
   private AsyncDispatcher dispatcher;
 
   private final long querySubmitTime;
@@ -124,8 +116,7 @@ public class QueryMasterTask extends CompositeService {
       new ArrayList<TajoWorkerProtocol.TaskFatalErrorReport>();
 
   public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
-                         QueryId queryId, Session session, QueryContext queryContext, String jsonExpr,
-                         String logicalPlanJson) {
+                         QueryId queryId, Session session, QueryContext queryContext, String jsonExpr) {
 
     super(QueryMasterTask.class.getName());
     this.queryMasterContext = queryMasterContext;
@@ -133,7 +124,6 @@ public class QueryMasterTask extends CompositeService {
     this.session = session;
     this.queryContext = queryContext;
     this.jsonExpr = jsonExpr;
-    this.logicalPlanJson = logicalPlanJson;
     this.querySubmitTime = System.currentTimeMillis();
   }
 
@@ -198,42 +188,11 @@ public class QueryMasterTask extends CompositeService {
       LOG.fatal(t.getMessage(), t);
     }
 
-    RpcConnectionPool connPool = RpcConnectionPool.getPool(queryMasterContext.getConf());
-    NettyClientBase tmClient = null;
-    try {
-      // In TajoMaster HA mode, if backup master be active status,
-      // worker may fail to connect existing active master. Thus,
-      // if worker can't connect the master, worker should try to connect another master and
-      // update master address in worker context.
-      if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-        try {
-          tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
-        } catch (Exception e) {
-          queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
-              HAServiceUtil.getResourceTrackerAddress(systemConf));
-          queryMasterContext.getWorkerContext().setTajoMasterAddress(
-              HAServiceUtil.getMasterUmbilicalAddress(systemConf));
-          tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
-        }
-      } else {
-        tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-            TajoMasterProtocol.class, true);
-      }
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-    } finally {
-      connPool.releaseConnection(tmClient);
-    }
-
-    super.stop();
-
-    //TODO change report to tajo master
     if (queryMetrics != null) {
       queryMetrics.report(new MetricsConsoleReporter());
     }
 
+    super.stop();
     LOG.info("Stopped QueryMasterTask:" + queryId);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoMasterProtocol.proto b/tajo-core/src/main/proto/TajoMasterProtocol.proto
index cc83e47..bc73596 100644
--- a/tajo-core/src/main/proto/TajoMasterProtocol.proto
+++ b/tajo-core/src/main/proto/TajoMasterProtocol.proto
@@ -143,6 +143,5 @@ service TajoMasterProtocolService {
   rpc heartbeat(TajoHeartbeat) returns (TajoHeartbeatResponse);
   rpc allocateWorkerResources(WorkerResourceAllocationRequest) returns (WorkerResourceAllocationResponse);
   rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto);
-  rpc stopQueryMaster(QueryIdProto) returns (BoolProto);
   rpc getAllWorkerResource(NullProto) returns (WorkerResourcesRequest);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index a125196..bd899cd 100644
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@ -90,7 +90,7 @@ public class TestKillQuery {
 
     QueryMaster qm = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster();
     QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(),
-        queryId, session, defaultContext, expr.toJson(), plan.getRootBlock().getRoot().toJson());
+        queryId, session, defaultContext, expr.toJson());
 
     queryMasterTask.init(conf);
     queryMasterTask.getQueryTaskContext().getDispatcher().start();


[7/8] tajo git commit: TAJO-1291: Rename TajoMasterProtocol to QueryCoordinatorProtocol.

Posted by ji...@apache.org.
TAJO-1291: Rename TajoMasterProtocol to QueryCoordinatorProtocol.

Closes #342


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/807868bd
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/807868bd
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/807868bd

Branch: refs/heads/index_support
Commit: 807868bd4d1ca1c8bd3aee33d31cca3d43dd2273
Parents: 50a8a66
Author: Hyunsik Choi <hy...@apache.org>
Authored: Fri Jan 9 20:44:21 2015 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Fri Jan 9 20:44:21 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 tajo-core/pom.xml                               |   2 +-
 .../tajo/master/QueryCoordinatorService.java    | 160 ++++++++++
 .../org/apache/tajo/master/QueryInProgress.java | 228 +++++++++++++
 .../org/apache/tajo/master/QueryJobManager.java | 316 -------------------
 .../org/apache/tajo/master/QueryManager.java    | 315 ++++++++++++++++++
 .../apache/tajo/master/TajoContainerProxy.java  |  12 +-
 .../java/org/apache/tajo/master/TajoMaster.java |  16 +-
 .../tajo/master/TajoMasterClientService.java    |   9 +-
 .../apache/tajo/master/TajoMasterService.java   | 161 ----------
 .../apache/tajo/master/exec/QueryExecutor.java  |   4 +-
 .../tajo/master/rm/TajoResourceTracker.java     |  12 +-
 .../master/rm/TajoWorkerResourceManager.java    |  14 +-
 .../tajo/master/rm/WorkerResourceManager.java   |  17 +-
 .../apache/tajo/master/scheduler/Scheduler.java |   2 +-
 .../master/scheduler/SimpleFifoScheduler.java   |   8 +-
 .../tajo/querymaster/QueryInProgress.java       | 230 --------------
 .../apache/tajo/querymaster/QueryMaster.java    |  95 ++----
 .../java/org/apache/tajo/querymaster/Stage.java |  17 -
 .../main/java/org/apache/tajo/util/JSPUtil.java |   2 +-
 .../tajo/worker/TajoResourceAllocator.java      |  37 ++-
 .../java/org/apache/tajo/worker/TajoWorker.java |  20 +-
 .../tajo/worker/WorkerHeartbeatService.java     |  27 +-
 .../ConnectivityCheckerRuleForTajoWorker.java   |  14 +-
 .../main/proto/QueryCoordinatorProtocol.proto   | 147 +++++++++
 .../main/proto/ResourceTrackerProtocol.proto    |   2 +-
 .../src/main/proto/TajoMasterProtocol.proto     | 147 ---------
 .../src/main/resources/webapps/admin/index.jsp  |   2 +-
 .../src/main/resources/webapps/admin/query.jsp  |   2 +-
 .../org/apache/tajo/TajoTestingCluster.java     |   2 +-
 .../tajo/master/rm/TestTajoResourceManager.java |   3 +-
 31 files changed, 979 insertions(+), 1047 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 4e38f78..89488da 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,6 +27,9 @@ Release 0.9.1 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1291: Rename TajoMasterProtocol to QueryCoordinatorProtocol.
+    (hyunsik)
+
     TAJO-1286: Remove netty dependency from tajo-jdbc. (jihun)
 
     TAJO-1282: Cleanup the relationship of QueryInProgress and 

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml
index a7205dd..05ccf07 100644
--- a/tajo-core/pom.xml
+++ b/tajo-core/pom.xml
@@ -166,7 +166,7 @@
                 <argument>src/main/proto/ContainerProtocol.proto</argument>
                 <argument>src/main/proto/ResourceTrackerProtocol.proto</argument>
                 <argument>src/main/proto/QueryMasterProtocol.proto</argument>
-                <argument>src/main/proto/TajoMasterProtocol.proto</argument>
+                <argument>src/main/proto/QueryCoordinatorProtocol.proto</argument>
                 <argument>src/main/proto/TajoWorkerProtocol.proto</argument>
                 <argument>src/main/proto/InternalTypes.proto</argument>
               </arguments>

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java
new file mode 100644
index 0000000..1cb3842
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ContainerProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.rm.Worker;
+import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.rpc.AsyncRpcServer;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
+import org.apache.tajo.util.NetUtils;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.List;
+
+public class QueryCoordinatorService extends AbstractService {
+  private final static Log LOG = LogFactory.getLog(QueryCoordinatorService.class);
+
+  private final TajoMaster.MasterContext context;
+  private final TajoConf conf;
+  private final ProtocolServiceHandler masterHandler;
+  private AsyncRpcServer server;
+  private InetSocketAddress bindAddress;
+
+  private final BoolProto BOOL_TRUE = BoolProto.newBuilder().setValue(true).build();
+  private final BoolProto BOOL_FALSE = BoolProto.newBuilder().setValue(false).build();
+
+  public QueryCoordinatorService(TajoMaster.MasterContext context) {
+    super(QueryCoordinatorService.class.getName());
+    this.context = context;
+    this.conf = context.getConf();
+    this.masterHandler = new ProtocolServiceHandler();
+  }
+
+  @Override
+  public void start() {
+    String confMasterServiceAddr = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
+    InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
+    int workerNum = conf.getIntVar(TajoConf.ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM);
+    try {
+      server = new AsyncRpcServer(QueryCoordinatorProtocol.class, masterHandler, initIsa, workerNum);
+    } catch (Exception e) {
+      LOG.error(e);
+    }
+    server.start();
+    bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
+    this.conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS,
+        NetUtils.normalizeInetSocketAddress(bindAddress));
+    LOG.info("Instantiated TajoMasterService at " + this.bindAddress);
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    if(server != null) {
+      server.shutdown();
+      server = null;
+    }
+    super.stop();
+  }
+
+  public InetSocketAddress getBindAddress() {
+    return bindAddress;
+  }
+
+  /**
+   * Actual protocol service handler
+   */
+  private class ProtocolServiceHandler implements QueryCoordinatorProtocolService.Interface {
+
+    @Override
+    public void heartbeat(
+        RpcController controller,
+        TajoHeartbeat request, RpcCallback<QueryCoordinatorProtocol.TajoHeartbeatResponse> done) {
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Received QueryHeartbeat:" + new WorkerConnectionInfo(request.getConnectionInfo()));
+      }
+
+      QueryCoordinatorProtocol.TajoHeartbeatResponse.ResponseCommand command = null;
+
+      QueryManager queryManager = context.getQueryJobManager();
+      command = queryManager.queryHeartbeat(request);
+
+      QueryCoordinatorProtocol.TajoHeartbeatResponse.Builder builder = QueryCoordinatorProtocol.TajoHeartbeatResponse.newBuilder();
+      builder.setHeartbeatResult(BOOL_TRUE);
+      if(command != null) {
+        builder.setResponseCommand(command);
+      }
+
+      builder.setClusterResourceSummary(context.getResourceManager().getClusterResourceSummary());
+      done.run(builder.build());
+    }
+
+    @Override
+    public void allocateWorkerResources(
+        RpcController controller,
+        QueryCoordinatorProtocol.WorkerResourceAllocationRequest request,
+        RpcCallback<QueryCoordinatorProtocol.WorkerResourceAllocationResponse> done) {
+      context.getResourceManager().allocateWorkerResources(request, done);
+    }
+
+    @Override
+    public void releaseWorkerResource(RpcController controller, WorkerResourceReleaseRequest request,
+                                           RpcCallback<PrimitiveProtos.BoolProto> done) {
+      List<ContainerProtocol.TajoContainerIdProto> containerIds = request.getContainerIdsList();
+
+      for(ContainerProtocol.TajoContainerIdProto eachContainer: containerIds) {
+        context.getResourceManager().releaseWorkerResource(eachContainer);
+      }
+      done.run(BOOL_TRUE);
+    }
+
+    @Override
+    public void getAllWorkerResource(RpcController controller, PrimitiveProtos.NullProto request,
+                                     RpcCallback<WorkerResourcesRequest> done) {
+
+      WorkerResourcesRequest.Builder builder = WorkerResourcesRequest.newBuilder();
+      Collection<Worker> workers = context.getResourceManager().getWorkers().values();
+
+      for(Worker worker: workers) {
+        WorkerResource resource = worker.getResource();
+
+        WorkerResourceProto.Builder workerResource = WorkerResourceProto.newBuilder();
+
+        workerResource.setConnectionInfo(worker.getConnectionInfo().getProto());
+        workerResource.setMemoryMB(resource.getMemoryMB());
+        workerResource.setDiskSlots(resource.getDiskSlots());
+
+        builder.addWorkerResources(workerResource);
+      }
+      done.run(builder.build());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
new file mode 100644
index 0000000..73d8cb2
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.WorkerAllocatedResource;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto;
+import org.apache.tajo.master.rm.WorkerResourceManager;
+import org.apache.tajo.plan.logical.LogicalRootNode;
+import org.apache.tajo.querymaster.QueryJobEvent;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.session.Session;
+import org.apache.tajo.util.NetUtils;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class QueryInProgress {
+  private static final Log LOG = LogFactory.getLog(QueryInProgress.class.getName());
+
+  private QueryId queryId;
+
+  private Session session;
+
+  private LogicalRootNode plan;
+
+  private AtomicBoolean querySubmitted = new AtomicBoolean(false);
+
+  private AtomicBoolean stopped = new AtomicBoolean(false);
+
+  private QueryInfo queryInfo;
+
+  private final TajoMaster.MasterContext masterContext;
+
+  private NettyClientBase queryMasterRpc;
+
+  private QueryMasterProtocolService queryMasterRpcClient;
+
+  public QueryInProgress(
+      TajoMaster.MasterContext masterContext,
+      Session session,
+      QueryContext queryContext,
+      QueryId queryId, String sql, String jsonExpr, LogicalRootNode plan) {
+
+    this.masterContext = masterContext;
+    this.session = session;
+    this.queryId = queryId;
+    this.plan = plan;
+
+    queryInfo = new QueryInfo(queryId, queryContext, sql, jsonExpr);
+    queryInfo.setStartTime(System.currentTimeMillis());
+  }
+
+  public synchronized void kill() {
+    getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
+    if(queryMasterRpcClient != null){
+      queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
+    }
+  }
+
+  public void stopProgress() {
+    if(stopped.getAndSet(true)) {
+      return;
+    }
+
+    LOG.info("=========================================================");
+    LOG.info("Stop query:" + queryId);
+
+    masterContext.getResourceManager().releaseQueryMaster(queryId);
+
+    if(queryMasterRpc != null) {
+      RpcConnectionPool.getPool(masterContext.getConf()).closeConnection(queryMasterRpc);
+    }
+
+    masterContext.getHistoryWriter().appendHistory(queryInfo);
+  }
+
+  public boolean startQueryMaster() {
+    try {
+      LOG.info("Initializing QueryInProgress for QueryID=" + queryId);
+      WorkerResourceManager resourceManager = masterContext.getResourceManager();
+      WorkerAllocatedResource resource = resourceManager.allocateQueryMaster(this);
+
+      // if no resource to allocate a query master
+      if(resource == null) {
+        LOG.info("No Available Resources for QueryMaster");
+        return false;
+      }
+
+      queryInfo.setQueryMaster(resource.getConnectionInfo().getHost());
+      queryInfo.setQueryMasterPort(resource.getConnectionInfo().getQueryMasterPort());
+      queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort());
+      queryInfo.setQueryMasterInfoPort(resource.getConnectionInfo().getHttpInfoPort());
+
+      return true;
+    } catch (Exception e) {
+      catchException(e);
+      return false;
+    }
+  }
+
+  private void connectQueryMaster() throws Exception {
+    InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
+    LOG.info("Connect to QueryMaster:" + addr);
+    queryMasterRpc =
+        RpcConnectionPool.getPool(masterContext.getConf()).getConnection(addr, QueryMasterProtocol.class, true);
+    queryMasterRpcClient = queryMasterRpc.getStub();
+  }
+
+  public synchronized void submmitQueryToMaster() {
+    if(querySubmitted.get()) {
+      return;
+    }
+
+    try {
+      if(queryMasterRpcClient == null) {
+        connectQueryMaster();
+      }
+      if(queryMasterRpcClient == null) {
+        LOG.info("No QueryMaster conneciton info.");
+        //TODO wait
+        return;
+      }
+      LOG.info("Call executeQuery to :" +
+          queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort() + "," + queryId);
+
+      QueryExecutionRequestProto.Builder builder = TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder();
+      builder.setQueryId(queryId.getProto())
+          .setQueryContext(queryInfo.getQueryContext().getProto())
+          .setSession(session.getProto())
+          .setExprInJson(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getJsonExpr()))
+          .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build());
+
+      queryMasterRpcClient.executeQuery(null, builder.build(), NullCallback.get());
+      querySubmitted.set(true);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  public void catchException(Exception e) {
+    LOG.error(e.getMessage(), e);
+    queryInfo.setQueryState(TajoProtos.QueryState.QUERY_FAILED);
+    queryInfo.setLastMessage(StringUtils.stringifyException(e));
+  }
+
+  public QueryId getQueryId() {
+    return queryId;
+  }
+
+  public QueryInfo getQueryInfo() {
+    return this.queryInfo;
+  }
+
+  public boolean isStarted() {
+    return !stopped.get() && this.querySubmitted.get();
+  }
+
+  public void heartbeat(QueryInfo queryInfo) {
+    LOG.info("Received QueryMaster heartbeat:" + queryInfo);
+
+    // to avoid partial update by different heartbeats
+    synchronized (this.queryInfo) {
+
+      // terminal state will let client to retrieve a query result
+      // So, we must set the query result before changing query state
+      if (isFinishState(queryInfo.getQueryState())) {
+        if (queryInfo.hasResultdesc()) {
+          this.queryInfo.setResultDesc(queryInfo.getResultDesc());
+        }
+      }
+
+      this.queryInfo.setQueryState(queryInfo.getQueryState());
+      this.queryInfo.setProgress(queryInfo.getProgress());
+      this.queryInfo.setFinishTime(queryInfo.getFinishTime());
+
+      // Update diagnosis message
+      if (queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) {
+        this.queryInfo.setLastMessage(queryInfo.getLastMessage());
+        LOG.info(queryId + queryInfo.getLastMessage());
+      }
+
+      // if any error occurs, print outs the error message
+      if (this.queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_FAILED) {
+        LOG.warn(queryId + " failed, " + queryInfo.getLastMessage());
+      }
+
+
+      if (isFinishState(this.queryInfo.getQueryState())) {
+        masterContext.getQueryJobManager().getEventHandler().handle(
+            new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_STOP, this.queryInfo));
+      }
+    }
+  }
+
+  private boolean isFinishState(TajoProtos.QueryState state) {
+    return state == TajoProtos.QueryState.QUERY_FAILED ||
+        state == TajoProtos.QueryState.QUERY_KILLED ||
+        state == TajoProtos.QueryState.QUERY_SUCCEEDED;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
deleted file mode 100644
index 6a8da27..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.master.scheduler.SimpleFifoScheduler;
-import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.querymaster.QueryInProgress;
-import org.apache.tajo.querymaster.QueryJobEvent;
-import org.apache.tajo.session.Session;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * QueryJobManager manages all scheduled and running queries.
- * It receives all Query related events and routes them to each QueryInProgress.
- */
-public class QueryJobManager extends CompositeService {
-  private static final Log LOG = LogFactory.getLog(QueryJobManager.class.getName());
-
-  // TajoMaster Context
-  private final TajoMaster.MasterContext masterContext;
-
-  private AsyncDispatcher dispatcher;
-
-  private SimpleFifoScheduler scheduler;
-
-  private final Map<QueryId, QueryInProgress> submittedQueries = Maps.newConcurrentMap();
-
-  private final Map<QueryId, QueryInProgress> runningQueries = Maps.newConcurrentMap();
-
-  private AtomicLong minExecutionTime = new AtomicLong(Long.MAX_VALUE);
-  private AtomicLong maxExecutionTime = new AtomicLong();
-  private AtomicLong avgExecutionTime = new AtomicLong();
-  private AtomicLong executedQuerySize = new AtomicLong();
-
-  public QueryJobManager(final TajoMaster.MasterContext masterContext) {
-    super(QueryJobManager.class.getName());
-    this.masterContext = masterContext;
-  }
-
-  @Override
-  public void serviceInit(Configuration conf) throws Exception {
-    try {
-      this.dispatcher = new AsyncDispatcher();
-      addService(this.dispatcher);
-
-      this.dispatcher.register(QueryJobEvent.Type.class, new QueryJobManagerEventHandler());
-
-      this.scheduler = new SimpleFifoScheduler(this);
-    } catch (Exception e) {
-      catchException(null, e);
-    }
-
-    super.serviceInit(conf);
-  }
-
-  @Override
-  public void serviceStop() throws Exception {
-    synchronized(runningQueries) {
-      for(QueryInProgress eachQueryInProgress: runningQueries.values()) {
-        eachQueryInProgress.stopProgress();
-      }
-    }
-    this.scheduler.stop();
-    super.serviceStop();
-  }
-
-  @Override
-  public void serviceStart() throws Exception {
-    this.scheduler.start();
-    super.serviceStart();
-  }
-
-  public EventHandler getEventHandler() {
-    return dispatcher.getEventHandler();
-  }
-
-  public Collection<QueryInProgress> getSubmittedQueries() {
-    synchronized (submittedQueries){
-      return Collections.unmodifiableCollection(submittedQueries.values());
-    }
-  }
-
-  public Collection<QueryInProgress> getRunningQueries() {
-    synchronized (runningQueries){
-      return Collections.unmodifiableCollection(runningQueries.values());
-    }
-  }
-
-  public synchronized Collection<QueryInfo> getFinishedQueries() {
-    try {
-      return this.masterContext.getHistoryReader().getQueries(null);
-    } catch (Throwable e) {
-      LOG.error(e);
-      return Lists.newArrayList();
-    }
-  }
-
-
-  public synchronized QueryInfo getFinishedQuery(QueryId queryId) {
-    try {
-      return this.masterContext.getHistoryReader().getQueryInfo(queryId.toString());
-    } catch (Throwable e) {
-      LOG.error(e);
-      return null;
-    }
-  }
-
-  public QueryInfo createNewQueryJob(Session session, QueryContext queryContext, String sql,
-                                     String jsonExpr, LogicalRootNode plan)
-      throws Exception {
-    QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
-    QueryInProgress queryInProgress = new QueryInProgress(masterContext, session, queryContext, queryId, sql,
-        jsonExpr, plan);
-
-    synchronized (submittedQueries) {
-      queryInProgress.getQueryInfo().setQueryMaster("");
-      submittedQueries.put(queryInProgress.getQueryId(), queryInProgress);
-    }
-
-    scheduler.addQuery(queryInProgress);
-    return queryInProgress.getQueryInfo();
-  }
-
-  public QueryInfo startQueryJob(QueryId queryId) throws Exception {
-
-    QueryInProgress queryInProgress;
-
-    synchronized (submittedQueries) {
-      queryInProgress = submittedQueries.remove(queryId);
-    }
-
-    synchronized (runningQueries) {
-      runningQueries.put(queryInProgress.getQueryId(), queryInProgress);
-    }
-
-    if (queryInProgress.startQueryMaster()) {
-      dispatcher.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START,
-          queryInProgress.getQueryInfo()));
-    } else {
-      stopQuery(queryId);
-    }
-
-    return queryInProgress.getQueryInfo();
-  }
-
-  class QueryJobManagerEventHandler implements EventHandler<QueryJobEvent> {
-
-    @Override
-    public void handle(QueryJobEvent event) {
-      QueryInProgress queryInProgress = getQueryInProgress(event.getQueryInfo().getQueryId());
-
-
-      if (queryInProgress == null) {
-        LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]");
-        return;
-      }
-
-
-      if (event.getType() == QueryJobEvent.Type.QUERY_MASTER_START) {
-        queryInProgress.submmitQueryToMaster();
-
-      } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) {
-        stopQuery(event.getQueryInfo().getQueryId());
-
-      } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
-        scheduler.removeQuery(queryInProgress.getQueryId());
-        queryInProgress.kill();
-        stopQuery(queryInProgress.getQueryId());
-
-      } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) {
-        queryInProgress.heartbeat(event.getQueryInfo());
-      }
-    }
-  }
-
-  public QueryInProgress getQueryInProgress(QueryId queryId) {
-    QueryInProgress queryInProgress;
-    synchronized (submittedQueries) {
-      queryInProgress = submittedQueries.get(queryId);
-    }
-
-    if (queryInProgress == null) {
-      synchronized (runningQueries) {
-        queryInProgress = runningQueries.get(queryId);
-      }
-    }
-    return queryInProgress;
-  }
-
-  public void stopQuery(QueryId queryId) {
-    LOG.info("Stop QueryInProgress:" + queryId);
-    QueryInProgress queryInProgress = getQueryInProgress(queryId);
-    if(queryInProgress != null) {
-      queryInProgress.stopProgress();
-      synchronized(submittedQueries) {
-        submittedQueries.remove(queryId);
-      }
-
-      synchronized(runningQueries) {
-        runningQueries.remove(queryId);
-      }
-
-      QueryInfo queryInfo = queryInProgress.getQueryInfo();
-      long executionTime = queryInfo.getFinishTime() - queryInfo.getStartTime();
-      if (executionTime < minExecutionTime.get()) {
-        minExecutionTime.set(executionTime);
-      }
-
-      if (executionTime > maxExecutionTime.get()) {
-        maxExecutionTime.set(executionTime);
-      }
-
-      long totalExecutionTime = executedQuerySize.get() * avgExecutionTime.get();
-      if (totalExecutionTime > 0) {
-        avgExecutionTime.set((totalExecutionTime + executionTime) / (executedQuerySize.get() + 1));
-      } else {
-        avgExecutionTime.set(executionTime);
-      }
-      executedQuerySize.incrementAndGet();
-    } else {
-      LOG.warn("No QueryInProgress while query stopping: " + queryId);
-    }
-  }
-
-  public long getMinExecutionTime() {
-    if (getExecutedQuerySize() == 0) return 0;
-    return minExecutionTime.get();
-  }
-
-  public long getMaxExecutionTime() {
-    return maxExecutionTime.get();
-  }
-
-  public long getAvgExecutionTime() {
-    return avgExecutionTime.get();
-  }
-
-  public long getExecutedQuerySize() {
-    return executedQuerySize.get();
-  }
-
-  private void catchException(QueryId queryId, Exception e) {
-    LOG.error(e.getMessage(), e);
-    QueryInProgress queryInProgress = runningQueries.get(queryId);
-    queryInProgress.catchException(e);
-  }
-
-  public synchronized TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand queryHeartbeat(
-      TajoMasterProtocol.TajoHeartbeat queryHeartbeat) {
-    QueryInProgress queryInProgress = getQueryInProgress(new QueryId(queryHeartbeat.getQueryId()));
-    if(queryInProgress == null) {
-      return null;
-    }
-
-    QueryInfo queryInfo = makeQueryInfoFromHeartbeat(queryHeartbeat);
-    getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_HEARTBEAT, queryInfo));
-
-    return null;
-  }
-
-  private QueryInfo makeQueryInfoFromHeartbeat(TajoMasterProtocol.TajoHeartbeat queryHeartbeat) {
-    QueryInfo queryInfo = new QueryInfo(new QueryId(queryHeartbeat.getQueryId()));
-    WorkerConnectionInfo connectionInfo = new WorkerConnectionInfo(queryHeartbeat.getConnectionInfo());
-
-    queryInfo.setQueryMaster(connectionInfo.getHost());
-    queryInfo.setQueryMasterPort(connectionInfo.getQueryMasterPort());
-    queryInfo.setQueryMasterclientPort(connectionInfo.getClientPort());
-    queryInfo.setLastMessage(queryHeartbeat.getStatusMessage());
-    queryInfo.setQueryState(queryHeartbeat.getState());
-    queryInfo.setProgress(queryHeartbeat.getQueryProgress());
-
-    if (queryHeartbeat.hasQueryFinishTime()) {
-      queryInfo.setFinishTime(queryHeartbeat.getQueryFinishTime());
-    }
-
-    if (queryHeartbeat.hasResultDesc()) {
-      queryInfo.setResultDesc(new TableDesc(queryHeartbeat.getResultDesc()));
-    }
-
-    return queryInfo;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
new file mode 100644
index 0000000..296be04
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.scheduler.SimpleFifoScheduler;
+import org.apache.tajo.plan.logical.LogicalRootNode;
+import org.apache.tajo.querymaster.QueryJobEvent;
+import org.apache.tajo.session.Session;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * QueryManager manages all scheduled and running queries.
+ * It receives all Query related events and routes them to each QueryInProgress.
+ */
+public class QueryManager extends CompositeService {
+  private static final Log LOG = LogFactory.getLog(QueryManager.class.getName());
+
+  // TajoMaster Context
+  private final TajoMaster.MasterContext masterContext;
+
+  private AsyncDispatcher dispatcher;
+
+  private SimpleFifoScheduler scheduler;
+
+  private final Map<QueryId, QueryInProgress> submittedQueries = Maps.newConcurrentMap();
+
+  private final Map<QueryId, QueryInProgress> runningQueries = Maps.newConcurrentMap();
+
+  private AtomicLong minExecutionTime = new AtomicLong(Long.MAX_VALUE);
+  private AtomicLong maxExecutionTime = new AtomicLong();
+  private AtomicLong avgExecutionTime = new AtomicLong();
+  private AtomicLong executedQuerySize = new AtomicLong();
+
+  public QueryManager(final TajoMaster.MasterContext masterContext) {
+    super(QueryManager.class.getName());
+    this.masterContext = masterContext;
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    try {
+      this.dispatcher = new AsyncDispatcher();
+      addService(this.dispatcher);
+
+      this.dispatcher.register(QueryJobEvent.Type.class, new QueryJobManagerEventHandler());
+
+      this.scheduler = new SimpleFifoScheduler(this);
+    } catch (Exception e) {
+      catchException(null, e);
+    }
+
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    synchronized(runningQueries) {
+      for(QueryInProgress eachQueryInProgress: runningQueries.values()) {
+        eachQueryInProgress.stopProgress();
+      }
+    }
+    this.scheduler.stop();
+    super.serviceStop();
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    this.scheduler.start();
+    super.serviceStart();
+  }
+
+  public EventHandler getEventHandler() {
+    return dispatcher.getEventHandler();
+  }
+
+  public Collection<QueryInProgress> getSubmittedQueries() {
+    synchronized (submittedQueries){
+      return Collections.unmodifiableCollection(submittedQueries.values());
+    }
+  }
+
+  public Collection<QueryInProgress> getRunningQueries() {
+    synchronized (runningQueries){
+      return Collections.unmodifiableCollection(runningQueries.values());
+    }
+  }
+
+  public synchronized Collection<QueryInfo> getFinishedQueries() {
+    try {
+      return this.masterContext.getHistoryReader().getQueries(null);
+    } catch (Throwable e) {
+      LOG.error(e);
+      return Lists.newArrayList();
+    }
+  }
+
+
+  public synchronized QueryInfo getFinishedQuery(QueryId queryId) {
+    try {
+      return this.masterContext.getHistoryReader().getQueryInfo(queryId.toString());
+    } catch (Throwable e) {
+      LOG.error(e);
+      return null;
+    }
+  }
+
+  public QueryInfo scheduleQuery(Session session, QueryContext queryContext, String sql,
+                                 String jsonExpr, LogicalRootNode plan)
+      throws Exception {
+    QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
+    QueryInProgress queryInProgress = new QueryInProgress(masterContext, session, queryContext, queryId, sql,
+        jsonExpr, plan);
+
+    synchronized (submittedQueries) {
+      queryInProgress.getQueryInfo().setQueryMaster("");
+      submittedQueries.put(queryInProgress.getQueryId(), queryInProgress);
+    }
+
+    scheduler.addQuery(queryInProgress);
+    return queryInProgress.getQueryInfo();
+  }
+
+  public QueryInfo startQueryJob(QueryId queryId) throws Exception {
+
+    QueryInProgress queryInProgress;
+
+    synchronized (submittedQueries) {
+      queryInProgress = submittedQueries.remove(queryId);
+    }
+
+    synchronized (runningQueries) {
+      runningQueries.put(queryInProgress.getQueryId(), queryInProgress);
+    }
+
+    if (queryInProgress.startQueryMaster()) {
+      dispatcher.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START,
+          queryInProgress.getQueryInfo()));
+    } else {
+      stopQuery(queryId);
+    }
+
+    return queryInProgress.getQueryInfo();
+  }
+
+  class QueryJobManagerEventHandler implements EventHandler<QueryJobEvent> {
+
+    @Override
+    public void handle(QueryJobEvent event) {
+      QueryInProgress queryInProgress = getQueryInProgress(event.getQueryInfo().getQueryId());
+
+
+      if (queryInProgress == null) {
+        LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]");
+        return;
+      }
+
+
+      if (event.getType() == QueryJobEvent.Type.QUERY_MASTER_START) {
+        queryInProgress.submmitQueryToMaster();
+
+      } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) {
+        stopQuery(event.getQueryInfo().getQueryId());
+
+      } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
+        scheduler.removeQuery(queryInProgress.getQueryId());
+        queryInProgress.kill();
+        stopQuery(queryInProgress.getQueryId());
+
+      } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) {
+        queryInProgress.heartbeat(event.getQueryInfo());
+      }
+    }
+  }
+
+  public QueryInProgress getQueryInProgress(QueryId queryId) {
+    QueryInProgress queryInProgress;
+    synchronized (submittedQueries) {
+      queryInProgress = submittedQueries.get(queryId);
+    }
+
+    if (queryInProgress == null) {
+      synchronized (runningQueries) {
+        queryInProgress = runningQueries.get(queryId);
+      }
+    }
+    return queryInProgress;
+  }
+
+  public void stopQuery(QueryId queryId) {
+    LOG.info("Stop QueryInProgress:" + queryId);
+    QueryInProgress queryInProgress = getQueryInProgress(queryId);
+    if(queryInProgress != null) {
+      queryInProgress.stopProgress();
+      synchronized(submittedQueries) {
+        submittedQueries.remove(queryId);
+      }
+
+      synchronized(runningQueries) {
+        runningQueries.remove(queryId);
+      }
+
+      QueryInfo queryInfo = queryInProgress.getQueryInfo();
+      long executionTime = queryInfo.getFinishTime() - queryInfo.getStartTime();
+      if (executionTime < minExecutionTime.get()) {
+        minExecutionTime.set(executionTime);
+      }
+
+      if (executionTime > maxExecutionTime.get()) {
+        maxExecutionTime.set(executionTime);
+      }
+
+      long totalExecutionTime = executedQuerySize.get() * avgExecutionTime.get();
+      if (totalExecutionTime > 0) {
+        avgExecutionTime.set((totalExecutionTime + executionTime) / (executedQuerySize.get() + 1));
+      } else {
+        avgExecutionTime.set(executionTime);
+      }
+      executedQuerySize.incrementAndGet();
+    } else {
+      LOG.warn("No QueryInProgress while query stopping: " + queryId);
+    }
+  }
+
+  public long getMinExecutionTime() {
+    if (getExecutedQuerySize() == 0) return 0;
+    return minExecutionTime.get();
+  }
+
+  public long getMaxExecutionTime() {
+    return maxExecutionTime.get();
+  }
+
+  public long getAvgExecutionTime() {
+    return avgExecutionTime.get();
+  }
+
+  public long getExecutedQuerySize() {
+    return executedQuerySize.get();
+  }
+
+  private void catchException(QueryId queryId, Exception e) {
+    LOG.error(e.getMessage(), e);
+    QueryInProgress queryInProgress = runningQueries.get(queryId);
+    queryInProgress.catchException(e);
+  }
+
+  public synchronized QueryCoordinatorProtocol.TajoHeartbeatResponse.ResponseCommand queryHeartbeat(
+      QueryCoordinatorProtocol.TajoHeartbeat queryHeartbeat) {
+    QueryInProgress queryInProgress = getQueryInProgress(new QueryId(queryHeartbeat.getQueryId()));
+    if(queryInProgress == null) {
+      return null;
+    }
+
+    QueryInfo queryInfo = makeQueryInfoFromHeartbeat(queryHeartbeat);
+    getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_HEARTBEAT, queryInfo));
+
+    return null;
+  }
+
+  private QueryInfo makeQueryInfoFromHeartbeat(QueryCoordinatorProtocol.TajoHeartbeat queryHeartbeat) {
+    QueryInfo queryInfo = new QueryInfo(new QueryId(queryHeartbeat.getQueryId()));
+    WorkerConnectionInfo connectionInfo = new WorkerConnectionInfo(queryHeartbeat.getConnectionInfo());
+
+    queryInfo.setQueryMaster(connectionInfo.getHost());
+    queryInfo.setQueryMasterPort(connectionInfo.getQueryMasterPort());
+    queryInfo.setQueryMasterclientPort(connectionInfo.getClientPort());
+    queryInfo.setLastMessage(queryHeartbeat.getStatusMessage());
+    queryInfo.setQueryState(queryHeartbeat.getState());
+    queryInfo.setProgress(queryHeartbeat.getQueryProgress());
+
+    if (queryHeartbeat.hasQueryFinishTime()) {
+      queryInfo.setFinishTime(queryHeartbeat.getQueryFinishTime());
+    }
+
+    if (queryHeartbeat.hasResultDesc()) {
+      queryInfo.setResultDesc(new TableDesc(queryHeartbeat.getResultDesc()));
+    }
+
+    return queryInfo;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index 7209080..2ffd7ca 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -26,7 +26,7 @@ import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.querymaster.QueryMasterTask;
 import org.apache.tajo.master.container.TajoContainer;
@@ -177,23 +177,23 @@ public class TajoContainerProxy extends ContainerProxy {
       if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
         try {
           tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
+              QueryCoordinatorProtocol.class, true);
         } catch (Exception e) {
           context.getQueryMasterContext().getWorkerContext().setWorkerResourceTrackerAddr(
               HAServiceUtil.getResourceTrackerAddress(conf));
           context.getQueryMasterContext().getWorkerContext().setTajoMasterAddress(
               HAServiceUtil.getMasterUmbilicalAddress(conf));
           tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
+              QueryCoordinatorProtocol.class, true);
         }
       } else {
         tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
-            TajoMasterProtocol.class, true);
+            QueryCoordinatorProtocol.class, true);
       }
 
-      TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+      QueryCoordinatorProtocol.QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
         masterClientService.releaseWorkerResource(null,
-          TajoMasterProtocol.WorkerResourceReleaseRequest.newBuilder()
+            QueryCoordinatorProtocol.WorkerResourceReleaseRequest.newBuilder()
               .setExecutionBlockId(executionBlockId.getProto())
               .addAllContainerIds(containerIdProtos)
               .build(),

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index c054599..786025a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -114,14 +114,14 @@ public class TajoMaster extends CompositeService {
   private GlobalEngine globalEngine;
   private AsyncDispatcher dispatcher;
   private TajoMasterClientService tajoMasterClientService;
-  private TajoMasterService tajoMasterService;
+  private QueryCoordinatorService tajoMasterService;
   private SessionManager sessionManager;
 
   private WorkerResourceManager resourceManager;
   //Web Server
   private StaticHttpServer webServer;
 
-  private QueryJobManager queryJobManager;
+  private QueryManager queryManager;
 
   private ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
 
@@ -183,13 +183,13 @@ public class TajoMaster extends CompositeService {
       globalEngine = new GlobalEngine(context);
       addIfService(globalEngine);
 
-      queryJobManager = new QueryJobManager(context);
-      addIfService(queryJobManager);
+      queryManager = new QueryManager(context);
+      addIfService(queryManager);
 
       tajoMasterClientService = new TajoMasterClientService(context);
       addIfService(tajoMasterClientService);
 
-      tajoMasterService = new TajoMasterService(context);
+      tajoMasterService = new QueryCoordinatorService(context);
       addIfService(tajoMasterService);
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
@@ -441,8 +441,8 @@ public class TajoMaster extends CompositeService {
       return clock;
     }
 
-    public QueryJobManager getQueryJobManager() {
-      return queryJobManager;
+    public QueryManager getQueryJobManager() {
+      return queryManager;
     }
 
     public WorkerResourceManager getResourceManager() {
@@ -469,7 +469,7 @@ public class TajoMaster extends CompositeService {
       return storeManager;
     }
 
-    public TajoMasterService getTajoMasterService() {
+    public QueryCoordinatorService getTajoMasterService() {
       return tajoMasterService;
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 93326be..fcc016a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -45,7 +45,6 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol;
 import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService;
 import org.apache.tajo.master.TajoMaster.MasterContext;
 import org.apache.tajo.master.exec.NonForwardQueryResultScanner;
-import org.apache.tajo.querymaster.QueryInProgress;
 import org.apache.tajo.querymaster.QueryJobEvent;
 import org.apache.tajo.master.rm.Worker;
 import org.apache.tajo.master.rm.WorkerResource;
@@ -567,8 +566,8 @@ public class TajoMasterClientService extends AbstractService {
         context.getSessionManager().touch(request.getSessionId().getId());
         QueryId queryId = new QueryId(request.getQueryId());
 
-        QueryJobManager queryJobManager = context.getQueryJobManager();
-        QueryInProgress queryInProgress = queryJobManager.getQueryInProgress(queryId);
+        QueryManager queryManager = context.getQueryJobManager();
+        QueryInProgress queryInProgress = queryManager.getQueryInProgress(queryId);
 
         QueryInfo queryInfo = null;
         if (queryInProgress == null) {
@@ -598,8 +597,8 @@ public class TajoMasterClientService extends AbstractService {
       try {
         context.getSessionManager().touch(request.getSessionId().getId());
         QueryId queryId = new QueryId(request.getQueryId());
-        QueryJobManager queryJobManager = context.getQueryJobManager();
-        queryJobManager.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_KILL,
+        QueryManager queryManager = context.getQueryJobManager();
+        queryManager.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_KILL,
             new QueryInfo(queryId)));
         return BOOL_TRUE;
       } catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
deleted file mode 100644
index 02bdfa1..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TajoIdProtos;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.master.rm.Worker;
-import org.apache.tajo.master.rm.WorkerResource;
-import org.apache.tajo.rpc.AsyncRpcServer;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
-import org.apache.tajo.util.NetUtils;
-
-import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.List;
-
-public class TajoMasterService extends AbstractService {
-  private final static Log LOG = LogFactory.getLog(TajoMasterService.class);
-
-  private final TajoMaster.MasterContext context;
-  private final TajoConf conf;
-  private final TajoMasterServiceHandler masterHandler;
-  private AsyncRpcServer server;
-  private InetSocketAddress bindAddress;
-
-  private final BoolProto BOOL_TRUE = BoolProto.newBuilder().setValue(true).build();
-  private final BoolProto BOOL_FALSE = BoolProto.newBuilder().setValue(false).build();
-
-  public TajoMasterService(TajoMaster.MasterContext context) {
-    super(TajoMasterService.class.getName());
-    this.context = context;
-    this.conf = context.getConf();
-    this.masterHandler = new TajoMasterServiceHandler();
-  }
-
-  @Override
-  public void start() {
-    String confMasterServiceAddr = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
-    InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
-    int workerNum = conf.getIntVar(TajoConf.ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM);
-    try {
-      server = new AsyncRpcServer(TajoMasterProtocol.class, masterHandler, initIsa, workerNum);
-    } catch (Exception e) {
-      LOG.error(e);
-    }
-    server.start();
-    bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
-    this.conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS,
-        NetUtils.normalizeInetSocketAddress(bindAddress));
-    LOG.info("Instantiated TajoMasterService at " + this.bindAddress);
-    super.start();
-  }
-
-  @Override
-  public void stop() {
-    if(server != null) {
-      server.shutdown();
-      server = null;
-    }
-    super.stop();
-  }
-
-  public InetSocketAddress getBindAddress() {
-    return bindAddress;
-  }
-
-  public class TajoMasterServiceHandler
-      implements TajoMasterProtocol.TajoMasterProtocolService.Interface {
-    @Override
-    public void heartbeat(
-        RpcController controller,
-        TajoMasterProtocol.TajoHeartbeat request, RpcCallback<TajoMasterProtocol.TajoHeartbeatResponse> done) {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Received QueryHeartbeat:" + new WorkerConnectionInfo(request.getConnectionInfo()));
-      }
-
-      TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand command = null;
-
-      QueryJobManager queryJobManager = context.getQueryJobManager();
-      command = queryJobManager.queryHeartbeat(request);
-
-      TajoMasterProtocol.TajoHeartbeatResponse.Builder builder = TajoMasterProtocol.TajoHeartbeatResponse.newBuilder();
-      builder.setHeartbeatResult(BOOL_TRUE);
-      if(command != null) {
-        builder.setResponseCommand(command);
-      }
-
-      builder.setClusterResourceSummary(context.getResourceManager().getClusterResourceSummary());
-      done.run(builder.build());
-    }
-
-    @Override
-    public void allocateWorkerResources(
-        RpcController controller,
-        TajoMasterProtocol.WorkerResourceAllocationRequest request,
-        RpcCallback<TajoMasterProtocol.WorkerResourceAllocationResponse> done) {
-      context.getResourceManager().allocateWorkerResources(request, done);
-    }
-
-    @Override
-    public void releaseWorkerResource(RpcController controller,
-                                           TajoMasterProtocol.WorkerResourceReleaseRequest request,
-                                           RpcCallback<PrimitiveProtos.BoolProto> done) {
-      List<ContainerProtocol.TajoContainerIdProto> containerIds = request.getContainerIdsList();
-
-      for(ContainerProtocol.TajoContainerIdProto eachContainer: containerIds) {
-        context.getResourceManager().releaseWorkerResource(eachContainer);
-      }
-      done.run(BOOL_TRUE);
-    }
-
-    @Override
-    public void getAllWorkerResource(RpcController controller, PrimitiveProtos.NullProto request,
-                                     RpcCallback<TajoMasterProtocol.WorkerResourcesRequest> done) {
-
-      TajoMasterProtocol.WorkerResourcesRequest.Builder builder =
-          TajoMasterProtocol.WorkerResourcesRequest.newBuilder();
-      Collection<Worker> workers = context.getResourceManager().getWorkers().values();
-
-      for(Worker worker: workers) {
-        WorkerResource resource = worker.getResource();
-
-        TajoMasterProtocol.WorkerResourceProto.Builder workerResource =
-            TajoMasterProtocol.WorkerResourceProto.newBuilder();
-
-        workerResource.setConnectionInfo(worker.getConnectionInfo().getProto());
-        workerResource.setMemoryMB(resource.getMemoryMB());
-        workerResource.setDiskSlots(resource.getDiskSlots());
-
-        builder.addWorkerResources(workerResource);
-      }
-      done.run(builder.build());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index 2fbebc1..0860d63 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@ -388,10 +388,10 @@ public class QueryExecutor {
     context.getSystemMetrics().counter("Query", "numDMLQuery").inc();
     hookManager.doHooks(queryContext, plan);
 
-    QueryJobManager queryJobManager = this.context.getQueryJobManager();
+    QueryManager queryManager = this.context.getQueryJobManager();
     QueryInfo queryInfo;
 
-    queryInfo = queryJobManager.createNewQueryJob(session, queryContext, sql, jsonExpr, rootNode);
+    queryInfo = queryManager.scheduleQuery(session, queryContext, sql, jsonExpr, rootNode);
 
     if(queryInfo == null) {
       responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
index 831ce43..519aa9d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
@@ -26,7 +26,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse;
 import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.rpc.AsyncRpcServer;
@@ -36,8 +37,6 @@ import org.apache.tajo.util.ProtoUtil;
 import java.io.IOError;
 import java.net.InetSocketAddress;
 
-import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse;
-import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse.Builder;
 import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.NodeHeartbeat;
 import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService;
 
@@ -110,7 +109,8 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
   }
 
   /** The response builder */
-  private static final Builder builder = TajoHeartbeatResponse.newBuilder().setHeartbeatResult(ProtoUtil.TRUE);
+  private static final TajoHeartbeatResponse.Builder builder =
+      TajoHeartbeatResponse.newBuilder().setHeartbeatResult(ProtoUtil.TRUE);
 
   private static WorkerStatusEvent createStatusEvent(int workerId, NodeHeartbeat heartbeat) {
     return new WorkerStatusEvent(
@@ -204,7 +204,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
     return new Worker(rmContext, workerResource, new WorkerConnectionInfo(request.getConnectionInfo()));
   }
 
-  public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() {
+  public ClusterResourceSummary getClusterResourceSummary() {
     int totalDiskSlots = 0;
     int totalCpuCoreSlots = 0;
     int totalMemoryMB = 0;
@@ -230,7 +230,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
       }
     }
 
-    return TajoMasterProtocol.ClusterResourceSummary.newBuilder()
+    return ClusterResourceSummary.newBuilder()
         .setNumWorkers(rmContext.getWorkers().size())
         .setTotalCpuCoreSlots(totalCpuCoreSlots)
         .setTotalDiskSlots(totalDiskSlots)

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index 9f2a3d5..e5cf66c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -34,9 +34,9 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
+import org.apache.tajo.master.QueryInProgress;
 import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.querymaster.QueryInProgress;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.util.ApplicationIdUtils;
 
@@ -49,8 +49,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.tajo.ipc.TajoMasterProtocol.*;
-
 
 /**
  * It manages all resources of tajo workers.
@@ -162,7 +160,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
   }
 
   @Override
-  public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() {
+  public ClusterResourceSummary getClusterResourceSummary() {
     return resourceTracker.getClusterResourceSummary();
   }
 
@@ -204,7 +202,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
     builder.setMinMemoryMBPerContainer(queryMasterDefaultMemoryMB);
     builder.setMaxDiskSlotPerContainer(queryMasterDefaultDiskSlot);
     builder.setMinDiskSlotPerContainer(queryMasterDefaultDiskSlot);
-    builder.setResourceRequestPriority(TajoMasterProtocol.ResourceRequestPriority.MEMORY);
+    builder.setResourceRequestPriority(ResourceRequestPriority.MEMORY);
     builder.setNumContainers(1);
     return builder.build();
   }
@@ -358,10 +356,10 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
 
     int allocatedResources = 0;
 
-    TajoMasterProtocol.ResourceRequestPriority resourceRequestPriority
+    ResourceRequestPriority resourceRequestPriority
       = resourceRequest.request.getResourceRequestPriority();
 
-    if(resourceRequestPriority == TajoMasterProtocol.ResourceRequestPriority.MEMORY) {
+    if(resourceRequestPriority == ResourceRequestPriority.MEMORY) {
       synchronized(rmContext) {
         List<Integer> randomWorkers = new ArrayList<Integer>(rmContext.getWorkers().keySet());
         Collections.shuffle(randomWorkers);

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
index 79ec0ac..662b699 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
@@ -22,16 +22,15 @@ import com.google.protobuf.RpcCallback;
 import org.apache.hadoop.service.Service;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.querymaster.QueryInProgress;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.WorkerResourceAllocationRequest;
+import org.apache.tajo.master.QueryInProgress;
 
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
 
-import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource;
-import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerResourceAllocationResponse;
-
 /**
  * An interface of WorkerResourceManager which allows TajoMaster to request allocation for containers
  * and release the allocated containers.
@@ -45,7 +44,7 @@ public interface WorkerResourceManager extends Service {
    * @return A allocated container resource
    */
   @Deprecated
-  public WorkerAllocatedResource allocateQueryMaster(QueryInProgress queryInProgress);
+  public QueryCoordinatorProtocol.WorkerAllocatedResource allocateQueryMaster(QueryInProgress queryInProgress);
 
   /**
    * Request one or more resource containers. You can set the number of containers and resource capabilities, such as
@@ -55,8 +54,8 @@ public interface WorkerResourceManager extends Service {
    * @param request Request description
    * @param rpcCallBack Callback function
    */
-  public void allocateWorkerResources(TajoMasterProtocol.WorkerResourceAllocationRequest request,
-      RpcCallback<WorkerResourceAllocationResponse> rpcCallBack);
+  public void allocateWorkerResources(WorkerResourceAllocationRequest request,
+      RpcCallback<QueryCoordinatorProtocol.WorkerResourceAllocationResponse> rpcCallBack);
 
   /**
    * Release a container
@@ -100,7 +99,7 @@ public interface WorkerResourceManager extends Service {
    *
    * @return The overall summary of cluster resources
    */
-  public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary();
+  public ClusterResourceSummary getClusterResourceSummary();
 
   /**
    *

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java
index 02203a9..19493d7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java
@@ -19,7 +19,7 @@
 package org.apache.tajo.master.scheduler;
 
 import org.apache.tajo.QueryId;
-import org.apache.tajo.querymaster.QueryInProgress;
+import org.apache.tajo.master.QueryInProgress;
 
 import java.util.List;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
index a091ed5..6cb98eb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
@@ -21,8 +21,8 @@ package org.apache.tajo.master.scheduler;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.QueryId;
-import org.apache.tajo.querymaster.QueryInProgress;
-import org.apache.tajo.master.QueryJobManager;
+import org.apache.tajo.master.QueryInProgress;
+import org.apache.tajo.master.QueryManager;
 
 import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -32,10 +32,10 @@ public class SimpleFifoScheduler implements Scheduler {
   private LinkedList<QuerySchedulingInfo> pool = new LinkedList<QuerySchedulingInfo>();
   private final Thread queryProcessor;
   private AtomicBoolean stopped = new AtomicBoolean();
-  private QueryJobManager manager;
+  private QueryManager manager;
   private Comparator<QuerySchedulingInfo> COMPARATOR = new SchedulingAlgorithms.FifoComparator();
 
-  public SimpleFifoScheduler(QueryJobManager manager) {
+  public SimpleFifoScheduler(QueryManager manager) {
     this.manager = manager;
     this.queryProcessor = new Thread(new QueryProcessor());
     this.queryProcessor.setName("Query Processor");

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
deleted file mode 100644
index f83f244..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.querymaster;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.QueryMasterProtocol;
-import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto;
-import org.apache.tajo.master.QueryInfo;
-import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.master.rm.WorkerResourceManager;
-import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcConnectionPool;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.session.Session;
-import org.apache.tajo.util.NetUtils;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource;
-
-public class QueryInProgress {
-  private static final Log LOG = LogFactory.getLog(QueryInProgress.class.getName());
-
-  private QueryId queryId;
-
-  private Session session;
-
-  private LogicalRootNode plan;
-
-  private AtomicBoolean querySubmitted = new AtomicBoolean(false);
-
-  private AtomicBoolean stopped = new AtomicBoolean(false);
-
-  private QueryInfo queryInfo;
-
-  private final TajoMaster.MasterContext masterContext;
-
-  private NettyClientBase queryMasterRpc;
-
-  private QueryMasterProtocolService queryMasterRpcClient;
-
-  public QueryInProgress(
-      TajoMaster.MasterContext masterContext,
-      Session session,
-      QueryContext queryContext,
-      QueryId queryId, String sql, String jsonExpr, LogicalRootNode plan) {
-
-    this.masterContext = masterContext;
-    this.session = session;
-    this.queryId = queryId;
-    this.plan = plan;
-
-    queryInfo = new QueryInfo(queryId, queryContext, sql, jsonExpr);
-    queryInfo.setStartTime(System.currentTimeMillis());
-  }
-
-  public synchronized void kill() {
-    getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
-    if(queryMasterRpcClient != null){
-      queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
-    }
-  }
-
-  public void stopProgress() {
-    if(stopped.getAndSet(true)) {
-      return;
-    }
-
-    LOG.info("=========================================================");
-    LOG.info("Stop query:" + queryId);
-
-    masterContext.getResourceManager().releaseQueryMaster(queryId);
-
-    if(queryMasterRpc != null) {
-      RpcConnectionPool.getPool(masterContext.getConf()).closeConnection(queryMasterRpc);
-    }
-
-    masterContext.getHistoryWriter().appendHistory(queryInfo);
-  }
-
-  public boolean startQueryMaster() {
-    try {
-      LOG.info("Initializing QueryInProgress for QueryID=" + queryId);
-      WorkerResourceManager resourceManager = masterContext.getResourceManager();
-      WorkerAllocatedResource resource = resourceManager.allocateQueryMaster(this);
-
-      // if no resource to allocate a query master
-      if(resource == null) {
-        LOG.info("No Available Resources for QueryMaster");
-        return false;
-      }
-
-      queryInfo.setQueryMaster(resource.getConnectionInfo().getHost());
-      queryInfo.setQueryMasterPort(resource.getConnectionInfo().getQueryMasterPort());
-      queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort());
-      queryInfo.setQueryMasterInfoPort(resource.getConnectionInfo().getHttpInfoPort());
-
-      return true;
-    } catch (Exception e) {
-      catchException(e);
-      return false;
-    }
-  }
-
-  private void connectQueryMaster() throws Exception {
-    InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
-    LOG.info("Connect to QueryMaster:" + addr);
-    queryMasterRpc =
-        RpcConnectionPool.getPool(masterContext.getConf()).getConnection(addr, QueryMasterProtocol.class, true);
-    queryMasterRpcClient = queryMasterRpc.getStub();
-  }
-
-  public synchronized void submmitQueryToMaster() {
-    if(querySubmitted.get()) {
-      return;
-    }
-
-    try {
-      if(queryMasterRpcClient == null) {
-        connectQueryMaster();
-      }
-      if(queryMasterRpcClient == null) {
-        LOG.info("No QueryMaster conneciton info.");
-        //TODO wait
-        return;
-      }
-      LOG.info("Call executeQuery to :" +
-          queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort() + "," + queryId);
-
-      QueryExecutionRequestProto.Builder builder = TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder();
-      builder.setQueryId(queryId.getProto())
-          .setQueryContext(queryInfo.getQueryContext().getProto())
-          .setSession(session.getProto())
-          .setExprInJson(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getJsonExpr()))
-          .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build());
-
-      queryMasterRpcClient.executeQuery(null, builder.build(), NullCallback.get());
-      querySubmitted.set(true);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-    }
-  }
-
-  public void catchException(Exception e) {
-    LOG.error(e.getMessage(), e);
-    queryInfo.setQueryState(TajoProtos.QueryState.QUERY_FAILED);
-    queryInfo.setLastMessage(StringUtils.stringifyException(e));
-  }
-
-  public QueryId getQueryId() {
-    return queryId;
-  }
-
-  public QueryInfo getQueryInfo() {
-    return this.queryInfo;
-  }
-
-  public boolean isStarted() {
-    return !stopped.get() && this.querySubmitted.get();
-  }
-
-  public void heartbeat(QueryInfo queryInfo) {
-    LOG.info("Received QueryMaster heartbeat:" + queryInfo);
-
-    // to avoid partial update by different heartbeats
-    synchronized (this.queryInfo) {
-
-      // terminal state will let client to retrieve a query result
-      // So, we must set the query result before changing query state
-      if (isFinishState(queryInfo.getQueryState())) {
-        if (queryInfo.hasResultdesc()) {
-          this.queryInfo.setResultDesc(queryInfo.getResultDesc());
-        }
-      }
-
-      this.queryInfo.setQueryState(queryInfo.getQueryState());
-      this.queryInfo.setProgress(queryInfo.getProgress());
-      this.queryInfo.setFinishTime(queryInfo.getFinishTime());
-
-      // Update diagnosis message
-      if (queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) {
-        this.queryInfo.setLastMessage(queryInfo.getLastMessage());
-        LOG.info(queryId + queryInfo.getLastMessage());
-      }
-
-      // if any error occurs, print outs the error message
-      if (this.queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_FAILED) {
-        LOG.warn(queryId + " failed, " + queryInfo.getLastMessage());
-      }
-
-
-      if (isFinishState(this.queryInfo.getQueryState())) {
-        masterContext.getQueryJobManager().getEventHandler().handle(
-            new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_STOP, this.queryInfo));
-      }
-    }
-  }
-
-  private boolean isFinishState(TajoProtos.QueryState state) {
-    return state == TajoProtos.QueryState.QUERY_FAILED ||
-        state == TajoProtos.QueryState.QUERY_KILLED ||
-        state == TajoProtos.QueryState.QUERY_SUCCEEDED;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
index 02760a3..53390a1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
@@ -34,7 +34,8 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.event.QueryStartEvent;
 import org.apache.tajo.master.event.QueryStopEvent;
@@ -56,10 +57,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeat;
-import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse;
-
-// TODO - when exception, send error status to QueryJobManager
 public class QueryMaster extends CompositeService implements EventHandler {
   private static final Log LOG = LogFactory.getLog(QueryMaster.class.getName());
 
@@ -182,12 +179,12 @@ public class QueryMaster extends CompositeService implements EventHandler {
     }
     LOG.info("cleanup executionBlocks: " + cleanupMessage);
     NettyClientBase rpc = null;
-    List<TajoMasterProtocol.WorkerResourceProto> workers = getAllWorker();
+    List<WorkerResourceProto> workers = getAllWorker();
     TajoWorkerProtocol.ExecutionBlockListProto.Builder builder = TajoWorkerProtocol.ExecutionBlockListProto.newBuilder();
     builder.addAllExecutionBlockId(Lists.newArrayList(executionBlockIds));
     TajoWorkerProtocol.ExecutionBlockListProto executionBlockListProto = builder.build();
 
-    for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
+    for (WorkerResourceProto worker : workers) {
       try {
         TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo();
         rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()),
@@ -206,9 +203,9 @@ public class QueryMaster extends CompositeService implements EventHandler {
   private void cleanup(QueryId queryId) {
     LOG.info("cleanup query resources : " + queryId);
     NettyClientBase rpc = null;
-    List<TajoMasterProtocol.WorkerResourceProto> workers = getAllWorker();
+    List<WorkerResourceProto> workers = getAllWorker();
 
-    for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
+    for (WorkerResourceProto worker : workers) {
       try {
         TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo();
         rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()),
@@ -224,7 +221,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
     }
   }
 
-  public List<TajoMasterProtocol.WorkerResourceProto> getAllWorker() {
+  public List<WorkerResourceProto> getAllWorker() {
 
     NettyClientBase rpc = null;
     try {
@@ -235,78 +232,34 @@ public class QueryMaster extends CompositeService implements EventHandler {
       if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
         try {
           rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
+              QueryCoordinatorProtocol.class, true);
         } catch (Exception e) {
           queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
               HAServiceUtil.getResourceTrackerAddress(systemConf));
           queryMasterContext.getWorkerContext().setTajoMasterAddress(
               HAServiceUtil.getMasterUmbilicalAddress(systemConf));
           rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
+              QueryCoordinatorProtocol.class, true);
         }
       } else {
         rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-            TajoMasterProtocol.class, true);
+            QueryCoordinatorProtocol.class, true);
       }
 
-      TajoMasterProtocol.TajoMasterProtocolService masterService = rpc.getStub();
+      QueryCoordinatorProtocolService masterService = rpc.getStub();
 
-      CallFuture<TajoMasterProtocol.WorkerResourcesRequest> callBack =
-          new CallFuture<TajoMasterProtocol.WorkerResourcesRequest>();
+      CallFuture<WorkerResourcesRequest> callBack = new CallFuture<WorkerResourcesRequest>();
       masterService.getAllWorkerResource(callBack.getController(),
           PrimitiveProtos.NullProto.getDefaultInstance(), callBack);
 
-      TajoMasterProtocol.WorkerResourcesRequest workerResourcesRequest = callBack.get(2, TimeUnit.SECONDS);
+      WorkerResourcesRequest workerResourcesRequest = callBack.get(2, TimeUnit.SECONDS);
       return workerResourcesRequest.getWorkerResourcesList();
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
     } finally {
       connPool.releaseConnection(rpc);
     }
-    return new ArrayList<TajoMasterProtocol.WorkerResourceProto>();
-  }
-
-  public void reportQueryStatusToQueryMaster(QueryId queryId, TajoProtos.QueryState state) {
-    LOG.info("Send QueryMaster Ready to QueryJobManager:" + queryId);
-    NettyClientBase tmClient = null;
-    try {
-      // In TajoMaster HA mode, if backup master be active status,
-      // worker may fail to connect existing active master. Thus,
-      // if worker can't connect the master, worker should try to connect another master and
-      // update master address in worker context.
-      if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-        try {
-          tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
-        } catch (Exception e) {
-          queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
-              HAServiceUtil.getResourceTrackerAddress(systemConf));
-          queryMasterContext.getWorkerContext().setTajoMasterAddress(
-              HAServiceUtil.getMasterUmbilicalAddress(systemConf));
-          tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
-        }
-      } else {
-        tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-            TajoMasterProtocol.class, true);
-      }
-
-      TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
-
-      TajoHeartbeat.Builder queryHeartbeatBuilder = TajoHeartbeat.newBuilder()
-          .setConnectionInfo(workerContext.getConnectionInfo().getProto())
-          .setState(state)
-          .setQueryId(queryId.getProto());
-
-      CallFuture<TajoHeartbeatResponse> callBack =
-          new CallFuture<TajoHeartbeatResponse>();
-
-      masterClientService.heartbeat(callBack.getController(), queryHeartbeatBuilder.build(), callBack);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-    } finally {
-      connPool.releaseConnection(tmClient);
-    }
+    return new ArrayList<WorkerResourceProto>();
   }
 
   @Override
@@ -407,19 +360,19 @@ public class QueryMaster extends CompositeService implements EventHandler {
         if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
           try {
             tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-                TajoMasterProtocol.class, true);
+                QueryCoordinatorProtocol.class, true);
           } catch (Exception e) {
             queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf));
             queryMasterContext.getWorkerContext().setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf));
             tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-                TajoMasterProtocol.class, true);
+                QueryCoordinatorProtocol.class, true);
           }
         } else {
           tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
+              QueryCoordinatorProtocol.class, true);
         }
 
-        TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+        QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
         masterClientService.heartbeat(future.getController(), queryHeartbeat, future);
       }  catch (Exception e) {
         //this function will be closed in new thread.
@@ -524,24 +477,24 @@ public class QueryMaster extends CompositeService implements EventHandler {
               if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
                 try {
                   tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-                      TajoMasterProtocol.class, true);
+                      QueryCoordinatorProtocol.class, true);
                 } catch (Exception e) {
                   queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
                       HAServiceUtil.getResourceTrackerAddress(systemConf));
                   queryMasterContext.getWorkerContext().setTajoMasterAddress(
                       HAServiceUtil.getMasterUmbilicalAddress(systemConf));
                   tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-                      TajoMasterProtocol.class, true);
+                      QueryCoordinatorProtocol.class, true);
                 }
               } else {
                 tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-                    TajoMasterProtocol.class, true);
+                    QueryCoordinatorProtocol.class, true);
               }
 
-              TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+              QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
 
-              CallFuture<TajoHeartbeatResponse> callBack =
-                  new CallFuture<TajoHeartbeatResponse>();
+              CallFuture<QueryCoordinatorProtocol.TajoHeartbeatResponse> callBack =
+                  new CallFuture<QueryCoordinatorProtocol.TajoHeartbeatResponse>();
 
               TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(eachTask);
               masterClientService.heartbeat(callBack.getController(), queryHeartbeat, callBack);


[3/8] tajo git commit: TAJO-1287: Repeated using of the same order by key in multiple window clauses should be supported. (Keuntae Park)

Posted by ji...@apache.org.
TAJO-1287: Repeated using of the same order by key in multiple window clauses should be supported. (Keuntae Park)

Closes #337


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/03801a31
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/03801a31
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/03801a31

Branch: refs/heads/index_support
Commit: 03801a3129301745e16f0a7b40102a56e9d13389
Parents: 307c6c9
Author: Keuntae Park <si...@apache.org>
Authored: Fri Jan 9 15:22:11 2015 +0900
Committer: Keuntae Park <si...@apache.org>
Committed: Fri Jan 9 15:22:11 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 ++
 .../engine/planner/physical/WindowAggExec.java  | 11 +++++++-
 .../tajo/engine/query/TestWindowQuery.java      | 29 ++++++++++++++++++++
 3 files changed, 42 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/03801a31/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 35670d7..66bb299 100644
--- a/CHANGES
+++ b/CHANGES
@@ -154,6 +154,9 @@ Release 0.9.1 - unreleased
 
   BUG FIXES
 
+    TAJO-1287: Repeated using of the same order by key in multiple 
+    window clauses should be supported. (Keuntae Park)
+
     TAJO-1265: min(), max() does not handle null properly. (Keuntae Park)
 
     TAJO-1270: Fix typos. (DaeMyung Kang via hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/03801a31/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
index e36dcd8..a36bd4f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
@@ -143,7 +143,16 @@ public class WindowAggExec extends UnaryPhysicalExec {
 
           for (SortSpec sortSpec : functions[i].getSortSpecs()) {
             if (!rewrittenSchema.contains(sortSpec.getSortKey())) {
-              additionalSortKeyColumns.add(sortSpec.getSortKey());
+              // check if additionalSortKeyColumns already has that sort key
+              boolean newKey = true;
+              for (Column c : additionalSortKeyColumns) {
+                if (c.equals(sortSpec.getSortKey())) {
+                  newKey = false;
+                }
+              }
+              if (newKey) {
+                additionalSortKeyColumns.add(sortSpec.getSortKey());
+              }
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/tajo/blob/03801a31/tajo-core/src/test/java/org/apache/tajo/engine/query/TestWindowQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestWindowQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestWindowQuery.java
index 668ba70..14ab58f 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestWindowQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestWindowQuery.java
@@ -321,4 +321,33 @@ public class TestWindowQuery extends QueryTestCaseBase {
       executeString("DROP TABLE lastvaluetime PURGE");
     }
   }
+
+  @Test
+  public final void testMultipleWindow() throws Exception {
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", TajoDataTypes.Type.INT4);
+    schema.addColumn("time", TajoDataTypes.Type.TIME);
+    schema.addColumn("name", TajoDataTypes.Type.TEXT);
+    String[] data = new String[]{ "1|12:11:12|abc", "2|10:11:13|def", "2|05:42:41|ghi" };
+    TajoTestingCluster.createTable("multiwindow", schema, tableOptions, data, 1);
+
+    try {
+      ResultSet res = executeString(
+          "select id, last_value(time) over ( partition by id order by time ) as time_last, last_value(name) over ( partition by id order by time ) as name_last from multiwindow");
+      String ascExpected = "id,time_last,name_last\n" +
+          "-------------------------------\n" +
+          "1,12:11:12,abc\n" +
+          "2,10:11:13,def\n" +
+          "2,10:11:13,def\n";
+
+      assertEquals(ascExpected, resultSetToString(res));
+      res.close();
+    } finally {
+      executeString("DROP TABLE multiwindow PURGE");
+    }
+  }
 }
\ No newline at end of file


[6/8] tajo git commit: TAJO-1291: Rename TajoMasterProtocol to QueryCoordinatorProtocol.

Posted by ji...@apache.org.
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 1ea7051..13394f8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -45,7 +45,6 @@ import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
 import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
@@ -838,22 +837,6 @@ public class Stage implements EventHandler<StageEvent> {
     }
 
     /**
-     * Getting the total memory of cluster
-     *
-     * @param stage
-     * @return mega bytes
-     */
-    private static int getClusterTotalMemory(Stage stage) {
-      List<TajoMasterProtocol.WorkerResourceProto> workers =
-          stage.context.getQueryMasterContext().getQueryMaster().getAllWorker();
-
-      int totalMem = 0;
-      for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
-        totalMem += worker.getMemoryMB();
-      }
-      return totalMem;
-    }
-    /**
      * Getting the desire number of partitions according to the volume of input data.
      * This method is only used to determine the partition key number of hash join or aggregation.
      *

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
index d711258..82fb37f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
@@ -25,7 +25,7 @@ import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.master.TajoMaster.MasterContext;
 import org.apache.tajo.ha.HAService;
-import org.apache.tajo.querymaster.QueryInProgress;
+import org.apache.tajo.master.QueryInProgress;
 import org.apache.tajo.querymaster.QueryMasterTask;
 import org.apache.tajo.querymaster.Task;
 import org.apache.tajo.querymaster.Stage;

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 8241478..04b65d2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -28,8 +28,10 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.*;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
@@ -38,16 +40,18 @@ import org.apache.tajo.master.container.TajoContainerId;
 import org.apache.tajo.master.event.ContainerAllocationEvent;
 import org.apache.tajo.master.event.ContainerAllocatorEventType;
 import org.apache.tajo.master.event.StageContainerAllocationEvent;
+import org.apache.tajo.master.rm.TajoWorkerContainer;
+import org.apache.tajo.master.rm.TajoWorkerContainerId;
+import org.apache.tajo.master.rm.Worker;
+import org.apache.tajo.master.rm.WorkerResource;
 import org.apache.tajo.querymaster.QueryMasterTask;
 import org.apache.tajo.querymaster.Stage;
 import org.apache.tajo.querymaster.StageState;
-import org.apache.tajo.master.rm.*;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.util.ApplicationIdUtils;
-import org.apache.tajo.ha.HAServiceUtil;
 
 import java.net.InetSocketAddress;
 import java.util.*;
@@ -91,7 +95,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
                                            int memoryMBPerTask) {
     //TODO consider disk slot
 
-    TajoMasterProtocol.ClusterResourceSummary clusterResource = workerContext.getClusterResource();
+    ClusterResourceSummary clusterResource = workerContext.getClusterResource();
     int clusterSlots = clusterResource == null ? 0 : clusterResource.getTotalMemoryMB() / memoryMBPerTask;
     clusterSlots =  Math.max(1, clusterSlots - 1); // reserve query master slot
     LOG.info("CalculateNumberRequestContainer - Number of Tasks=" + numTasks +
@@ -249,20 +253,19 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
     @Override
     public void run() {
       LOG.info("Start TajoWorkerAllocationThread");
-      CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse> callBack =
-        new CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse>();
+      CallFuture<WorkerResourceAllocationResponse> callBack =
+        new CallFuture<WorkerResourceAllocationResponse>();
 
       //TODO consider task's resource usage pattern
       int requiredMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TASK_DEFAULT_MEMORY);
       float requiredDiskSlots = tajoConf.getFloatVar(TajoConf.ConfVars.TASK_DEFAULT_DISK);
 
-      TajoMasterProtocol.WorkerResourceAllocationRequest request =
-        TajoMasterProtocol.WorkerResourceAllocationRequest.newBuilder()
+      WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder()
           .setMinMemoryMBPerContainer(requiredMemoryMB)
           .setMaxMemoryMBPerContainer(requiredMemoryMB)
           .setNumContainers(event.getRequiredNum())
-          .setResourceRequestPriority(!event.isLeafQuery() ? TajoMasterProtocol.ResourceRequestPriority.MEMORY
-            : TajoMasterProtocol.ResourceRequestPriority.DISK)
+          .setResourceRequestPriority(!event.isLeafQuery() ?
+              ResourceRequestPriority.MEMORY : ResourceRequestPriority.DISK)
           .setMinDiskSlotPerContainer(requiredDiskSlots)
           .setMaxDiskSlotPerContainer(requiredDiskSlots)
           .setQueryId(event.getExecutionBlockId().getQueryId().getProto())
@@ -280,7 +283,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
           try {
             tmClient = connPool.getConnection(
               queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
+              QueryCoordinatorProtocol.class, true);
           } catch (Exception e) {
             queryTaskContext.getQueryMasterContext().getWorkerContext().
               setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(tajoConf));
@@ -288,15 +291,15 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
               setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(tajoConf));
             tmClient = connPool.getConnection(
               queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
+              QueryCoordinatorProtocol.class, true);
           }
         } else {
           tmClient = connPool.getConnection(
             queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
-            TajoMasterProtocol.class, true);
+            QueryCoordinatorProtocol.class, true);
         }
 
-        TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+        QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
         masterClientService.allocateWorkerResources(null, request, callBack);
       } catch (Exception e) {
         LOG.error(e.getMessage(), e);
@@ -304,7 +307,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
         connPool.releaseConnection(tmClient);
       }
 
-      TajoMasterProtocol.WorkerResourceAllocationResponse response = null;
+      WorkerResourceAllocationResponse response = null;
       while(!stopped.get()) {
         try {
           response = callBack.get(3, TimeUnit.SECONDS);
@@ -321,11 +324,11 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
       int numAllocatedContainers = 0;
 
       if(response != null) {
-        List<TajoMasterProtocol.WorkerAllocatedResource> allocatedResources = response.getWorkerAllocatedResourceList();
+        List<WorkerAllocatedResource> allocatedResources = response.getWorkerAllocatedResourceList();
         ExecutionBlockId executionBlockId = event.getExecutionBlockId();
 
         List<TajoContainer> containers = new ArrayList<TajoContainer>();
-        for(TajoMasterProtocol.WorkerAllocatedResource eachAllocatedResource: allocatedResources) {
+        for(WorkerAllocatedResource eachAllocatedResource: allocatedResources) {
           TajoWorkerContainer container = new TajoWorkerContainer();
           NodeId nodeId = NodeId.newInstance(eachAllocatedResource.getConnectionInfo().getHost(),
             eachAllocatedResource.getConnectionInfo().getPeerRpcPort());

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 09a87e0..4003014 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -20,7 +20,6 @@ package org.apache.tajo.worker;
 
 import com.codahale.metrics.Gauge;
 import com.google.common.annotations.VisibleForTesting;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -36,13 +35,13 @@ import org.apache.tajo.catalog.CatalogClient;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.ha.TajoMasterInfo;
-import org.apache.tajo.querymaster.QueryMaster;
-import org.apache.tajo.querymaster.QueryMasterManagerService;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.pullserver.TajoPullServerService;
+import org.apache.tajo.querymaster.QueryMaster;
+import org.apache.tajo.querymaster.QueryMasterManagerService;
 import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.rule.EvaluationContext;
@@ -50,7 +49,10 @@ import org.apache.tajo.rule.EvaluationFailedException;
 import org.apache.tajo.rule.SelfDiagnosisRuleEngine;
 import org.apache.tajo.rule.SelfDiagnosisRuleSession;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
-import org.apache.tajo.util.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.JvmPauseMonitor;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.StringUtils;
 import org.apache.tajo.util.history.HistoryReader;
 import org.apache.tajo.util.history.HistoryWriter;
 import org.apache.tajo.util.metrics.TajoSystemMetrics;
@@ -115,7 +117,7 @@ public class TajoWorker extends CompositeService {
 
   private AtomicInteger numClusterNodes = new AtomicInteger();
 
-  private TajoMasterProtocol.ClusterResourceSummary clusterResource;
+  private ClusterResourceSummary clusterResource;
 
   private WorkerConnectionInfo connectionInfo;
 
@@ -516,13 +518,13 @@ public class TajoWorker extends CompositeService {
       return TajoWorker.this.numClusterNodes.get();
     }
 
-    public void setClusterResource(TajoMasterProtocol.ClusterResourceSummary clusterResource) {
+    public void setClusterResource(ClusterResourceSummary clusterResource) {
       synchronized (numClusterNodes) {
         TajoWorker.this.clusterResource = clusterResource;
       }
     }
 
-    public TajoMasterProtocol.ClusterResourceSummary getClusterResource() {
+    public ClusterResourceSummary getClusterResource() {
       synchronized (numClusterNodes) {
         return TajoWorker.this.clusterResource;
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
index c809921..b92c4cd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
@@ -26,7 +26,10 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ha.HAServiceUtil;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.ServerStatusProto;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse;
 import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
@@ -35,7 +38,6 @@ import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.storage.DiskDeviceInfo;
 import org.apache.tajo.storage.DiskMountInfo;
 import org.apache.tajo.storage.DiskUtil;
-import org.apache.tajo.ha.HAServiceUtil;
 
 import java.io.File;
 import java.util.List;
@@ -98,8 +100,8 @@ public class WorkerHeartbeatService extends AbstractService {
 
   class WorkerHeartbeatThread extends Thread {
     private volatile AtomicBoolean stopped = new AtomicBoolean(false);
-    TajoMasterProtocol.ServerStatusProto.System systemInfo;
-    List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos = Lists.newArrayList();
+    ServerStatusProto.System systemInfo;
+    List<ServerStatusProto.Disk> diskInfos = Lists.newArrayList();
     float workerDiskSlots;
     int workerMemoryMB;
     List<DiskDeviceInfo> diskDeviceInfos;
@@ -137,7 +139,7 @@ public class WorkerHeartbeatService extends AbstractService {
         }
       }
 
-      systemInfo = TajoMasterProtocol.ServerStatusProto.System.newBuilder()
+      systemInfo = ServerStatusProto.System.newBuilder()
           .setAvailableProcessors(workerCpuCoreNum)
           .setFreeMemoryMB(0)
           .setMaxMemoryMB(0)
@@ -153,14 +155,14 @@ public class WorkerHeartbeatService extends AbstractService {
         if(sendDiskInfoCount == 0 && diskDeviceInfos != null) {
           getDiskUsageInfos();
         }
-        TajoMasterProtocol.ServerStatusProto.JvmHeap jvmHeap =
-            TajoMasterProtocol.ServerStatusProto.JvmHeap.newBuilder()
+        ServerStatusProto.JvmHeap jvmHeap =
+            ServerStatusProto.JvmHeap.newBuilder()
                 .setMaxHeap(Runtime.getRuntime().maxMemory())
                 .setFreeHeap(Runtime.getRuntime().freeMemory())
                 .setTotalHeap(Runtime.getRuntime().totalMemory())
                 .build();
 
-        TajoMasterProtocol.ServerStatusProto serverStatus = TajoMasterProtocol.ServerStatusProto.newBuilder()
+        ServerStatusProto serverStatus = ServerStatusProto.newBuilder()
             .addAllDisk(diskInfos)
             .setRunningTaskNum(
                 context.getTaskRunnerManager() == null ? 1 : context.getTaskRunnerManager().getNumTasks())
@@ -179,8 +181,7 @@ public class WorkerHeartbeatService extends AbstractService {
 
         NettyClientBase rmClient = null;
         try {
-          CallFuture<TajoMasterProtocol.TajoHeartbeatResponse> callBack =
-              new CallFuture<TajoMasterProtocol.TajoHeartbeatResponse>();
+          CallFuture<TajoHeartbeatResponse> callBack = new CallFuture<TajoHeartbeatResponse>();
 
           // In TajoMaster HA mode, if backup master be active status,
           // worker may fail to connect existing active master. Thus,
@@ -201,9 +202,9 @@ public class WorkerHeartbeatService extends AbstractService {
           TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService resourceTracker = rmClient.getStub();
           resourceTracker.heartbeat(callBack.getController(), heartbeatProto, callBack);
 
-          TajoMasterProtocol.TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS);
+          TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS);
           if(response != null) {
-            TajoMasterProtocol.ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary();
+            ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary();
             if(clusterResourceSummary.getNumWorkers() > 0) {
               context.setNumClusterNodes(clusterResourceSummary.getNumWorkers());
             }
@@ -249,7 +250,7 @@ public class WorkerHeartbeatService extends AbstractService {
         if(mountInfos != null) {
           for(DiskMountInfo eachMount: mountInfos) {
             File eachFile = new File(eachMount.getMountPath());
-            diskInfos.add(TajoMasterProtocol.ServerStatusProto.Disk.newBuilder()
+            diskInfos.add(ServerStatusProto.Disk.newBuilder()
                 .setAbsolutePath(eachFile.getAbsolutePath())
                 .setTotalSpace(eachFile.getTotalSpace())
                 .setFreeSpace(eachFile.getFreeSpace())

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
index 6eb710a..68890e3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
@@ -18,23 +18,19 @@
 
 package org.apache.tajo.worker.rule;
 
-import java.net.InetSocketAddress;
-
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcConnectionPool;
-import org.apache.tajo.rule.EvaluationContext;
-import org.apache.tajo.rule.EvaluationResult;
-import org.apache.tajo.rule.SelfDiagnosisRuleDefinition;
-import org.apache.tajo.rule.SelfDiagnosisRuleVisibility;
+import org.apache.tajo.rule.*;
 import org.apache.tajo.rule.EvaluationResult.EvaluationResultCode;
-import org.apache.tajo.rule.SelfDiagnosisRule;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.worker.TajoWorker;
 
+import java.net.InetSocketAddress;
+
 /**
  * With this rule, Tajo worker will check the connectivity to tajo master server.
  */
@@ -54,7 +50,7 @@ public class ConnectivityCheckerRuleForTajoWorker implements SelfDiagnosisRule {
       } else {
         masterAddress = NetUtils.createSocketAddr(tajoConf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
       }
-      masterClient = pool.getConnection(masterAddress, TajoMasterProtocol.class, true);
+      masterClient = pool.getConnection(masterAddress, QueryCoordinatorProtocol.class, true);
       
       masterClient.getStub();
     } finally {

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto b/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto
new file mode 100644
index 0000000..41a382f
--- /dev/null
+++ b/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//TajoWorker -> TajoMaster protocol
+
+option java_package = "org.apache.tajo.ipc";
+option java_outer_classname = "QueryCoordinatorProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "yarn_protos.proto";
+import "tajo_protos.proto";
+import "TajoIdProtos.proto";
+import "CatalogProtos.proto";
+import "PrimitiveProtos.proto";
+import "ContainerProtocol.proto";
+
+package hadoop.yarn;
+
+message ServerStatusProto {
+    message System {
+        required int32 availableProcessors = 1;
+        required int32 freeMemoryMB = 2;
+        required int32 maxMemoryMB = 3;
+        required int32 totalMemoryMB = 4;
+    }
+    message Disk {
+        required string absolutePath = 1;
+        required int64 totalSpace = 2;
+        required int64 freeSpace = 3;
+        required int64 usableSpace = 4;
+    }
+
+    message JvmHeap {
+        required int64 maxHeap = 1;
+        required int64 totalHeap = 2;
+        required int64 freeHeap = 3;
+    }
+
+    required System system = 1;
+    required float diskSlots = 2;
+    required int32 memoryResourceMB = 3;
+    repeated Disk disk = 4;
+    required int32 runningTaskNum = 5;
+    required JvmHeap jvmHeap = 6;
+    required BoolProto queryMasterMode = 7;
+    required BoolProto taskRunnerMode = 8;
+}
+
+message TajoHeartbeat {
+  required WorkerConnectionInfoProto connectionInfo = 1;
+  optional QueryIdProto queryId = 2;
+  optional QueryState state = 3;
+  optional TableDescProto resultDesc = 4;
+  optional string statusMessage = 5;
+  optional float queryProgress = 6;
+  optional int64 queryFinishTime = 7;
+}
+
+message TajoHeartbeatResponse {
+  message ResponseCommand {
+      required string command = 1;
+      repeated string params = 2;
+  }
+  required BoolProto heartbeatResult = 1;
+  required ClusterResourceSummary clusterResourceSummary = 2;
+  optional ResponseCommand responseCommand = 3;
+}
+
+message ClusterResourceSummary {
+  required int32 numWorkers = 1;
+  required int32 totalDiskSlots = 2;
+  required int32 totalCpuCoreSlots = 3;
+  required int32 totalMemoryMB = 4;
+
+  required int32 totalAvailableDiskSlots = 5;
+  required int32 totalAvailableCpuCoreSlots = 6;
+  required int32 totalAvailableMemoryMB = 7;
+}
+
+enum ResourceRequestPriority {
+    MEMORY = 1;
+    DISK = 2;
+}
+
+message WorkerResourceAllocationRequest {
+    required QueryIdProto queryId = 1;
+    required ResourceRequestPriority resourceRequestPriority = 2;
+
+    required int32 numContainers = 3;
+
+    required int32 maxMemoryMBPerContainer = 4;
+    required int32 minMemoryMBPerContainer = 5;
+
+    required float maxDiskSlotPerContainer = 6;
+    required float minDiskSlotPerContainer = 7;
+}
+
+message WorkerResourceProto {
+    required WorkerConnectionInfoProto connectionInfo = 1;
+    required int32 memoryMB = 2 ;
+    required float diskSlots = 3;
+}
+
+message WorkerResourcesRequest {
+    repeated WorkerResourceProto workerResources = 1;
+}
+
+message WorkerResourceReleaseRequest {
+    required ExecutionBlockIdProto executionBlockId = 1;
+    repeated TajoContainerIdProto containerIds = 2;
+}
+
+message WorkerAllocatedResource {
+    required TajoContainerIdProto containerId = 1;
+    required WorkerConnectionInfoProto connectionInfo = 2;
+
+    required int32 allocatedMemoryMB = 3;
+    required float allocatedDiskSlots = 4;
+}
+
+message WorkerResourceAllocationResponse {
+    required QueryIdProto queryId = 1;
+    repeated WorkerAllocatedResource workerAllocatedResource = 2;
+}
+
+service QueryCoordinatorProtocolService {
+  rpc heartbeat(TajoHeartbeat) returns (TajoHeartbeatResponse);
+  rpc allocateWorkerResources(WorkerResourceAllocationRequest) returns (WorkerResourceAllocationResponse);
+  rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto);
+  rpc getAllWorkerResource(NullProto) returns (WorkerResourcesRequest);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
index b2db46a..40aeab7 100644
--- a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
+++ b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
@@ -22,7 +22,7 @@ option java_outer_classname = "TajoResourceTrackerProtocol";
 option java_generic_services = true;
 option java_generate_equals_and_hash = true;
 
-import "TajoMasterProtocol.proto";
+import "QueryCoordinatorProtocol.proto";
 import "ContainerProtocol.proto";
 import "tajo_protos.proto";
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoMasterProtocol.proto b/tajo-core/src/main/proto/TajoMasterProtocol.proto
deleted file mode 100644
index bc73596..0000000
--- a/tajo-core/src/main/proto/TajoMasterProtocol.proto
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-//TajoWorker -> TajoMaster protocol
-
-option java_package = "org.apache.tajo.ipc";
-option java_outer_classname = "TajoMasterProtocol";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-
-import "yarn_protos.proto";
-import "tajo_protos.proto";
-import "TajoIdProtos.proto";
-import "CatalogProtos.proto";
-import "PrimitiveProtos.proto";
-import "ContainerProtocol.proto";
-
-package hadoop.yarn;
-
-message ServerStatusProto {
-    message System {
-        required int32 availableProcessors = 1;
-        required int32 freeMemoryMB = 2;
-        required int32 maxMemoryMB = 3;
-        required int32 totalMemoryMB = 4;
-    }
-    message Disk {
-        required string absolutePath = 1;
-        required int64 totalSpace = 2;
-        required int64 freeSpace = 3;
-        required int64 usableSpace = 4;
-    }
-
-    message JvmHeap {
-        required int64 maxHeap = 1;
-        required int64 totalHeap = 2;
-        required int64 freeHeap = 3;
-    }
-
-    required System system = 1;
-    required float diskSlots = 2;
-    required int32 memoryResourceMB = 3;
-    repeated Disk disk = 4;
-    required int32 runningTaskNum = 5;
-    required JvmHeap jvmHeap = 6;
-    required BoolProto queryMasterMode = 7;
-    required BoolProto taskRunnerMode = 8;
-}
-
-message TajoHeartbeat {
-  required WorkerConnectionInfoProto connectionInfo = 1;
-  optional QueryIdProto queryId = 2;
-  optional QueryState state = 3;
-  optional TableDescProto resultDesc = 4;
-  optional string statusMessage = 5;
-  optional float queryProgress = 6;
-  optional int64 queryFinishTime = 7;
-}
-
-message TajoHeartbeatResponse {
-  message ResponseCommand {
-      required string command = 1;
-      repeated string params = 2;
-  }
-  required BoolProto heartbeatResult = 1;
-  required ClusterResourceSummary clusterResourceSummary = 2;
-  optional ResponseCommand responseCommand = 3;
-}
-
-message ClusterResourceSummary {
-  required int32 numWorkers = 1;
-  required int32 totalDiskSlots = 2;
-  required int32 totalCpuCoreSlots = 3;
-  required int32 totalMemoryMB = 4;
-
-  required int32 totalAvailableDiskSlots = 5;
-  required int32 totalAvailableCpuCoreSlots = 6;
-  required int32 totalAvailableMemoryMB = 7;
-}
-
-enum ResourceRequestPriority {
-    MEMORY = 1;
-    DISK = 2;
-}
-
-message WorkerResourceAllocationRequest {
-    required QueryIdProto queryId = 1;
-    required ResourceRequestPriority resourceRequestPriority = 2;
-
-    required int32 numContainers = 3;
-
-    required int32 maxMemoryMBPerContainer = 4;
-    required int32 minMemoryMBPerContainer = 5;
-
-    required float maxDiskSlotPerContainer = 6;
-    required float minDiskSlotPerContainer = 7;
-}
-
-message WorkerResourceProto {
-    required WorkerConnectionInfoProto connectionInfo = 1;
-    required int32 memoryMB = 2 ;
-    required float diskSlots = 3;
-}
-
-message WorkerResourcesRequest {
-    repeated WorkerResourceProto workerResources = 1;
-}
-
-message WorkerResourceReleaseRequest {
-    required ExecutionBlockIdProto executionBlockId = 1;
-    repeated TajoContainerIdProto containerIds = 2;
-}
-
-message WorkerAllocatedResource {
-    required TajoContainerIdProto containerId = 1;
-    required WorkerConnectionInfoProto connectionInfo = 2;
-
-    required int32 allocatedMemoryMB = 3;
-    required float allocatedDiskSlots = 4;
-}
-
-message WorkerResourceAllocationResponse {
-    required QueryIdProto queryId = 1;
-    repeated WorkerAllocatedResource workerAllocatedResource = 2;
-}
-
-service TajoMasterProtocolService {
-  rpc heartbeat(TajoHeartbeat) returns (TajoHeartbeatResponse);
-  rpc allocateWorkerResources(WorkerResourceAllocationRequest) returns (WorkerResourceAllocationResponse);
-  rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto);
-  rpc getAllWorkerResource(NullProto) returns (WorkerResourcesRequest);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/resources/webapps/admin/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp
index 00186d7..0defb3c 100644
--- a/tajo-core/src/main/resources/webapps/admin/index.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/index.jsp
@@ -25,7 +25,7 @@
 <%@ page import="org.apache.tajo.master.TajoMaster" %>
 <%@ page import="org.apache.tajo.ha.HAService" %>
 <%@ page import="org.apache.tajo.ha.TajoMasterInfo" %>
-<%@ page import="org.apache.tajo.querymaster.QueryInProgress" %>
+<%@ page import="org.apache.tajo.master.QueryInProgress" %>
 <%@ page import="org.apache.tajo.master.rm.Worker" %>
 <%@ page import="org.apache.tajo.master.rm.WorkerState" %>
 <%@ page import="org.apache.tajo.util.NetUtils" %>

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/resources/webapps/admin/query.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp
index 4d8e5e6..85f7176 100644
--- a/tajo-core/src/main/resources/webapps/admin/query.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/query.jsp
@@ -20,7 +20,7 @@
 <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
 
 <%@ page import="org.apache.tajo.master.TajoMaster" %>
-<%@ page import="org.apache.tajo.querymaster.QueryInProgress" %>
+<%@ page import="org.apache.tajo.master.QueryInProgress" %>
 <%@ page import="org.apache.tajo.master.rm.Worker" %>
 <%@ page import="org.apache.tajo.util.JSPUtil" %>
 <%@ page import="org.apache.tajo.util.StringUtils" %>

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 0786912..e548b81 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -43,8 +43,8 @@ import org.apache.tajo.client.TajoClientUtil;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanTestRuleProvider;
+import org.apache.tajo.master.QueryInProgress;
 import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.querymaster.*;
 import org.apache.tajo.querymaster.Query;
 import org.apache.tajo.querymaster.Stage;
 import org.apache.tajo.querymaster.StageState;

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
index b8fbd67..a013d0b 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
@@ -19,12 +19,11 @@
 package org.apache.tajo.master.rm;
 
 import com.google.protobuf.RpcCallback;
-import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol.*;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;


[5/8] tajo git commit: TAJO-1251: Query is hanging occasionally by shuffle report. (jinho)

Posted by ji...@apache.org.
TAJO-1251: Query is hanging occasionally by shuffle report. (jinho)

Closes #339


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/50a8a663
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/50a8a663
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/50a8a663

Branch: refs/heads/index_support
Commit: 50a8a663c2c95f14ca59f3b01ffd79b2578f7f09
Parents: 533601e
Author: jhkim <jh...@apache.org>
Authored: Fri Jan 9 20:07:36 2015 +0900
Committer: jhkim <jh...@apache.org>
Committed: Fri Jan 9 20:07:36 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   3 +-
 .../tajo/master/event/StageEventType.java       |   3 +-
 .../master/event/StageShuffleReportEvent.java   |  38 ++++
 .../java/org/apache/tajo/querymaster/Query.java |   2 +-
 .../querymaster/QueryMasterManagerService.java  |   2 +-
 .../java/org/apache/tajo/querymaster/Stage.java | 206 +++++++++++++------
 .../org/apache/tajo/querymaster/StageState.java |   1 +
 tajo-dist/pom.xml                               |  28 +++
 9 files changed, 220 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 0df49b1..4e38f78 100644
--- a/CHANGES
+++ b/CHANGES
@@ -156,6 +156,8 @@ Release 0.9.1 - unreleased
 
   BUG FIXES
 
+    TAJO-1251: Query is hanging occasionally by shuffle report. (jinho)
+
     TAJO-1287: Repeated using of the same order by key in multiple 
     window clauses should be supported. (Keuntae Park)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index ab11ddd..74a9271 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -188,7 +188,8 @@ public class TajoConf extends Configuration {
     /** how many launching TaskRunners in parallel */
     YARN_RM_QUERY_MASTER_MEMORY_MB("tajo.querymaster.memory-mb", 512, Validators.min("64")),
     YARN_RM_QUERY_MASTER_DISKS("tajo.yarn-rm.querymaster.disks", 1),
-    YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM("tajo.yarn-rm.parallel-task-runner-launcher-num", 16),
+    YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM("tajo.yarn-rm.parallel-task-runner-launcher-num",
+        Runtime.getRuntime().availableProcessors() * 2),
     YARN_RM_WORKER_NUMBER_PER_NODE("tajo.yarn-rm.max-worker-num-per-node", 8),
 
     // Query Configuration

http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java
index fa808d4..763d426 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java
@@ -34,7 +34,8 @@ public enum StageEventType {
   SQ_TASK_COMPLETED,
   SQ_FAILED,
 
-  // Producer: Completed
+  // Producer: Stage
+  SQ_SHUFFLE_REPORT,
   SQ_STAGE_COMPLETED,
 
   // Producer: Any component

http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-core/src/main/java/org/apache/tajo/master/event/StageShuffleReportEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageShuffleReportEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageShuffleReportEvent.java
new file mode 100644
index 0000000..924fb59
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageShuffleReportEvent.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+
+/**
+ * Event Class: From {@link org.apache.tajo.querymaster.QueryMasterManagerService} to Stage
+ */
+public class StageShuffleReportEvent extends StageEvent {
+  private TajoWorkerProtocol.ExecutionBlockReport report;
+
+  public StageShuffleReportEvent(ExecutionBlockId executionBlockId, TajoWorkerProtocol.ExecutionBlockReport report) {
+    super(executionBlockId, StageEventType.SQ_SHUFFLE_REPORT);
+    this.report = report;
+  }
+
+  public TajoWorkerProtocol.ExecutionBlockReport getReport() {
+    return report;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
index 2932694..060e620 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
@@ -660,7 +660,6 @@ public class Query implements EventHandler<QueryEvent> {
         if (castEvent.getState() == StageState.SUCCEEDED &&  // latest stage succeeded
             query.getSynchronizedState() == QueryState.QUERY_RUNNING &&     // current state is not in KILL_WAIT, FAILED, or ERROR.
             hasNext(query)) {                                   // there remains at least one stage.
-          query.getStage(castEvent.getExecutionBlockId()).waitingIntermediateReport();
           executeNextBlock(query);
         } else { // if a query is completed due to finished, kill, failure, or error
           query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState()));
@@ -692,6 +691,7 @@ public class Query implements EventHandler<QueryEvent> {
     public void transition(Query query, QueryEvent event) {
       synchronized (query.stages) {
         for (Stage stage : query.stages.values()) {
+          stage.stopFinalization();
           query.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL));
         }
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
index 4a91326..85cc553 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
@@ -220,7 +220,7 @@ public class QueryMasterManagerService extends CompositeService
     QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(new QueryId(request.getEbId().getQueryId()));
     if (queryMasterTask != null) {
       ExecutionBlockId ebId = new ExecutionBlockId(request.getEbId());
-      queryMasterTask.getQuery().getStage(ebId).receiveExecutionBlockReport(request);
+      queryMasterTask.getEventHandler().handle(new StageShuffleReportEvent(ebId, request));
     }
     done.run(TajoWorker.TRUE_PROTO);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 34c58d4..1ea7051 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -50,27 +50,30 @@ import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
 import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
 import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto;
-import org.apache.tajo.master.*;
+import org.apache.tajo.master.LaunchTaskRunnersEvent;
+import org.apache.tajo.master.TaskRunnerGroupEvent;
 import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
+import org.apache.tajo.master.TaskState;
+import org.apache.tajo.master.container.TajoContainer;
+import org.apache.tajo.master.container.TajoContainerId;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.querymaster.Task.IntermediateEntry;
-import org.apache.tajo.master.container.TajoContainer;
-import org.apache.tajo.master.container.TajoContainerId;
 import org.apache.tajo.storage.FileStorageManager;
-import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.util.history.TaskHistory;
 import org.apache.tajo.util.history.StageHistory;
+import org.apache.tajo.util.history.TaskHistory;
 import org.apache.tajo.worker.FetchImpl;
 
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -102,6 +105,8 @@ public class Stage implements EventHandler<StageEvent> {
 
   private long startTime;
   private long finishTime;
+  private volatile long lastContactTime;
+  private Thread timeoutChecker;
 
   volatile Map<TaskId, Task> tasks = new ConcurrentHashMap<TaskId, Task>();
   volatile Map<TajoContainerId, TajoContainer> containers = new ConcurrentHashMap<TajoContainerId,
@@ -114,12 +119,13 @@ public class Stage implements EventHandler<StageEvent> {
   private static final AllocatedContainersCancelTransition CONTAINERS_CANCEL_TRANSITION =
       new AllocatedContainersCancelTransition();
   private static final StageCompleteTransition STAGE_COMPLETED_TRANSITION = new StageCompleteTransition();
+  private static final StageFinalizeTransition STAGE_FINALIZE_TRANSITION = new StageFinalizeTransition();
   private StateMachine<StageState, StageEventType, StageEvent> stateMachine;
 
   protected static final StateMachineFactory<Stage, StageState,
       StageEventType, StageEvent> stateMachineFactory =
-      new StateMachineFactory <Stage, StageState,
-          StageEventType, StageEvent> (StageState.NEW)
+      new StateMachineFactory<Stage, StageState,
+          StageEventType, StageEvent>(StageState.NEW)
 
           // Transitions from NEW state
           .addTransition(StageState.NEW,
@@ -155,6 +161,9 @@ public class Stage implements EventHandler<StageEvent> {
           .addTransition(StageState.RUNNING, StageState.RUNNING,
               StageEventType.SQ_TASK_COMPLETED,
               TASK_COMPLETED_TRANSITION)
+          .addTransition(StageState.RUNNING, StageState.FINALIZING,
+              StageEventType.SQ_SHUFFLE_REPORT,
+              STAGE_FINALIZE_TRANSITION)
           .addTransition(StageState.RUNNING,
               EnumSet.of(StageState.SUCCEEDED, StageState.FAILED),
               StageEventType.SQ_STAGE_COMPLETED,
@@ -198,6 +207,24 @@ public class Stage implements EventHandler<StageEvent> {
               StageEventType.SQ_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
 
+              // Transitions from FINALIZING state
+          .addTransition(StageState.FINALIZING, StageState.FINALIZING,
+              StageEventType.SQ_SHUFFLE_REPORT,
+              STAGE_FINALIZE_TRANSITION)
+          .addTransition(StageState.FINALIZING,
+              EnumSet.of(StageState.SUCCEEDED, StageState.FAILED),
+              StageEventType.SQ_STAGE_COMPLETED,
+              STAGE_COMPLETED_TRANSITION)
+          .addTransition(StageState.FINALIZING, StageState.FINALIZING,
+              StageEventType.SQ_DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(StageState.FINALIZING, StageState.ERROR,
+              StageEventType.SQ_INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+              // Ignore-able Transition
+          .addTransition(StageState.FINALIZING, StageState.KILLED,
+              StageEventType.SQ_KILL)
+
               // Transitions from SUCCEEDED state
           .addTransition(StageState.SUCCEEDED, StageState.SUCCEEDED,
               StageEventType.SQ_CONTAINER_ALLOCATED,
@@ -273,14 +300,14 @@ public class Stage implements EventHandler<StageEvent> {
   private final Lock writeLock;
 
   private int totalScheduledObjectsCount;
-  private int succeededObjectCount = 0;
   private int completedTaskCount = 0;
-  private int succeededTaskCount = 0;
+  private int succeededObjectCount = 0;
   private int killedObjectCount = 0;
   private int failedObjectCount = 0;
   private TaskSchedulerContext schedulerContext;
-  private List<IntermediateEntry> hashShuffleIntermediateEntries = new ArrayList<IntermediateEntry>();
-  private AtomicInteger completeReportReceived = new AtomicInteger(0);
+  private List<IntermediateEntry> hashShuffleIntermediateEntries = Lists.newArrayList();
+  private AtomicInteger completedShuffleTasks = new AtomicInteger(0);
+  private AtomicBoolean stopShuffleReceiver = new AtomicBoolean();
   private StageHistory finalStageHistory;
 
   public Stage(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan, ExecutionBlock block) {
@@ -465,10 +492,16 @@ public class Stage implements EventHandler<StageEvent> {
   }
 
   /**
-   * It finalizes this stage. It is only invoked when the stage is succeeded.
+   * It finalizes this stage. It is only invoked when the stage is finalizing.
    */
-  public void complete() {
+  public void finalizeStage() {
     cleanup();
+  }
+
+  /**
+   * It complete this stage. It is only invoked when the stage is succeeded.
+   */
+  public void complete() {
     finalizeStats();
     setFinishTime();
     eventHandler.handle(new StageCompletedEvent(getId(), StageState.SUCCEEDED));
@@ -652,7 +685,7 @@ public class Stage implements EventHandler<StageEvent> {
   }
 
   private void releaseContainers() {
-    // If there are still live TaskRunners, try to kill the containers.
+    // If there are still live TaskRunners, try to kill the containers. and send the shuffle report request
     eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, getId(), containers.values()));
   }
 
@@ -684,6 +717,7 @@ public class Stage implements EventHandler<StageEvent> {
 
   @Override
   public void handle(StageEvent event) {
+    lastContactTime = System.currentTimeMillis();
     if (LOG.isDebugEnabled()) {
       LOG.debug("Processing " + event.getStageId() + " of type " + event.getType() + ", preState="
           + getSynchronizedState());
@@ -751,6 +785,7 @@ public class Stage implements EventHandler<StageEvent> {
                             LOG.info(stage.totalScheduledObjectsCount + " objects are scheduled");
 
                             if (stage.getTaskScheduler().remainingScheduledObjectNum() == 0) { // if there is no tasks
+                              stage.finalizeStage();
                               stage.complete();
                             } else {
                               if(stage.getSynchronizedState() == StageState.INITED) {
@@ -1192,16 +1227,19 @@ public class Stage implements EventHandler<StageEvent> {
           stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL));
         }
 
-        LOG.info(String.format("[%s] Task Completion Event (Total: %d, Success: %d, Killed: %d, Failed: %d)",
-            stage.getId(),
-            stage.getTotalScheduledObjectsCount(),
-            stage.succeededObjectCount,
-            stage.killedObjectCount,
-            stage.failedObjectCount));
-
-        if (stage.totalScheduledObjectsCount ==
-            stage.succeededObjectCount + stage.killedObjectCount + stage.failedObjectCount) {
-          stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED));
+        if (stage.totalScheduledObjectsCount == stage.completedTaskCount) {
+          if (stage.succeededObjectCount == stage.completedTaskCount) {
+            stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_SHUFFLE_REPORT));
+          } else {
+            stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED));
+          }
+        } else {
+          LOG.info(String.format("[%s] Task Completion Event (Total: %d, Success: %d, Killed: %d, Failed: %d)",
+              stage.getId(),
+              stage.totalScheduledObjectsCount,
+              stage.succeededObjectCount,
+              stage.killedObjectCount,
+              stage.failedObjectCount));
         }
       }
     }
@@ -1244,48 +1282,94 @@ public class Stage implements EventHandler<StageEvent> {
     return hashShuffleIntermediateEntries;
   }
 
-  protected void waitingIntermediateReport() {
-    LOG.info(getId() + ", waiting IntermediateReport: expectedTaskNum=" + completeReportReceived.get());
-    synchronized(completeReportReceived) {
-      long startTime = System.currentTimeMillis();
-      while (true) {
-        if (completeReportReceived.get() >= tasks.size()) {
-          LOG.info(getId() + ", completed waiting IntermediateReport");
-          return;
-        } else {
-          try {
-            completeReportReceived.wait(10 * 1000);
-          } catch (InterruptedException e) {
+  protected void stopFinalization() {
+    stopShuffleReceiver.set(true);
+  }
+
+  private static class StageFinalizeTransition implements SingleArcTransition<Stage, StageEvent> {
+
+    @Override
+    public void transition(final Stage stage, StageEvent event) {
+      //If a shuffle report are failed, remaining reports will ignore
+      if (stage.stopShuffleReceiver.get()) {
+        return;
+      }
+
+      stage.lastContactTime = System.currentTimeMillis();
+      try {
+        if (event instanceof StageShuffleReportEvent) {
+
+          StageShuffleReportEvent finalizeEvent = (StageShuffleReportEvent) event;
+          TajoWorkerProtocol.ExecutionBlockReport report = finalizeEvent.getReport();
+
+          if (!report.getReportSuccess()) {
+            stage.stopFinalization();
+            LOG.error(stage.getId() + ", Shuffle report are failed. Caused by:" + report.getReportErrorMessage());
+            stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_FAILED));
           }
-          long elapsedTime = System.currentTimeMillis() - startTime;
-          if (elapsedTime >= 120 * 1000) {
-            LOG.error(getId() + ", Timeout while receiving intermediate reports: " + elapsedTime + " ms");
-            abort(StageState.FAILED);
-            return;
+
+          stage.completedShuffleTasks.addAndGet(finalizeEvent.getReport().getSucceededTasks());
+          if (report.getIntermediateEntriesCount() > 0) {
+            for (IntermediateEntryProto eachInterm : report.getIntermediateEntriesList()) {
+              stage.hashShuffleIntermediateEntries.add(new IntermediateEntry(eachInterm));
+            }
           }
-        }
-      }
-    }
-  }
 
-  public void receiveExecutionBlockReport(TajoWorkerProtocol.ExecutionBlockReport report) {
-    LOG.info(getId() + ", receiveExecutionBlockReport:" +  report.getSucceededTasks());
-    if (!report.getReportSuccess()) {
-      LOG.error(getId() + ", ExecutionBlock final report fail cause:" + report.getReportErrorMessage());
-      abort(StageState.FAILED);
-      return;
-    }
-    if (report.getIntermediateEntriesCount() > 0) {
-      synchronized (hashShuffleIntermediateEntries) {
-        for (IntermediateEntryProto eachInterm: report.getIntermediateEntriesList()) {
-          hashShuffleIntermediateEntries.add(new IntermediateEntry(eachInterm));
+          if (stage.completedShuffleTasks.get() >= stage.succeededObjectCount) {
+            LOG.info(stage.getId() + ", Finalized shuffle reports: " + stage.completedShuffleTasks.get());
+            stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED));
+            if (stage.timeoutChecker != null) {
+              stage.stopFinalization();
+              synchronized (stage.timeoutChecker){
+                stage.timeoutChecker.notifyAll();
+              }
+            }
+          } else {
+            LOG.info(stage.getId() + ", Received shuffle report: " +
+                stage.completedShuffleTasks.get() + "/" + stage.succeededObjectCount);
+          }
+
+        } else {
+          LOG.info(String.format("Stage finalize - %s (total=%d, success=%d, killed=%d)",
+              stage.getId().toString(),
+              stage.totalScheduledObjectsCount,
+              stage.succeededObjectCount,
+              stage.killedObjectCount));
+          stage.finalizeStage();
+          LOG.info(stage.getId() + ", waiting for shuffle reports. expected Tasks:" + stage.succeededObjectCount);
+
+          /* FIXME implement timeout handler of stage and task */
+          if (stage.timeoutChecker != null) {
+            stage.timeoutChecker = new Thread(new Runnable() {
+              @Override
+              public void run() {
+                while (stage.getSynchronizedState() == StageState.FINALIZING && !Thread.interrupted()) {
+                  long elapsedTime = System.currentTimeMillis() - stage.lastContactTime;
+                  if (elapsedTime > 120 * 1000) {
+                    stage.stopFinalization();
+                    LOG.error(stage.getId() + ": Timed out while receiving intermediate reports: " + elapsedTime
+                        + " ms, report:" + stage.completedShuffleTasks.get() + "/" + stage.succeededObjectCount);
+                    stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_FAILED));
+                  }
+                  synchronized (this) {
+                    try {
+                      this.wait(1 * 1000);
+                    } catch (InterruptedException e) {
+                    }
+                  }
+                }
+              }
+            });
+            stage.timeoutChecker.start();
+          }
         }
+      } catch (Throwable t) {
+        LOG.error(t.getMessage(), t);
+        stage.stopFinalization();
+        stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), t.getMessage()));
+        stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_INTERNAL_ERROR));
       }
     }
-    synchronized(completeReportReceived) {
-      completeReportReceived.addAndGet(report.getSucceededTasks());
-      completeReportReceived.notifyAll();
-    }
   }
 
   private static class StageCompleteTransition implements MultipleArcTransition<Stage, StageEvent, StageState> {

http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-core/src/main/java/org/apache/tajo/querymaster/StageState.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/StageState.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/StageState.java
index 2fd62be..2d68332 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/StageState.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/StageState.java
@@ -22,6 +22,7 @@ public enum StageState {
   NEW,
   INITED,
   RUNNING,
+  FINALIZING,
   SUCCEEDED,
   FAILED,
   KILL_WAIT,

http://git-wip-us.apache.org/repos/asf/tajo/blob/50a8a663/tajo-dist/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-dist/pom.xml b/tajo-dist/pom.xml
index d469ba9..3df2681 100644
--- a/tajo-dist/pom.xml
+++ b/tajo-dist/pom.xml
@@ -207,6 +207,34 @@
       <activation>
         <activeByDefault>false</activeByDefault>
       </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-common</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+          <exclusions>
+            <exclusion>
+              <groupId>commons-el</groupId>
+              <artifactId>commons-el</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>tomcat</groupId>
+              <artifactId>jasper-runtime</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>tomcat</groupId>
+              <artifactId>jasper-compiler</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.mortbay.jetty</groupId>
+              <artifactId>jsp-2.1-jetty</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+      </dependencies>
       <build>
         <plugins>
           <plugin>


[8/8] tajo git commit: Merge branch 'master' of https://github.com/apache/tajo into index_support

Posted by ji...@apache.org.
Merge branch 'master' of https://github.com/apache/tajo into index_support


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/767b9a4b
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/767b9a4b
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/767b9a4b

Branch: refs/heads/index_support
Commit: 767b9a4b7894f0545fca18569865299130ce190a
Parents: e04c65f 807868b
Author: SonJihoon <ji...@SonJihoonui-MacBook-Pro.local>
Authored: Fri Jan 9 23:29:49 2015 +0900
Committer: SonJihoon <ji...@SonJihoonui-MacBook-Pro.local>
Committed: Fri Jan 9 23:29:49 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  16 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   3 +-
 tajo-core/pom.xml                               |   2 +-
 .../engine/planner/physical/WindowAggExec.java  |  11 +-
 .../tajo/master/QueryCoordinatorService.java    | 160 ++++++++++
 .../org/apache/tajo/master/QueryInProgress.java | 228 ++++++++++++++
 .../org/apache/tajo/master/QueryJobManager.java | 311 ------------------
 .../org/apache/tajo/master/QueryManager.java    | 315 +++++++++++++++++++
 .../apache/tajo/master/TajoContainerProxy.java  |  12 +-
 .../java/org/apache/tajo/master/TajoMaster.java |  16 +-
 .../tajo/master/TajoMasterClientService.java    |   9 +-
 .../apache/tajo/master/TajoMasterService.java   | 168 ----------
 .../tajo/master/event/StageEventType.java       |   3 +-
 .../master/event/StageShuffleReportEvent.java   |  38 +++
 .../apache/tajo/master/exec/QueryExecutor.java  |   4 +-
 .../tajo/master/rm/TajoResourceTracker.java     |  12 +-
 .../master/rm/TajoWorkerResourceManager.java    |  16 +-
 .../tajo/master/rm/WorkerResourceManager.java   |  19 +-
 .../apache/tajo/master/scheduler/Scheduler.java |   2 +-
 .../master/scheduler/SimpleFifoScheduler.java   |  11 +-
 .../java/org/apache/tajo/querymaster/Query.java |   2 +-
 .../tajo/querymaster/QueryInProgress.java       | 301 ------------------
 .../apache/tajo/querymaster/QueryJobEvent.java  |   5 +-
 .../apache/tajo/querymaster/QueryMaster.java    |  97 ++----
 .../querymaster/QueryMasterManagerService.java  |   2 +-
 .../tajo/querymaster/QueryMasterTask.java       |  47 +--
 .../java/org/apache/tajo/querymaster/Stage.java | 223 ++++++++-----
 .../org/apache/tajo/querymaster/StageState.java |   1 +
 .../main/java/org/apache/tajo/util/JSPUtil.java |   2 +-
 .../tajo/worker/TajoResourceAllocator.java      |  37 ++-
 .../java/org/apache/tajo/worker/TajoWorker.java |  20 +-
 .../tajo/worker/WorkerHeartbeatService.java     |  27 +-
 .../ConnectivityCheckerRuleForTajoWorker.java   |  14 +-
 .../main/proto/QueryCoordinatorProtocol.proto   | 147 +++++++++
 .../main/proto/ResourceTrackerProtocol.proto    |   2 +-
 .../src/main/proto/TajoMasterProtocol.proto     | 148 ---------
 .../src/main/resources/webapps/admin/index.jsp  |   2 +-
 .../src/main/resources/webapps/admin/query.jsp  |   2 +-
 .../org/apache/tajo/TajoTestingCluster.java     |   2 +-
 .../tajo/engine/query/TestWindowQuery.java      |  29 ++
 .../tajo/master/rm/TestTajoResourceManager.java |   3 +-
 .../apache/tajo/querymaster/TestKillQuery.java  |   2 +-
 tajo-dist/pom.xml                               |  28 ++
 tajo-jdbc/pom.xml                               |   6 +-
 .../org/apache/tajo/jdbc/JdbcConnection.java    |   3 +-
 .../tajo/jdbc/util/QueryStringDecoder.java      | 139 ++++++++
 .../tajo/jdbc/util/TestQueryStringDecoder.java  |  94 ++++++
 .../java/org/apache/tajo/storage/CSVFile.java   |   4 +-
 .../java/org/apache/tajo/storage/RowFile.java   |   3 +-
 .../apache/tajo/storage/avro/AvroAppender.java  |   3 +-
 .../tajo/storage/parquet/ParquetAppender.java   |   3 +-
 .../sequencefile/SequenceFileAppender.java      |   4 +-
 .../tajo/storage/text/DelimitedTextFile.java    |   4 +-
 53 files changed, 1508 insertions(+), 1254 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/767b9a4b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/767b9a4b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/767b9a4b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/767b9a4b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/767b9a4b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
index 742665a,fd52488..7a5e7b4
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
@@@ -58,11 -49,13 +55,10 @@@ import org.apache.tajo.plan.LogicalOpti
  import org.apache.tajo.plan.LogicalPlan;
  import org.apache.tajo.plan.LogicalPlanner;
  import org.apache.tajo.plan.logical.LogicalNode;
 -import org.apache.tajo.plan.logical.LogicalRootNode;
  import org.apache.tajo.plan.logical.NodeType;
  import org.apache.tajo.plan.logical.ScanNode;
 -import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
 -import org.apache.tajo.plan.util.PlannerUtil;
  import org.apache.tajo.plan.verifier.VerifyException;
- import org.apache.tajo.rpc.NettyClientBase;
- import org.apache.tajo.rpc.RpcConnectionPool;
+ import org.apache.tajo.session.Session;
  import org.apache.tajo.storage.StorageManager;
  import org.apache.tajo.storage.StorageProperty;
  import org.apache.tajo.storage.StorageUtil;

http://git-wip-us.apache.org/repos/asf/tajo/blob/767b9a4b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------


[4/8] tajo git commit: TAJO-1286: Remove netty dependency from tajo-jdbc

Posted by ji...@apache.org.
TAJO-1286: Remove netty dependency from tajo-jdbc

Closes #341


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/533601ea
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/533601ea
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/533601ea

Branch: refs/heads/index_support
Commit: 533601eac6a21251485576ba693635d1650b63a4
Parents: 03801a3
Author: Jihun Kang <ji...@apache.org>
Authored: Fri Jan 9 15:51:25 2015 +0900
Committer: Jihun Kang <ji...@apache.org>
Committed: Fri Jan 9 15:51:25 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 tajo-jdbc/pom.xml                               |   6 +-
 .../org/apache/tajo/jdbc/JdbcConnection.java    |   3 +-
 .../tajo/jdbc/util/QueryStringDecoder.java      | 139 +++++++++++++++++++
 .../tajo/jdbc/util/TestQueryStringDecoder.java  |  94 +++++++++++++
 5 files changed, 242 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/533601ea/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 66bb299..0df49b1 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,6 +27,8 @@ Release 0.9.1 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1286: Remove netty dependency from tajo-jdbc. (jihun)
+
     TAJO-1282: Cleanup the relationship of QueryInProgress and 
     QueryJobManager. (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/533601ea/tajo-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-jdbc/pom.xml b/tajo-jdbc/pom.xml
index 15439c6..9847416 100644
--- a/tajo-jdbc/pom.xml
+++ b/tajo-jdbc/pom.xml
@@ -116,7 +116,11 @@
       <groupId>org.apache.tajo</groupId>
       <artifactId>tajo-client</artifactId>
     </dependency>
-
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>

http://git-wip-us.apache.org/repos/asf/tajo/blob/533601ea/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
index a76443d..f00dc25 100644
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.jdbc;
 
 import com.google.protobuf.ServiceException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.TajoConstants;
@@ -27,7 +28,7 @@ import org.apache.tajo.client.QueryClient;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.client.TajoClientImpl;
 import org.apache.tajo.conf.TajoConf;
-import org.jboss.netty.handler.codec.http.QueryStringDecoder;
+import org.apache.tajo.jdbc.util.QueryStringDecoder;
 
 import java.io.IOException;
 import java.net.URI;

http://git-wip-us.apache.org/repos/asf/tajo/blob/533601ea/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/util/QueryStringDecoder.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/util/QueryStringDecoder.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/util/QueryStringDecoder.java
new file mode 100644
index 0000000..9ec9340
--- /dev/null
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/util/QueryStringDecoder.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.jdbc.util;
+
+import java.io.UnsupportedEncodingException;
+import java.net.MalformedURLException;
+import java.net.URLDecoder;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class QueryStringDecoder {
+
+  private final Charset charset;
+  private final String rawUri;
+  private String queries;
+  private Map<String, List<String>> params;
+  
+  public QueryStringDecoder(String rawUri) {
+    this(rawUri, Charset.defaultCharset());
+  }
+  
+  public QueryStringDecoder(String rawUri, Charset charset) {
+    this.rawUri = rawUri;
+    this.charset = charset;
+  }
+  
+  private void splitUri() {
+    if (rawUri != null && !rawUri.isEmpty()) {
+      int pathPos = rawUri.indexOf('?');
+      if (pathPos < 0) {
+        queries = "";
+      } else {
+        if ((pathPos + 1) > rawUri.length()) {
+          queries = "";
+        } else {
+          queries = rawUri.substring(pathPos + 1);
+        }
+      }
+    }
+  }
+  
+  protected void decodeParams() throws MalformedURLException, UnsupportedEncodingException {
+    params = new HashMap<String, List<String>>();
+    String queries = getQueries();
+    
+    if (queries != null && !queries.isEmpty()) {
+      char c = 0;
+      int startPos = 0;
+      String name = null, value = null;
+      
+      for (int index = 0; index < queries.length(); index++) {
+        c = queries.charAt(index);
+        if (c == '=') {
+          name = queries.substring(startPos, index);
+          if (name.isEmpty()) {
+            throw new MalformedURLException(rawUri + " is not a valid URL.");
+          }
+          name = decodeString(name);
+          startPos = index+1;
+        } else if (c == '&') {
+          if (name == null || name.isEmpty()) {
+            throw new MalformedURLException(rawUri + " is not a valid URL.");
+          }          
+          value = queries.substring(startPos, index);
+          if (value.isEmpty()) {
+            throw new MalformedURLException(rawUri + " is not a valid URL.");
+          }
+          value = decodeString(value);
+          addParameter(name, value);
+          startPos = index+1;
+        }
+      }
+      
+      if (startPos > 0 && name != null && !name.isEmpty()) {
+        value = queries.substring(startPos);
+        value = decodeString(value);
+        addParameter(name, value);
+      }
+    }
+  }
+  
+  protected String decodeString(String string) throws UnsupportedEncodingException {
+    String decoded = "";
+    
+    if (string != null && !string.isEmpty()) {
+      decoded = URLDecoder.decode(string, charset.name());
+    }
+    
+    return decoded;
+  }
+  
+  protected void addParameter(String name, String value) {
+    List<String> valueList = params.get(name);
+    
+    if (valueList == null) {
+      valueList = new ArrayList<String>();
+      params.put(name, valueList);
+    }
+    
+    valueList.add(value);
+  }
+  
+  public String getRawUri() {
+    return rawUri;
+  }
+  
+  public String getQueries() {
+    if (queries == null || queries.isEmpty()) {
+      splitUri();
+    }
+    return queries;
+  }
+  
+  public Map<String, List<String>> getParameters() throws MalformedURLException, UnsupportedEncodingException {
+    if (params == null || params.size() <= 0) {
+      decodeParams();
+    }
+    return params;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/533601ea/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/util/TestQueryStringDecoder.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/util/TestQueryStringDecoder.java b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/util/TestQueryStringDecoder.java
new file mode 100644
index 0000000..31a09d5
--- /dev/null
+++ b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/util/TestQueryStringDecoder.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.jdbc.util;
+
+import java.net.MalformedURLException;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.hamcrest.CoreMatchers.*;
+
+public class TestQueryStringDecoder {
+
+  @Test
+  public void testEmptyQuery() throws Exception {
+    QueryStringDecoder decoder = null;
+    String rawUriStr = "";
+    
+    rawUriStr = "http://127.0.0.1:26002/";
+    decoder = new QueryStringDecoder(rawUriStr);
+    assertThat(decoder.getQueries(), is(notNullValue()));
+    assertThat(decoder.getParameters(), is(notNullValue()));
+    assertThat(decoder.getParameters().size(), is(0));
+    
+    rawUriStr = "/test_path/test2?";
+    decoder = new QueryStringDecoder(rawUriStr);
+    assertThat(decoder.getQueries(), is(notNullValue()));
+    assertThat(decoder.getParameters(), is(notNullValue()));
+    assertThat(decoder.getParameters().size(), is(0));
+  }
+  
+  @Test
+  public void testSingleQueries() throws Exception {
+    QueryStringDecoder decoder = null;
+    String rawUriStr = "";
+    
+    rawUriStr = "http://127.0.0.1:26200/?qid=1234&tid=2345&pid=4567";
+    decoder = new QueryStringDecoder(rawUriStr);
+    assertThat(decoder.getQueries(), is("qid=1234&tid=2345&pid=4567"));
+    assertThat(decoder.getParameters(), is(notNullValue()));
+    assertThat(decoder.getParameters().size(), is(3));
+    assertThat(decoder.getParameters().get("qid").get(0), is("1234"));
+    assertThat(decoder.getParameters().get("pid").get(0), is("4567"));
+    
+    rawUriStr = "http://127.0.0.1:26200/?tid=2345";
+    decoder = new QueryStringDecoder(rawUriStr);
+    assertThat(decoder.getQueries(), is("tid=2345"));
+    assertThat(decoder.getParameters(), is(notNullValue()));
+    assertThat(decoder.getParameters().size(), is(1));
+    assertThat(decoder.getParameters().get("tid").get(0), is("2345"));
+  }
+  
+  @Test
+  public void testMultipleQueries() throws Exception {
+    QueryStringDecoder decoder = null;
+    String rawUriStr = "";
+    
+    rawUriStr = "http://127.0.0.1:26200/?qid=1234&tid=2345&pid=4567&tid=4890";
+    decoder = new QueryStringDecoder(rawUriStr);
+    assertThat(decoder.getQueries(), is("qid=1234&tid=2345&pid=4567&tid=4890"));
+    assertThat(decoder.getParameters(), is(notNullValue()));
+    assertThat(decoder.getParameters().size(), is(3));
+    assertThat(decoder.getParameters().get("tid").size(), is(2));
+    assertThat(decoder.getParameters().get("tid").get(0), is("2345"));
+    assertThat(decoder.getParameters().get("tid").get(1), is("4890"));
+  }
+  
+  @Test(expected=MalformedURLException.class)
+  public void testMalformedURI() throws Exception {
+    QueryStringDecoder decoder = null;
+    String rawUriStr = "";
+    
+    rawUriStr = "http://127.0.0.1:26200/?=1234&tid=&pid=4567";
+    decoder = new QueryStringDecoder(rawUriStr);
+    assertThat(decoder.getQueries(), is("=1234&tid=&pid=4567"));
+    decoder.getParameters();
+  }
+}