You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/08/26 14:29:14 UTC

[5/8] TAJO-127: Implement Tajo Resource Manager. (hyoungjunkim via hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
new file mode 100644
index 0000000..b2c129f
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class QueryJobEvent extends AbstractEvent<QueryJobEvent.Type> {
+  private QueryInfo queryInfo;
+
+  public QueryJobEvent(Type type, QueryInfo queryInfo) {
+    super(type);
+
+    this.queryInfo = queryInfo;
+  }
+
+  public QueryInfo getQueryInfo() {
+    return this.queryInfo;
+  }
+
+  public enum Type {
+    QUERY_JOB_START,
+    QUERY_JOB_HEARTBEAT,
+    QUERY_JOB_FINISH,
+    QUERY_MASTER_START,
+    QUERY_MASTER_STOP
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
new file mode 100644
index 0000000..e4d83af
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.engine.planner.logical.LogicalRootNode;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.rm.WorkerResource;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class QueryJobManager extends CompositeService {
+  private static final Log LOG = LogFactory.getLog(QueryJobManager.class.getName());
+
+  // TajoMaster Context
+  private final TajoMaster.MasterContext masterContext;
+
+  private AsyncDispatcher dispatcher;
+
+  private Map<QueryId, QueryInProgress> runningQueries = new HashMap<QueryId, QueryInProgress>();
+
+  private Map<QueryId, QueryInProgress> finishedQueries = new HashMap<QueryId, QueryInProgress>();
+
+  public QueryJobManager(final TajoMaster.MasterContext masterContext) {
+    super(QueryJobManager.class.getName());
+    this.masterContext = masterContext;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    try {
+      this.dispatcher = new AsyncDispatcher();
+      addService(this.dispatcher);
+
+      this.dispatcher.register(QueryJobEvent.Type.class, new QueryJobManagerEventHandler());
+    } catch (Exception e) {
+      catchException(null, e);
+    }
+
+    super.init(conf);
+  }
+
+  @Override
+  public void stop() {
+    synchronized(runningQueries) {
+      for(QueryInProgress eachQueryInProgress: runningQueries.values()) {
+        eachQueryInProgress.stop();
+      }
+    }
+    super.stop();
+  }
+
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  public EventHandler getEventHandler() {
+    return dispatcher.getEventHandler();
+  }
+
+  public QueryInfo createNewQueryJob(String sql, LogicalRootNode plan) throws Exception {
+    QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
+    QueryInProgress queryInProgress = new QueryInProgress(masterContext, queryId, sql, plan);
+
+    synchronized(runningQueries) {
+      runningQueries.put(queryId, queryInProgress);
+    }
+
+    addService(queryInProgress);
+    queryInProgress.init(getConfig());
+    queryInProgress.start();
+
+    queryInProgress.startQueryMaster();
+
+    return queryInProgress.getQueryInfo();
+  }
+
+  class QueryJobManagerEventHandler implements EventHandler<QueryJobEvent> {
+    @Override
+    public void handle(QueryJobEvent event) {
+      QueryInProgress queryInProgress = null;
+      synchronized(runningQueries) {
+        queryInProgress = runningQueries.get(event.getQueryInfo().getQueryId());
+        if(queryInProgress == null) {
+          LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]");
+          return;
+        }
+      }
+      queryInProgress.getEventHandler().handle(event);
+    }
+  }
+
+  public QueryInProgress getQueryInProgress(QueryId queryId) {
+    synchronized(runningQueries) {
+      return runningQueries.get(queryId);
+    }
+  }
+
+  public void stopQuery(QueryId queryId) {
+    LOG.info("====>Stop QueryInProgress:" + queryId);
+    QueryInProgress queryInProgress = getQueryInProgress(queryId);
+    if(queryInProgress != null) {
+      queryInProgress.stop();
+      synchronized(runningQueries) {
+        runningQueries.remove(queryId);
+        finishedQueries.put(queryId, queryInProgress);
+      }
+    } else {
+      LOG.warn("====> No QueryInProgress while query stopping: " + queryId);
+    }
+  }
+
+  private void catchException(QueryId queryId, Exception e) {
+    LOG.error(e.getMessage(), e);
+    QueryInProgress queryInProgress = runningQueries.get(queryId);
+    queryInProgress.catchException(e);
+  }
+
+  public synchronized TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand queryHeartbeat(
+      TajoMasterProtocol.TajoHeartbeat queryHeartbeat) {
+    QueryInProgress queryInProgress = getQueryInProgress(new QueryId(queryHeartbeat.getQueryId()));
+    if(queryInProgress == null) {
+      return null;
+    }
+
+    QueryInfo queryInfo = makeQueryInfoFromHeartbeat(queryHeartbeat);
+    getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_HEARTBEAT, queryInfo));
+
+    return null;
+  }
+
+  private QueryInfo makeQueryInfoFromHeartbeat(TajoMasterProtocol.TajoHeartbeat queryHeartbeat) {
+    QueryInfo queryInfo = new QueryInfo(new QueryId(queryHeartbeat.getQueryId()));
+    if(queryHeartbeat.getTajoWorkerHost() != null) {
+      WorkerResource queryMasterResource = new WorkerResource();
+      queryMasterResource.setAllocatedHost(queryHeartbeat.getTajoWorkerHost());
+      queryMasterResource.setPorts(new int[]{queryHeartbeat.getTajoWorkerPort(), queryHeartbeat.getTajoWorkerClientPort()});
+
+      queryInfo.setQueryMasterResource(queryMasterResource);
+    }
+    queryInfo.setLastMessage(queryHeartbeat.getStatusMessage());
+    queryInfo.setQueryState(queryHeartbeat.getState());
+
+    return queryInfo;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index b66ef68..50ec5be 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -18,142 +18,82 @@
 
 package org.apache.tajo.master.querymaster;
 
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.SystemClock;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
-import org.apache.hadoop.yarn.client.YarnClient;
-import org.apache.hadoop.yarn.client.YarnClientImpl;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.proto.YarnProtos;
-import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
-import org.apache.tajo.*;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.json.CoreGsonHelper;
-import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.QueryConf;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
 import org.apache.tajo.engine.planner.global.GlobalOptimizer;
-import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.planner.logical.LogicalNode;
-import org.apache.tajo.engine.planner.logical.LogicalRootNode;
-import org.apache.tajo.engine.planner.logical.NodeType;
-import org.apache.tajo.engine.planner.logical.ScanNode;
-import org.apache.tajo.ipc.QueryMasterManagerProtocol;
-import org.apache.tajo.ipc.QueryMasterProtocol;
-import org.apache.tajo.master.*;
-import org.apache.tajo.master.event.*;
-import org.apache.tajo.master.rm.RMContainerAllocator;
-import org.apache.tajo.rpc.ProtoAsyncRpcServer;
-import org.apache.tajo.rpc.ProtoBlockingRpcClient;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.GlobalPlanner;
+import org.apache.tajo.master.TajoAsyncDispatcher;
+import org.apache.tajo.master.event.QueryStartEvent;
+import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.util.TajoIdUtils;
+import org.apache.tajo.worker.TajoWorker;
 
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicBoolean;
 
-// TODO - when exception, send error status to QueryMasterManager
+// TODO - when exception, send error status to QueryJobManager
 public class QueryMaster extends CompositeService implements EventHandler {
   private static final Log LOG = LogFactory.getLog(QueryMaster.class.getName());
-  private static PrimitiveProtos.BoolProto TRUE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
-  private static PrimitiveProtos.BoolProto FALSE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
+
   private static int QUERY_SESSION_TIMEOUT = 60 * 1000;  //60 sec
 
-  // AppMaster Common
-  private final long appSubmitTime;
   private Clock clock;
 
-  // For Query
-  private final QueryId queryId;
-  private QueryContext queryContext;
-  private Query query;
-  private TajoProtos.QueryState state = TajoProtos.QueryState.QUERY_NOT_ASSIGNED;
-  private String statusMessage;
-  private MasterPlan masterPlan;
-
-  private AsyncDispatcher dispatcher;
-  private RMContainerAllocator rmAllocator;
+  private TajoAsyncDispatcher dispatcher;
 
-  //service handler for QueryMasterManager, Worker
-  private QueryMasterService queryMasterService;
-  private QueryMasterClientService queryMasterClientService;
-
-  private TaskRunnerLauncher taskRunnerLauncher;
   private GlobalPlanner globalPlanner;
+
   private GlobalOptimizer globalOptimizer;
 
-  private boolean isCreateTableStmt;
+//  private boolean isCreateTableStmt;
   private StorageManager storageManager;
-  private Path outputPath;
+
   private QueryConf queryConf;
-  private ApplicationAttemptId appAttemptId;
-  private ApplicationId appId;
-  private ProtoBlockingRpcClient queryMasterManagerClient;
-  private QueryMasterManagerProtocol.QueryMasterManagerProtocolService.BlockingInterface queryMasterManagerService;
 
-  private Map<String, TableDesc> tableDescMap = new HashMap<String, TableDesc>();
+  private Map<QueryId, QueryMasterTask> queryMasterTasks = new HashMap<QueryId, QueryMasterTask>();
 
-  private String queryMasterManagerAddress;
+  private ClientSessionTimeoutCheckThread clientSessionTimeoutCheckThread;
 
-  private YarnRPC yarnRPC;
+  private AtomicBoolean queryMasterStop = new AtomicBoolean(false);
 
-  private YarnClient yarnClient;
+  private QueryMasterContext queryMasterContext;
 
-  private ClientSessionTimeoutCheckThread clientSessionTimeoutCheckThread;
+  private QueryHeartbeatThread queryHeartbeatThread;
 
-  public QueryMaster(final QueryId queryId, final long appSubmitTime, String queryMasterManagerAddress) {
-    super(QueryMaster.class.getName());
+  private TajoWorker.WorkerContext workerContext;
 
-    this.queryId = queryId;
-    this.appSubmitTime = appSubmitTime;
-    this.appId = queryId.getApplicationId();
-    this.queryMasterManagerAddress = queryMasterManagerAddress;
-
-    LOG.info("Created Query Master for " + queryId);
+  public QueryMaster(TajoWorker.WorkerContext workerContext) {
+    super(QueryMaster.class.getName());
+    this.workerContext = workerContext;
   }
 
   public void init(Configuration conf) {
+    LOG.info("QueryMaster init");
     try {
       queryConf = new QueryConf(conf);
-      QUERY_SESSION_TIMEOUT = 60 * 1000;//queryConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT);
-      queryContext = new QueryContext(queryConf);
-      yarnRPC = YarnRPC.create(queryContext.getConf());
-      connectYarnClient();
+      queryConf.addResource(new Path(QueryConf.QUERY_MASTER_FILENAME));
 
-      LOG.info("Init QueryMasterManagerClient connection to:" + queryMasterManagerAddress);
-      InetSocketAddress addr = NetUtils.createSocketAddr(queryMasterManagerAddress);
-      queryMasterManagerClient = new ProtoBlockingRpcClient(QueryMasterManagerProtocol.class, addr);
-      queryMasterManagerService = queryMasterManagerClient.getStub();
+      QUERY_SESSION_TIMEOUT = 60 * 1000;//queryConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT);
+      queryMasterContext = new QueryMasterContext(queryConf);
 
       clock = new SystemClock();
 
-      this.dispatcher = new AsyncDispatcher();
+      this.dispatcher = new TajoAsyncDispatcher("querymaster_" + System.currentTimeMillis());
       addIfService(dispatcher);
 
       this.storageManager = new StorageManager(queryConf);
@@ -161,23 +101,8 @@ public class QueryMaster extends CompositeService implements EventHandler {
       globalPlanner = new GlobalPlanner(queryConf, storageManager, dispatcher.getEventHandler());
       globalOptimizer = new GlobalOptimizer();
 
-      queryMasterService = new QueryMasterService();
-      addIfService(queryMasterService);
-
-      queryMasterClientService = new QueryMasterClientService(queryContext);
-      addIfService(queryMasterClientService);
-
-      initStagingDir();
-
-      dispatcher.register(SubQueryEventType.class, new SubQueryEventDispatcher());
-      dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
-      dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
-      dispatcher.register(QueryFinishEvent.EventType.class, new QueryFinishEventHandler());
-      dispatcher.register(TaskSchedulerEvent.EventType.class, new TaskSchedulerDispatcher());
+      dispatcher.register(QueryStartEvent.EventType.class, new QueryStartEventHandler());
 
-      clientSessionTimeoutCheckThread = new ClientSessionTimeoutCheckThread();
-
-      clientSessionTimeoutCheckThread.start();
     } catch (Throwable t) {
       LOG.error(t.getMessage(), t);
       throw new RuntimeException(t);
@@ -185,364 +110,88 @@ public class QueryMaster extends CompositeService implements EventHandler {
     super.init(conf);
   }
 
-  class ClientSessionTimeoutCheckThread extends Thread {
-    public void run() {
-      LOG.info("ClientSessionTimeoutCheckThread started");
-      while(true) {
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {
-          break;
-        }
-        try {
-          long lastHeartbeat = queryContext.getLastClientHeartbeat();
-          long time = System.currentTimeMillis() - lastHeartbeat;
-          if(lastHeartbeat > 0 && time > QUERY_SESSION_TIMEOUT) {
-            LOG.warn("Query " + queryId + " stopped cause query sesstion timeout: " + time + " ms");
-            QueryMaster.this.stop();
-          }
-        } catch (Exception e) {
-          LOG.error(e.getMessage(), e);
-        }
-      }
-    }
-  }
-
-  class QueryHeartbeatThread extends Thread {
-    public QueryHeartbeatThread() {
-      super("QueryHeartbeatThread");
-    }
-
-    @Override
-    public void run() {
-      LOG.info("Start QueryMaster heartbeat thread");
-      while(queryMasterManagerClient.isConnected()) {
-        QueryMasterManagerProtocol.QueryHeartbeat queryHeartbeat =
-            QueryMasterManagerProtocol.QueryHeartbeat.newBuilder()
-                .setQueryMasterHost(queryMasterService.bindAddr.getHostName())
-                .setQueryMasterPort(queryMasterService.bindAddr.getPort())
-                .setQueryMasterClientPort(queryMasterClientService.getBindAddr().getPort())
-                .setState(state)
-                .setQueryId(queryId.getProto())
-                .build();
-
-        try {
-          QueryMasterManagerProtocol.QueryHeartbeatResponse response =
-              queryMasterManagerService.queryHeartbeat(null, queryHeartbeat);
-          if(response.getResponseCommand() != null) {
-            if("executeQuery".equals(response.getResponseCommand().getCommand())) {
-              appAttemptId = TajoIdUtils.toApplicationAttemptId(response.getResponseCommand().getParams(0));
-              startQuery(response.getResponseCommand().getParams(1),
-                  response.getResponseCommand().getParams(2));
-            }
-          }
-        } catch (Exception e) {
-          LOG.error(e.getMessage(), e);
-        }
-        try {
-          Thread.sleep(2000);
-        } catch (InterruptedException e) {
-          break;
-        }
-      }
-      LOG.info("QueryMaster heartbeat thread stopped");
-    }
-  }
-
-  // TODO blocking/nonblocking ???
-  class QueryMasterService extends AbstractService implements QueryMasterProtocol.QueryMasterProtocolService.Interface {
-    private ProtoAsyncRpcServer rpcServer;
-    private InetSocketAddress bindAddr;
-    private String addr;
-    private QueryHeartbeatThread queryHeartbeatThread;
-
-    public QueryMasterService() {
-      super(QueryMasterService.class.getName());
-
-      // Setup RPC server
-      try {
-        InetSocketAddress initIsa =
-                new InetSocketAddress(InetAddress.getLocalHost(), 0);
-        if (initIsa.getAddress() == null) {
-          throw new IllegalArgumentException("Failed resolve of " + initIsa);
-        }
-
-        this.rpcServer = new ProtoAsyncRpcServer(QueryMasterProtocol.class, this, initIsa);
-        this.rpcServer.start();
-
-        this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
-        this.addr = NetUtils.normalizeInetSocketAddress(this.bindAddr);
-      } catch (Exception e) {
-        LOG.error(e.getMessage(), e);
-      }
-      queryConf.setVar(TajoConf.ConfVars.TASKRUNNER_LISTENER_ADDRESS, addr);
-      LOG.info("QueryMasterService startup");
-    }
-
-    @Override
-    public void init(Configuration conf) {
-      super.init(conf);
-    }
-
-    @Override
-    public void start() {
-      try {
-        queryHeartbeatThread = new QueryHeartbeatThread();
-        queryHeartbeatThread.start();
-      } catch (Exception e) {
-        LOG.error(e.getMessage(), e);
-        // TODO - set query status failed and stop QueryMaster
-      }
-      super.start();
-    }
-
-    @Override
-    public void stop() {
-      if(rpcServer != null) {
-        rpcServer.shutdown();
-      }
-      if(queryHeartbeatThread != null) {
-        queryHeartbeatThread.interrupt();
-      }
-      if(yarnClient != null) {
-        yarnClient.stop();
-      }
-      if(clientSessionTimeoutCheckThread != null) {
-        clientSessionTimeoutCheckThread.interrupt();
-      }
-      super.stop();
-      LOG.info("QueryMasterService stopped");
-    }
-
-    @Override
-    public void getTask(RpcController controller, YarnProtos.ContainerIdProto request,
-                        RpcCallback<QueryMasterProtocol.QueryUnitRequestProto> done) {
-      queryContext.getEventHandler().handle(new TaskRequestEvent(new ContainerIdPBImpl(request), done));
-    }
-
-    @Override
-    public void statusUpdate(RpcController controller, QueryMasterProtocol.TaskStatusProto request,
-                             RpcCallback<PrimitiveProtos.BoolProto> done) {
-      QueryUnitAttemptId attemptId = new QueryUnitAttemptId(request.getId());
-      queryContext.getEventHandler().handle(new TaskAttemptStatusUpdateEvent(attemptId, request));
-      done.run(TRUE_PROTO);
-    }
-
-    @Override
-    public void ping(RpcController controller,
-                     TajoIdProtos.QueryUnitAttemptIdProto attemptIdProto,
-                     RpcCallback<PrimitiveProtos.BoolProto> done) {
-      // TODO - to be completed
-      QueryUnitAttemptId attemptId = new QueryUnitAttemptId(attemptIdProto);
-      done.run(TRUE_PROTO);
-    }
-
-    @Override
-    public void fatalError(RpcController controller, QueryMasterProtocol.TaskFatalErrorReport report,
-                           RpcCallback<PrimitiveProtos.BoolProto> done) {
-      queryContext.getEventHandler().handle(new TaskFatalErrorEvent(report));
-      done.run(TRUE_PROTO);
-    }
+  @Override
+  public void start() {
+    LOG.info("====>QueryMaster start");
 
-    @Override
-    public void done(RpcController controller, QueryMasterProtocol.TaskCompletionReport report,
-                     RpcCallback<PrimitiveProtos.BoolProto> done) {
-      queryContext.getEventHandler().handle(new TaskCompletionEvent(report));
-      done.run(TRUE_PROTO);
-    }
+    queryHeartbeatThread = new QueryHeartbeatThread();
+    queryHeartbeatThread.start();
 
-    @Override
-    public void executeQuery(RpcController controller, PrimitiveProtos.StringProto request,
-                             RpcCallback<PrimitiveProtos.BoolProto> done) {
-    }
-  }
+    clientSessionTimeoutCheckThread = new ClientSessionTimeoutCheckThread();
+    clientSessionTimeoutCheckThread.start();
 
-  public void start() {
     super.start();
   }
 
+  @Override
   public void stop() {
-    LOG.info("unregisterApplicationMaster");
-    if(rmAllocator != null) {
-      try {
-        FinalApplicationStatus status = FinalApplicationStatus.UNDEFINED;
-        if (query != null) {
-          TajoProtos.QueryState state = query.getState();
-          if (state == TajoProtos.QueryState.QUERY_SUCCEEDED) {
-            status = FinalApplicationStatus.SUCCEEDED;
-          } else if (state == TajoProtos.QueryState.QUERY_FAILED || state == TajoProtos.QueryState.QUERY_ERROR) {
-            status = FinalApplicationStatus.FAILED;
-          } else if (state == TajoProtos.QueryState.QUERY_ERROR) {
-            status = FinalApplicationStatus.FAILED;
-          }
-        }
-        this.rmAllocator.unregisterApplicationMaster(status, "tajo query finished", null);
-      } catch (Exception e) {
-        LOG.error(e.getMessage(), e);
+    synchronized(queryMasterStop) {
+      if(queryMasterStop.get()) {
+         return;
       }
-    }
 
-    // TODO - release opened resource
-    if(this.queryMasterManagerClient != null) {
-      reportQueryStatus();
-
-      queryMasterManagerClient.close();
+      queryMasterStop.set(true);
+      queryMasterStop.notifyAll();
     }
 
-    try {
-      FileSystem.closeAll();
-    } catch (IOException e) {
-      LOG.error(e.getMessage(), e);
+    if(queryHeartbeatThread != null) {
+      queryHeartbeatThread.interrupt();
     }
 
+    if(clientSessionTimeoutCheckThread != null) {
+      clientSessionTimeoutCheckThread.interrupt();
+    }
     super.stop();
 
-    synchronized(queryId) {
-      queryId.notifyAll();
+    LOG.info("QueryMaster stop");
+    if(!queryMasterContext.getWorkerContext().isStandbyMode()) {
+      queryMasterContext.getWorkerContext().stopWorker(true);
     }
   }
 
-  private void reportQueryStatus() {
-    //send query status heartbeat
-    QueryMasterManagerProtocol.QueryHeartbeat.Builder queryHeartbeatBuilder =
-        QueryMasterManagerProtocol.QueryHeartbeat.newBuilder()
-        .setQueryMasterHost(queryMasterService.bindAddr.getHostName())
-        .setQueryMasterPort(queryMasterService.bindAddr.getPort())
-        .setQueryMasterClientPort(queryMasterClientService.getBindAddr().getPort())
-        .setState(state)
-        .setQueryId(queryId.getProto());
-
-    if(statusMessage != null) {
-      queryHeartbeatBuilder.setStatusMessage(statusMessage);
-    }
+  public void reportQueryStatusToQueryMaster(QueryId queryId, TajoProtos.QueryState state) {
+    LOG.info("Send QueryMaster Ready to QueryJobManager:" + queryId);
     try {
-      queryMasterManagerService.queryHeartbeat(null, queryHeartbeatBuilder.build());
+      TajoMasterProtocol.TajoHeartbeat.Builder queryHeartbeatBuilder = TajoMasterProtocol.TajoHeartbeat.newBuilder()
+          .setTajoWorkerHost(workerContext.getTajoWorkerManagerService().getBindAddr().getHostName())
+          .setTajoWorkerPort(workerContext.getTajoWorkerManagerService().getBindAddr().getPort())
+          .setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort())
+          .setState(state)
+          .setQueryId(queryId.getProto());
+
+      workerContext.getTajoMasterRpcClient().heartbeat(null, queryHeartbeatBuilder.build(), NullCallback.get());
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
     }
   }
 
-  private void connectYarnClient() {
-    this.yarnClient = new YarnClientImpl();
-    this.yarnClient.init(queryConf);
-    this.yarnClient.start();
-  }
-
   protected void addIfService(Object object) {
     if (object instanceof Service) {
       addService((Service) object);
     }
   }
 
-  public synchronized void startQuery(String queryStr, String planJSON) {
-    LOG.info("Query Start:" + queryStr);
-    LOG.info("Plan JSON:" + planJSON);
-    if(query != null) {
-      LOG.warn("Query already started");
-      return;
-    }
-
-    try {
-      LogicalRootNode logicalNodeRoot = (LogicalRootNode) CoreGsonHelper.fromJson(planJSON, LogicalNode.class);
-      LogicalNode[] scanNodes = PlannerUtil.findAllNodes(logicalNodeRoot, NodeType.SCAN);
-      if(scanNodes != null) {
-        for(LogicalNode eachScanNode: scanNodes) {
-          ScanNode scanNode = (ScanNode)eachScanNode;
-          tableDescMap.put(scanNode.getFromTable().getTableName(), scanNode.getFromTable().getTableDesc());
-        }
-      }
-      MasterPlan globalPlan = globalPlanner.build(queryId, logicalNodeRoot);
-      this.masterPlan = globalOptimizer.optimize(globalPlan);
-
-      taskRunnerLauncher = new TaskRunnerLauncherImpl(queryContext);
-      addIfService(taskRunnerLauncher);
-      dispatcher.register(TaskRunnerGroupEvent.EventType.class, taskRunnerLauncher);
-
-      ((TaskRunnerLauncherImpl)taskRunnerLauncher).init(queryConf);
-      ((TaskRunnerLauncherImpl)taskRunnerLauncher).start();
-
-      rmAllocator = new RMContainerAllocator(queryContext);
-      addIfService(rmAllocator);
-      dispatcher.register(ContainerAllocatorEventType.class, rmAllocator);
-
-      rmAllocator.init(queryConf);
-      rmAllocator.start();
-
-      //TODO - synchronized with executeQuery logic
-      query = new Query(queryContext, queryId, clock, appSubmitTime,
-              "", dispatcher.getEventHandler(), masterPlan, storageManager);
-      dispatcher.register(QueryEventType.class, query);
-
-      dispatcher.getEventHandler().handle(new QueryEvent(queryId,
-          QueryEventType.INIT));
-      dispatcher.getEventHandler().handle(new QueryEvent(queryId,
-          QueryEventType.START));
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-      //send FAIL query status
-      this.statusMessage = StringUtils.stringifyException(e);
-      this.state = TajoProtos.QueryState.QUERY_FAILED;
-    }
-  }
-
   @Override
   public void handle(Event event) {
     dispatcher.getEventHandler().handle(event);
   }
 
-  public EventHandler getEventHandler() {
-    return dispatcher.getEventHandler();
-  }
-
-  private class SubQueryEventDispatcher implements EventHandler<SubQueryEvent> {
-    public void handle(SubQueryEvent event) {
-      SubQueryId id = event.getSubQueryId();
-      query.getSubQuery(id).handle(event);
-    }
+  public Query getQuery(QueryId queryId) {
+    return queryMasterTasks.get(queryId).getQuery();
   }
 
-  private class TaskEventDispatcher
-      implements EventHandler<TaskEvent> {
-    public void handle(TaskEvent event) {
-      QueryUnitId taskId = event.getTaskId();
-      QueryUnit task = query.getSubQuery(taskId.getSubQueryId()).
-          getQueryUnit(taskId);
-      task.handle(event);
-    }
-  }
-
-  private class TaskAttemptEventDispatcher
-      implements EventHandler<TaskAttemptEvent> {
-    public void handle(TaskAttemptEvent event) {
-      QueryUnitAttemptId attemptId = event.getTaskAttemptId();
-      SubQuery subQuery = query.getSubQuery(attemptId.getSubQueryId());
-      QueryUnit task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
-      QueryUnitAttempt attempt = task.getAttempt(attemptId);
-      attempt.handle(event);
-    }
-  }
-
-  private class TaskSchedulerDispatcher
-      implements EventHandler<TaskSchedulerEvent> {
-    public void handle(TaskSchedulerEvent event) {
-      SubQuery subQuery = query.getSubQuery(event.getSubQueryId());
-      subQuery.getTaskScheduler().handle(event);
-    }
+  public QueryMasterTask getQueryMasterTask(QueryId queryId) {
+    return queryMasterTasks.get(queryId);
   }
 
-  public QueryContext getContext() {
-    return this.queryContext;
+  public QueryMasterContext getContext() {
+    return this.queryMasterContext;
   }
 
-  public class QueryContext {
+  public class QueryMasterContext {
     private QueryConf conf;
-    public Map<ContainerId, ContainerProxy> containers = new ConcurrentHashMap<ContainerId, ContainerProxy>();
-    int minCapability;
-    int maxCapability;
-    int numCluster;
-    AtomicLong lastClientHeartbeat = new AtomicLong(-1);
 
-    public QueryContext(QueryConf conf) {
+    public QueryMasterContext(QueryConf conf) {
       this.conf = conf;
     }
 
@@ -550,15 +199,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
       return conf;
     }
 
-    public InetSocketAddress getQueryMasterServiceAddress() {
-      return queryMasterService.bindAddr;
-    }
-
-    public QueryMasterClientService getQueryMasterClientService() {
-      return queryMasterClientService;
-    }
-
-    public AsyncDispatcher getDispatcher() {
+    public TajoAsyncDispatcher getDispatcher() {
       return dispatcher;
     }
 
@@ -566,251 +207,133 @@ public class QueryMaster extends CompositeService implements EventHandler {
       return clock;
     }
 
-    public Query getQuery() {
-      return query;
+    public StorageManager getStorageManager() {
+      return storageManager;
     }
 
-    public SubQuery getSubQuery(SubQueryId subQueryId) {
-      return query.getSubQuery(subQueryId);
+    public QueryMaster getQueryMaster() {
+      return QueryMaster.this;
     }
 
-    public QueryId getQueryId() {
-      return queryId;
+    public GlobalPlanner getGlobalPlanner() {
+      return globalPlanner;
     }
-
-    public ApplicationId getApplicationId() {
-      return appId;
+    public GlobalOptimizer getGlobalOptimizer() {
+      return globalOptimizer;
     }
 
-    public ApplicationAttemptId getApplicationAttemptId() {
-      return appAttemptId;
+    public TajoWorker.WorkerContext getWorkerContext() {
+      return workerContext;
     }
 
     public EventHandler getEventHandler() {
       return dispatcher.getEventHandler();
     }
 
-    public void addContainer(ContainerId cId, ContainerProxy container) {
-      containers.put(cId, container);
-    }
-
-    public void removeContainer(ContainerId cId) {
-      containers.remove(cId);
-    }
-
-    public boolean containsContainer(ContainerId cId) {
-      return containers.containsKey(cId);
-    }
-
-    public ContainerProxy getContainer(ContainerId cId) {
-      return containers.get(cId);
-    }
-
-    public Map<ContainerId, ContainerProxy> getContainers() {
-      return containers;
-    }
-
-    public int getNumClusterNode() {
-      return numCluster;
-    }
-
-    public void setNumClusterNodes(int num) {
-      numCluster = num;
-    }
-
-//    public CatalogService getCatalog() {
-//      return catalog;
-//    }
-
-    public Map<String, TableDesc> getTableDescMap() {
-      return tableDescMap;
-    }
-
-    public Path getOutputPath() {
-      return outputPath;
-    }
-
-    public void setMaxContainerCapability(int capability) {
-      this.maxCapability = capability;
-    }
-
-    public int getMaxContainerCapability() {
-      return this.maxCapability;
-    }
-
-    public void setMinContainerCapability(int capability) {
-      this.minCapability = capability;
-    }
-
-    public int getMinContainerCapability() {
-      return this.minCapability;
-    }
-
-    public boolean isCreateTableQuery() {
-      return isCreateTableStmt;
-    }
-
-    public float getProgress() {
-      if(query != null) {
-        return query.getProgress();
-      } else {
-        return 0;
+    public void stopQuery(QueryId queryId) {
+      QueryMasterTask queryMasterTask;
+      synchronized(queryMasterTasks) {
+        queryMasterTask = queryMasterTasks.remove(queryId);
       }
-    }
-
-    public long getStartTime() {
-      if(query != null) {
-        return query.getStartTime();
+      if(queryMasterTask != null) {
+        try {
+          queryMasterTask.stop();
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+        }
       } else {
-        return -1;
+        LOG.warn("No query info:" + queryId);
       }
-    }
-
-    public long getFinishTime() {
-      if(query != null) {
-        return query.getFinishTime();
-      } else {
-        return -1;
+      if(!workerContext.isStandbyMode()) {
+        stop();
       }
     }
-
-    public StorageManager getStorageManager() {
-      return storageManager;
-    }
-
-    public QueryMaster getQueryMaster() {
-      return QueryMaster.this;
-    }
-
-    public YarnRPC getYarnRPC() {
-      return yarnRPC;
-    }
-
-    public void setState(TajoProtos.QueryState state) {
-      QueryMaster.this.state = state;
-    }
-
-    public TajoProtos.QueryState getState() {
-      return state;
-    }
-
-    public void touchSessionTime() {
-      this.lastClientHeartbeat.set(System.currentTimeMillis());
-    }
-
-    public long getLastClientHeartbeat() {
-      return this.lastClientHeartbeat.get();
-    }
   }
 
-  private class QueryFinishEventHandler implements EventHandler<QueryFinishEvent> {
+  private class QueryStartEventHandler implements EventHandler<QueryStartEvent> {
     @Override
-    public void handle(QueryFinishEvent event) {
-      LOG.info("Query end notification started for QueryId : " + query.getId() + "," + query.getState());
-
-      //QueryMaster must be lived until client fetching all query result data.
-      try {
-        // Stop all services
-        // This will also send the final report to the ResourceManager
-        //LOG.info("Calling stop for all the services");
-//        stop();
-      } catch (Throwable t) {
-        LOG.warn("Graceful stop failed ", t);
+    public void handle(QueryStartEvent event) {
+      LOG.info("====>Start QueryStartEventHandler:" + event.getQueryId());
+      //To change body of implemented methods use File | Settings | File Templates.
+      QueryMasterTask queryMasterTask = new QueryMasterTask(queryMasterContext,
+          event.getQueryId(), event.getLogicalPlanJson());
+
+      queryMasterTask.init(queryConf);
+      queryMasterTask.start();
+      synchronized(queryMasterTasks) {
+        queryMasterTasks.put(event.getQueryId(), queryMasterTask);
       }
-
-      //Bring the process down by force.
-      //Not needed after HADOOP-7140
-      //LOG.info("Exiting QueryMaster..GoodBye!");
     }
   }
 
-  // query submission directory is private!
-  final public static FsPermission USER_DIR_PERMISSION =
-      FsPermission.createImmutable((short) 0700); // rwx--------
-
-  /**
-   * It initializes the final output and staging directory and sets
-   * them to variables.
-   */
-  private void initStagingDir() throws IOException {
-    QueryConf conf = getContext().getConf();
-
-    String realUser;
-    String currentUser;
-    UserGroupInformation ugi;
-    ugi = UserGroupInformation.getLoginUser();
-    realUser = ugi.getShortUserName();
-    currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
-
-    String givenOutputTableName = conf.getOutputTable();
-    Path stagingDir;
-
-    // If final output directory is not given by an user,
-    // we use the query id as a output directory.
-    if (givenOutputTableName.equals("")) {
-      this.isCreateTableStmt = false;
-      FileSystem defaultFS = FileSystem.get(conf);
-
-      Path homeDirectory = defaultFS.getHomeDirectory();
-      if (!defaultFS.exists(homeDirectory)) {
-        defaultFS.mkdirs(homeDirectory, new FsPermission(USER_DIR_PERMISSION));
-      }
-
-      Path userQueryDir = new Path(homeDirectory, TajoConstants.USER_QUERYDIR_PREFIX);
-
-      if (defaultFS.exists(userQueryDir)) {
-        FileStatus fsStatus = defaultFS.getFileStatus(userQueryDir);
-        String owner = fsStatus.getOwner();
+  class QueryHeartbeatThread extends Thread {
+    public QueryHeartbeatThread() {
+      super("QueryHeartbeatThread");
+    }
 
-        if (!(owner.equals(currentUser) || owner.equals(realUser))) {
-          throw new IOException("The ownership on the user's query " +
-              "directory " + userQueryDir + " is not as expected. " +
-              "It is owned by " + owner + ". The directory must " +
-              "be owned by the submitter " + currentUser + " or " +
-              "by " + realUser);
+    @Override
+    public void run() {
+      LOG.info("Start QueryMaster heartbeat thread");
+      while(!queryMasterStop.get()) {
+        //TODO report all query status
+        List<QueryMasterTask> tempTasks = new ArrayList<QueryMasterTask>();
+        synchronized(queryMasterTasks) {
+          tempTasks.addAll(queryMasterTasks.values());
         }
+        synchronized(queryMasterTasks) {
+          for(QueryMasterTask eachTask: tempTasks) {
+            TajoMasterProtocol.TajoHeartbeat queryHeartbeat = TajoMasterProtocol.TajoHeartbeat.newBuilder()
+                .setTajoWorkerHost(workerContext.getTajoWorkerManagerService().getBindAddr().getHostName())
+                .setTajoWorkerPort(workerContext.getTajoWorkerManagerService().getBindAddr().getPort())
+                .setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort())
+                .setState(eachTask.getState())
+                .setQueryId(eachTask.getQueryId().getProto())
+                .build();
 
-        if (!fsStatus.getPermission().equals(USER_DIR_PERMISSION)) {
-          LOG.info("Permissions on staging directory " + userQueryDir + " are " +
-              "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
-              "to correct value " + USER_DIR_PERMISSION);
-          defaultFS.setPermission(userQueryDir, new FsPermission(USER_DIR_PERMISSION));
+            workerContext.getTajoMasterRpcClient().heartbeat(null, queryHeartbeat, NullCallback.get());
+          }
+        }
+        synchronized(queryMasterStop) {
+          try {
+            queryMasterStop.wait(2000);
+          } catch (InterruptedException e) {
+            break;
+          }
         }
-      } else {
-        defaultFS.mkdirs(userQueryDir,
-            new FsPermission(USER_DIR_PERMISSION));
-      }
-
-      stagingDir = StorageUtil.concatPath(userQueryDir, queryId.toString());
-
-      if (defaultFS.exists(stagingDir)) {
-        throw new IOException("The staging directory " + stagingDir
-            + "already exists. The directory must be unique to each query");
-      } else {
-        defaultFS.mkdirs(stagingDir, new FsPermission(USER_DIR_PERMISSION));
       }
+      LOG.info("QueryMaster heartbeat thread stopped");
+    }
+  }
 
-      // Set the query id to the output table name
-      conf.setOutputTable(queryId.toString());
 
-    } else {
-      this.isCreateTableStmt = true;
-      Path warehouseDir = new Path(conf.getVar(TajoConf.ConfVars.ROOT_DIR),
-          TajoConstants.WAREHOUSE_DIR);
-      stagingDir = new Path(warehouseDir, conf.getOutputTable());
+  class ClientSessionTimeoutCheckThread extends Thread {
+    public void run() {
+      LOG.info("ClientSessionTimeoutCheckThread started");
+      while(true) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          break;
+        }
+        List<QueryMasterTask> tempTasks = new ArrayList<QueryMasterTask>();
+        synchronized(queryMasterTasks) {
+          tempTasks.addAll(queryMasterTasks.values());
+        }
 
-      FileSystem fs = warehouseDir.getFileSystem(conf);
-      if (fs.exists(stagingDir)) {
-        throw new IOException("The staging directory " + stagingDir
-            + " already exists. The directory must be unique to each query");
-      } else {
-        // TODO - should have appropriate permission
-        fs.mkdirs(stagingDir, new FsPermission(USER_DIR_PERMISSION));
+        for(QueryMasterTask eachTask: tempTasks) {
+          try {
+            long lastHeartbeat = eachTask.getLastClientHeartbeat();
+            long time = System.currentTimeMillis() - lastHeartbeat;
+            if(lastHeartbeat > 0 && time > QUERY_SESSION_TIMEOUT) {
+              LOG.warn("Query " + eachTask.getQueryId() + " stopped cause query sesstion timeout: " + time + " ms");
+              eachTask.expiredSessionTimeout();
+            }
+          } catch (Exception e) {
+            LOG.error(eachTask.getQueryId() + ":" + e.getMessage(), e);
+          }
+        }
       }
     }
-
-    conf.setOutputPath(stagingDir);
-    outputPath = stagingDir;
-    LOG.info("Initialized Query Staging Dir: " + outputPath);
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterClientService.java
deleted file mode 100644
index 74298e5..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterClientService.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.proto.YarnProtos;
-import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.ipc.ClientProtos;
-import org.apache.tajo.ipc.QueryMasterClientProtocol;
-import org.apache.tajo.rpc.ProtoBlockingRpcServer;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.util.TajoIdUtils;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-
-public class QueryMasterClientService extends AbstractService {
-  private static final Log LOG = LogFactory.getLog(QueryMasterClientService.class);
-  private final PrimitiveProtos.BoolProto BOOL_TRUE =
-          PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
-
-  private ProtoBlockingRpcServer rpcServer;
-  private InetSocketAddress bindAddr;
-  private String addr;
-  private QueryMaster.QueryContext queryContext;
-  private QueryMasterClientProtocolServiceHandler serviceHandler;
-
-  public QueryMasterClientService(QueryMaster.QueryContext queryContext) {
-    super(QueryMasterClientService.class.getName());
-
-    this.queryContext = queryContext;
-    this.serviceHandler = new QueryMasterClientProtocolServiceHandler();
-
-    // init RPC Server in constructor cause Heartbeat Thread use bindAddr
-    // Setup RPC server
-    try {
-      // TODO initial port num is value of config and find unused port with sequence
-      InetSocketAddress initIsa = new InetSocketAddress(InetAddress.getLocalHost(), 0);
-      if (initIsa.getAddress() == null) {
-        throw new IllegalArgumentException("Failed resolve of " + initIsa);
-      }
-
-      // TODO blocking/non-blocking??
-      this.rpcServer = new ProtoBlockingRpcServer(QueryMasterClientProtocol.class, serviceHandler, initIsa);
-      this.rpcServer.start();
-
-      this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
-      this.addr = NetUtils.normalizeInetSocketAddress(bindAddr);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-    }
-    // Get the master address
-    LOG.info(QueryMasterClientService.class.getSimpleName() + " (" + queryContext.getQueryId() + ") listens on "
-        + addr);
-  }
-
-  @Override
-  public void init(Configuration conf) {
-    super.init(conf);
-  }
-
-  @Override
-  public void start() {
-    super.start();
-  }
-
-  @Override
-  public void stop() {
-    if(rpcServer != null) {
-      rpcServer.shutdown();
-    }
-    LOG.info("QueryMasterClientService stopped");
-    super.stop();
-  }
-
-  public InetSocketAddress getBindAddr() {
-    return bindAddr;
-  }
-
-
-  public class QueryMasterClientProtocolServiceHandler
-          implements QueryMasterClientProtocol.QueryMasterClientProtocolService.BlockingInterface {
-    @Override
-    public PrimitiveProtos.BoolProto updateSessionVariables(
-            RpcController controller,
-            ClientProtos.UpdateSessionVariableRequest request) throws ServiceException {
-      return null;
-    }
-
-    @Override
-    public ClientProtos.GetQueryResultResponse getQueryResult(
-            RpcController controller,
-            ClientProtos.GetQueryResultRequest request) throws ServiceException {
-      QueryId queryId = new QueryId(request.getQueryId());
-      Query query = queryContext.getQuery();
-
-      ClientProtos.GetQueryResultResponse.Builder builder = ClientProtos.GetQueryResultResponse.newBuilder();
-
-      if(query == null) {
-        builder.setErrorMessage("No Query for " + queryId);
-      } else {
-        switch (query.getState()) {
-          case QUERY_SUCCEEDED:
-            builder.setTableDesc((CatalogProtos.TableDescProto)query.getResultDesc().getProto());
-            break;
-          case QUERY_FAILED:
-          case QUERY_ERROR:
-            builder.setErrorMessage("Query " + queryId + " is failed");
-          default:
-            builder.setErrorMessage("Query " + queryId + " is still running");
-        }
-      }
-      return builder.build();
-    }
-
-    @Override
-    public ClientProtos.GetQueryStatusResponse getQueryStatus(
-            RpcController controller,
-            ClientProtos.GetQueryStatusRequest request) throws ServiceException {
-      ClientProtos.GetQueryStatusResponse.Builder builder
-              = ClientProtos.GetQueryStatusResponse.newBuilder();
-      QueryId queryId = new QueryId(request.getQueryId());
-      builder.setQueryId(request.getQueryId());
-
-      if (queryId.equals(TajoIdUtils.NullQueryId)) {
-        builder.setResultCode(ClientProtos.ResultCode.OK);
-        builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
-      } else {
-        Query query = queryContext.getQuery();
-        builder.setResultCode(ClientProtos.ResultCode.OK);
-        builder.setQueryMasterHost(queryContext.getQueryMasterClientService().getBindAddr().getHostName());
-        builder.setQueryMasterPort(queryContext.getQueryMasterClientService().getBindAddr().getPort());
-
-
-        queryContext.touchSessionTime();
-        if (query != null) {
-          builder.setState(query.getState());
-          builder.setProgress(query.getProgress());
-          builder.setSubmitTime(query.getAppSubmitTime());
-          builder.setInitTime(query.getInitializationTime());
-          builder.setHasResult(!query.isCreateTableStmt());
-          if (query.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
-            builder.setFinishTime(query.getFinishTime());
-          } else {
-            builder.setFinishTime(System.currentTimeMillis());
-          }
-        } else {
-          builder.setState(queryContext.getState());
-        }
-      }
-
-      return builder.build();
-    }
-
-    @Override
-    public PrimitiveProtos.BoolProto killQuery(
-            RpcController controller,
-            YarnProtos.ApplicationAttemptIdProto request) throws ServiceException {
-      LOG.info("Stop QueryMaster:" + queryContext.getQueryId());
-      Thread t = new Thread() {
-        public void run() {
-          try {
-            Thread.sleep(1000);   //wait tile return to rpc response
-          } catch (InterruptedException e) {
-          }
-          queryContext.getQueryMaster().stop();
-        }
-      };
-      t.start();
-      return BOOL_TRUE;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManager.java
deleted file mode 100644
index 35d7201..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManager.java
+++ /dev/null
@@ -1,353 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.Clock;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.client.YarnClient;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.service.CompositeService;
-import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.engine.planner.logical.LogicalRootNode;
-import org.apache.tajo.ipc.QueryMasterManagerProtocol;
-import org.apache.tajo.ipc.QueryMasterManagerProtocol.QueryHeartbeatResponse;
-import org.apache.tajo.master.ContainerProxy;
-import org.apache.tajo.master.TajoMaster;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-// TODO - check QueryMaster status and if QueryMaster failed, release resource
-public class QueryMasterManager extends CompositeService {
-  private static final Log LOG = LogFactory.getLog(QueryMasterManager.class.getName());
-
-  // Master Context
-  private final TajoMaster.MasterContext masterContext;
-
-  // AppMaster Common
-  private final Clock clock;
-  private final long appSubmitTime;
-  private final ApplicationId appId;
-  private ApplicationAttemptId appAttemptId;
-
-  protected YarnClient yarnClient;
-
-  // For Query
-  private final QueryId queryId;
-
-  private AsyncDispatcher dispatcher;
-  private YarnRPC rpc;
-
-  private TajoProtos.QueryState state;
-  private float progress;
-  private long finishTime;
-  private TableDesc resultDesc;
-  private String queryMasterHost;
-  private int queryMasterPort;
-  private int queryMasterClientPort;
-
-  private LogicalRootNode plan;
-
-  private AtomicBoolean querySubmitted = new AtomicBoolean(false);
-
-  private AtomicBoolean queryMasterStopped = new AtomicBoolean(true);
-
-  private boolean stopCheckThreadStarted = false;
-
-  private String query;
-
-  public QueryMasterManager(final TajoMaster.MasterContext masterContext,
-                     final YarnClient yarnClient,
-                     final QueryId queryId,
-                     final String query,
-                     final LogicalRootNode plan,
-                     final ApplicationId appId,
-                     final Clock clock, long appSubmitTime) {
-    super(QueryMasterManager.class.getName());
-    this.masterContext = masterContext;
-    this.yarnClient = yarnClient;
-
-    this.appId = appId;
-    this.clock = clock;
-    this.appSubmitTime = appSubmitTime;
-    this.queryId = queryId;
-    this.plan = plan;
-    this.query = query;
-    LOG.info("Created Query Master Manager for AppId=" + appId + ", QueryID=" + queryId);
-  }
-
-  @Override
-  public void init(Configuration conf) {
-    super.init(conf);
-
-    state = TajoProtos.QueryState.QUERY_MASTER_INIT;
-  }
-
-  public TajoProtos.QueryState getState() {
-    return state;
-  }
-
-  @Override
-  public void start() {
-    try {
-      appAttemptId = allocateAndLaunchQueryMaster();
-    } catch (YarnRemoteException e) {
-      LOG.error(e.getMessage(), e);
-    }
-    super.start();
-  }
-
-  @Override
-  public void stop() {
-    while(true) {
-      if(queryMasterStopped.get()) {
-        break;
-      }
-      try {
-        Thread.sleep(100);
-      } catch (InterruptedException e) {
-      }
-    }
-    LOG.info("QueryMasterManager for " + queryId + " stopped");
-    super.stop();
-  }
-
-  public float getProgress() {
-    return progress;
-  }
-
-  public long getAppSubmitTime() {
-    return appSubmitTime;
-  }
-
-  public long getFinishTime() {
-    return finishTime;
-  }
-
-  public TableDesc getResultDesc() {
-    return resultDesc;
-  }
-
-  public String getQueryMasterHost() {
-    return queryMasterHost;
-  }
-
-  public int getQueryMasterPort() {
-    return queryMasterPort;
-  }
-
-  public int getQueryMasterClientPort() {
-    return queryMasterClientPort;
-  }
-
-  public synchronized QueryHeartbeatResponse.ResponseCommand queryHeartbeat(QueryMasterManagerProtocol.QueryHeartbeat queryHeartbeat) {
-    this.queryMasterHost = queryHeartbeat.getQueryMasterHost();
-    this.queryMasterPort = queryHeartbeat.getQueryMasterPort();
-    this.queryMasterClientPort = queryHeartbeat.getQueryMasterClientPort();
-    this.state = queryHeartbeat.getState();
-    if(state == TajoProtos.QueryState.QUERY_FAILED) {
-      //TODO needed QueryMaster's detail status(failed before or after launching worker)
-      queryMasterStopped.set(true);
-      if(queryHeartbeat.getStatusMessage() != null) {
-        LOG.warn(queryId + " failed, " + queryHeartbeat.getStatusMessage());
-      }
-    }
-
-    if(!stopCheckThreadStarted && !queryMasterStopped.get() && isFinishState(this.state)) {
-      stopCheckThreadStarted = true;
-      startCheckingQueryMasterStop();
-    }
-    if(appAttemptId != null && !querySubmitted.get()) {
-      LOG.info("submitQuery to QueryMaster(" + queryMasterHost + ":" + queryMasterPort + ")");
-      queryMasterStopped.set(false);
-      querySubmitted.set(true);
-      List<String> params = new ArrayList<String>(3);
-      params.add(appAttemptId.toString());
-      params.add(query);
-      params.add(plan.toJson());
-      return QueryHeartbeatResponse.ResponseCommand.newBuilder()
-          .setCommand("executeQuery")
-          .addAllParams(params)
-          .build();
-    } else {
-      return null;
-    }
-  }
-
-  private boolean isFinishState(TajoProtos.QueryState state) {
-    return state == TajoProtos.QueryState.QUERY_FAILED ||
-        state == TajoProtos.QueryState.QUERY_KILLED ||
-        state == TajoProtos.QueryState.QUERY_SUCCEEDED;
-  }
-
-  private void startCheckingQueryMasterStop() {
-    Thread t = new Thread() {
-      public void run() {
-        try {
-          ApplicationReport report = monitorApplication(appId,
-              EnumSet.of(
-                  YarnApplicationState.FINISHED,
-                  YarnApplicationState.KILLED,
-                  YarnApplicationState.FAILED));
-          queryMasterStopped.set(true);
-          LOG.info("QueryMaster (" + queryId + ") stopped");
-        } catch (YarnRemoteException e) {
-          LOG.error(e.getMessage(), e);
-        }
-      }
-    };
-
-    t.start();
-  }
-
-  private ApplicationAttemptId allocateAndLaunchQueryMaster() throws YarnRemoteException {
-    LOG.info("Allocate and launch QueryMaster:" + yarnClient);
-    ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
-
-    // set the application id
-    appContext.setApplicationId(appId);
-    // set the application name
-    appContext.setApplicationName("Tajo");
-
-    Priority pri = Records.newRecord(Priority.class);
-    pri.setPriority(5);
-    appContext.setPriority(pri);
-
-    // Set the queue to which this application is to be submitted in the RM
-    appContext.setQueue("default");
-
-    ContainerLaunchContext commonContainerLaunchContext =
-            ContainerProxy.createCommonContainerLaunchContext(masterContext.getConf(), queryId.toString(), true);
-
-    // Setup environment by cloning from common env.
-    Map<String, String> env = commonContainerLaunchContext.getEnvironment();
-    Map<String, String> myEnv = new HashMap<String, String>(env.size());
-    myEnv.putAll(env);
-
-    ////////////////////////////////////////////////////////////////////////////
-    // Set the local resources
-    ////////////////////////////////////////////////////////////////////////////
-    // Set the necessary command to execute the application master
-    Vector<CharSequence> vargs = new Vector<CharSequence>(30);
-
-    // Set java executable command
-    //LOG.info("Setting up app master command");
-    vargs.add("${JAVA_HOME}" + "/bin/java");
-    // Set Xmx based on am memory size
-    vargs.add("-Xmx2000m");
-    // Set Remote Debugging
-    //if (!context.getQuery().getSubQuery(event.getSubQueryId()).isLeafQuery()) {
-    //vargs.add("-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
-    //}
-    // Set class name
-    vargs.add(QueryMasterRunner.class.getCanonicalName());
-    vargs.add(queryId.toString()); // queryId
-    vargs.add(String.valueOf(appSubmitTime));
-    vargs.add(masterContext.getQueryMasterManagerService().getBindAddress().getHostName() + ":" +
-            masterContext.getQueryMasterManagerService().getBindAddress().getPort());
-
-    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
-    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
-
-    // Get final commmand
-    StringBuilder command = new StringBuilder();
-    for (CharSequence str : vargs) {
-      command.append(str).append(" ");
-    }
-
-    LOG.info("Completed setting up QueryMasterRunner command " + command.toString());
-    List<String> commands = new ArrayList<String>();
-    commands.add(command.toString());
-
-    final Resource resource = Records.newRecord(Resource.class);
-    // TODO - get default value from conf
-    resource.setMemory(2000);
-    resource.setVirtualCores(1);
-
-    Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
-
-    ContainerLaunchContext masterContainerContext = BuilderUtils.newContainerLaunchContext(
-            null, commonContainerLaunchContext.getUser(),
-            resource, commonContainerLaunchContext.getLocalResources(), myEnv, commands,
-            myServiceData, null, new HashMap<ApplicationAccessType, String>(2));
-
-    appContext.setAMContainerSpec(masterContainerContext);
-
-    LOG.info("Submitting QueryMaster to ResourceManager");
-    yarnClient.submitApplication(appContext);
-
-    ApplicationReport appReport = monitorApplication(appId, EnumSet.of(YarnApplicationState.ACCEPTED));
-    ApplicationAttemptId attemptId = appReport.getCurrentApplicationAttemptId();
-
-    LOG.info("Launching QueryMaster with id: " + attemptId);
-
-    state = TajoProtos.QueryState.QUERY_MASTER_LAUNCHED;
-
-    return attemptId;
-  }
-
-  private ApplicationReport monitorApplication(ApplicationId appId,
-                                               Set<YarnApplicationState> finalState) throws YarnRemoteException {
-
-    long sleepTime = 100;
-    int count = 1;
-    while (true) {
-      // Get application report for the appId we are interested in
-      ApplicationReport report = yarnClient.getApplicationReport(appId);
-
-      LOG.info("Got application report from ASM for" + ", appId="
-              + appId.getId() + ", appAttemptId="
-              + report.getCurrentApplicationAttemptId() + ", clientToken="
-              + report.getClientToken() + ", appDiagnostics="
-              + report.getDiagnostics() + ", appMasterHost=" + report.getHost()
-              + ", appQueue=" + report.getQueue() + ", appMasterRpcPort="
-              + report.getRpcPort() + ", appStartTime=" + report.getStartTime()
-              + ", yarnAppState=" + report.getYarnApplicationState().toString()
-              + ", distributedFinalState="
-              + report.getFinalApplicationStatus().toString() + ", appTrackingUrl="
-              + report.getTrackingUrl() + ", appUser=" + report.getUser());
-
-      YarnApplicationState state = report.getYarnApplicationState();
-      if (finalState.contains(state)) {
-        return report;
-      }
-      try {
-        Thread.sleep(sleepTime);
-        sleepTime = count * 100;
-        if(count < 10) {
-          count++;
-        }
-      } catch (InterruptedException e) {
-        //LOG.debug("Thread sleep in monitoring loop interrupted");
-      }
-
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
deleted file mode 100644
index 65f237c..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.QueryMasterManagerProtocol;
-import org.apache.tajo.ipc.QueryMasterManagerProtocol.QueryHeartbeat;
-import org.apache.tajo.ipc.QueryMasterManagerProtocol.QueryHeartbeatResponse;
-import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.rpc.ProtoBlockingRpcServer;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
-import org.apache.tajo.util.NetUtils;
-
-import java.net.InetSocketAddress;
-
-import static org.apache.tajo.conf.TajoConf.ConfVars;
-
-public class QueryMasterManagerService extends AbstractService {
-  private final static Log LOG = LogFactory.getLog(QueryMasterManagerService.class);
-
-  private final TajoMaster.MasterContext context;
-  private final TajoConf conf;
-  private final QueryMasterManagerProtocolServiceHandler masterHandler;
-  private ProtoBlockingRpcServer server;
-  private InetSocketAddress bindAddress;
-
-  private final BoolProto BOOL_TRUE = BoolProto.newBuilder().setValue(true).build();
-  private final BoolProto BOOL_FALSE = BoolProto.newBuilder().setValue(false).build();
-
-  public QueryMasterManagerService(TajoMaster.MasterContext context) {
-    super(QueryMasterManagerService.class.getName());
-    this.context = context;
-    this.conf = context.getConf();
-    this.masterHandler = new QueryMasterManagerProtocolServiceHandler();
-  }
-
-  @Override
-  public void start() {
-    // TODO resolve hostname
-    String confMasterServiceAddr = conf.getVar(ConfVars.QUERY_MASTER_MANAGER_SERVICE_ADDRESS);
-    InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
-    try {
-      server = new ProtoBlockingRpcServer(QueryMasterManagerProtocol.class, masterHandler, initIsa);
-    } catch (Exception e) {
-      LOG.error(e);
-    }
-    server.start();
-    bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
-    this.conf.setVar(ConfVars.QUERY_MASTER_MANAGER_SERVICE_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddress));
-    LOG.info("QueryMasterManagerService startup");
-    super.start();
-  }
-
-  @Override
-  public void stop() {
-    if(server != null) {
-      server.shutdown();
-      server = null;
-    }
-    LOG.info("QueryMasterManagerService shutdown");
-    super.stop();
-  }
-
-  public InetSocketAddress getBindAddress() {
-    return bindAddress;
-  }
-
-  public class QueryMasterManagerProtocolServiceHandler implements QueryMasterManagerProtocol.QueryMasterManagerProtocolService.BlockingInterface {
-    @Override
-    public QueryHeartbeatResponse queryHeartbeat(RpcController controller, QueryHeartbeat request) throws ServiceException {
-      // TODO - separate QueryMasterManagerProtocol, ClientServiceProtocol
-      QueryId queryId = new QueryId(request.getQueryId());
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Received QueryHeartbeat:" + queryId + "," + request);
-      }
-      QueryMasterManager queryMasterManager = context.getQuery(queryId);
-      if (queryMasterManager == null) {
-        LOG.warn("No query:" + queryId);
-        return QueryHeartbeatResponse.newBuilder().setHeartbeatResult(BOOL_FALSE).build();
-      }
-
-      QueryHeartbeatResponse.ResponseCommand command = queryMasterManager.queryHeartbeat(request);
-
-      //ApplicationAttemptId attemptId = queryMasterManager.getAppAttemptId();
-      //String attemptIdStr = attemptId == null ? null : attemptId.toString();
-      QueryHeartbeatResponse.Builder builder = QueryHeartbeatResponse.newBuilder();
-      builder.setHeartbeatResult(BOOL_TRUE);
-      if(command != null) {
-        builder.setResponseCommand(command);
-      }
-      return builder.build();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
index daab9fd..5a79464 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
@@ -39,13 +39,11 @@ public class QueryMasterRunner extends AbstractService {
   private QueryConf queryConf;
   private QueryMaster queryMaster;
   private QueryId queryId;
-  private long appSubmitTime;
   private String queryMasterManagerAddress;
 
-  public QueryMasterRunner(QueryId queryId, long appSubmitTime, String queryMasterManagerAddress) {
+  public QueryMasterRunner(QueryId queryId, String queryMasterManagerAddress) {
     super(QueryMasterRunner.class.getName());
     this.queryId = queryId;
-    this.appSubmitTime = appSubmitTime;
     this.queryMasterManagerAddress = queryMasterManagerAddress;
   }
 
@@ -72,7 +70,7 @@ public class QueryMasterRunner extends AbstractService {
   @Override
   public void start() {
     //create QueryMaster
-    QueryMaster query = new QueryMaster(queryId, appSubmitTime, queryMasterManagerAddress);
+    QueryMaster query = new QueryMaster(null);
 
     query.init(queryConf);
     query.start();
@@ -90,13 +88,12 @@ public class QueryMasterRunner extends AbstractService {
 
     UserGroupInformation.setConfiguration(conf);
 
-    final QueryId queryId = TajoIdUtils.createQueryId(args[0]);
-    final long appSubmitTime = Long.parseLong(args[1]);
-    final String queryMasterManagerAddr = args[2];
+    final QueryId queryId = TajoIdUtils.parseQueryId(args[0]);
+    final String queryMasterManagerAddr = args[1];
 
     LOG.info("Received QueryId:" + queryId);
 
-    QueryMasterRunner queryMasterRunner = new QueryMasterRunner(queryId, appSubmitTime, queryMasterManagerAddr);
+    QueryMasterRunner queryMasterRunner = new QueryMasterRunner(queryId, queryMasterManagerAddr);
     queryMasterRunner.init(conf);
     queryMasterRunner.start();