You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/01/08 16:36:18 UTC

[10/13] tajo git commit: TAJO-1288: Refactoring org.apache.tajo.master package.

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
deleted file mode 100644
index 559fc14..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-
-import com.google.gson.annotations.Expose;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.engine.json.CoreGsonHelper;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.ClientProtos.QueryInfoProto;
-import org.apache.tajo.json.GsonObject;
-import org.apache.tajo.util.TajoIdUtils;
-import org.apache.tajo.util.history.History;
-
-/**
- * Mutable summary of a single query: its id, SQL text, state, progress, start/finish
- * times, the query master's host and ports, the last status message and, when available,
- * the description of the result table.
- *
- * <p>{@code queryId} carries no {@code @Expose} annotation; {@link #fromJson(String)}
- * rebuilds it from {@code queryIdStr} after deserialization.
- */
-public class QueryInfo implements GsonObject, History {
-  private QueryId queryId;
-  @Expose
-  private QueryContext context;
-  @Expose
-  private String sql;
-  @Expose
-  private volatile TajoProtos.QueryState queryState;
-  @Expose
-  private volatile float progress;
-  @Expose
-  private volatile long startTime;
-  @Expose
-  private volatile  long finishTime;
-  @Expose
-  private String lastMessage;
-  @Expose
-  private String hostNameOfQM;
-  @Expose
-  private int queryMasterPort;
-  @Expose
-  private int queryMasterClientPort;
-  @Expose
-  private int queryMasterInfoPort;
-  @Expose
-  private String queryIdStr;
-  @Expose
-  private volatile TableDesc resultDesc;
-
-  private String jsonExpr;
-
-  /**
-   * Creates an info object that only knows its query id; context, SQL and JSON expression
-   * are left {@code null}.
-   *
-   * @param queryId id of the query; must not be null because its string form is stored
-   */
-  public QueryInfo(QueryId queryId) {
-    this(queryId, null, null, null);
-  }
-
-  /**
-   * Creates an info object in the {@code QUERY_MASTER_INIT} state.
-   *
-   * @param queryId      id of the query; must not be null because its string form is stored
-   * @param queryContext context of the query, may be null
-   * @param sql          SQL text of the query, may be null
-   * @param jsonExpr     JSON expression of the query, may be null
-   */
-  public QueryInfo(QueryId queryId, QueryContext queryContext, String sql, String jsonExpr) {
-    this.queryId = queryId;
-    this.queryIdStr = queryId.toString();
-    this.context = queryContext;
-    this.sql = sql;
-    this.jsonExpr = jsonExpr;
-
-    this.queryState = TajoProtos.QueryState.QUERY_MASTER_INIT;
-  }
-
-  public QueryId getQueryId() {
-    return queryId;
-  }
-
-  /** @return the query context, or null when this object was built from a query id only */
-  public QueryContext getQueryContext() {
-    return context;
-  }
-
-  public String getSql() {
-    return sql;
-  }
-
-  public String getQueryMasterHost() {
-    return hostNameOfQM;
-  }
-
-  public void setQueryMaster(String hostName) {
-    this.hostNameOfQM = hostName;
-  }
-
-  public int getQueryMasterInfoPort() {
-    return queryMasterInfoPort;
-  }
-
-  public void setQueryMasterInfoPort(int queryMasterInfoPort) {
-    this.queryMasterInfoPort = queryMasterInfoPort;
-  }
-
-  public void setQueryMasterPort(int port) {
-    this.queryMasterPort = port;
-  }
-
-  public int getQueryMasterPort() {
-    return queryMasterPort;
-  }
-
-  /** Sets the query master's client port. */
-  public void setQueryMasterClientPort(int port) {
-    queryMasterClientPort = port;
-  }
-
-  /**
-   * Same as {@link #setQueryMasterClientPort(int)}; kept under its original spelling so
-   * existing callers keep compiling.
-   */
-  public void setQueryMasterclientPort(int port) {
-    setQueryMasterClientPort(port);
-  }
-
-  public int getQueryMasterClientPort() {
-    return queryMasterClientPort;
-  }
-
-  public TajoProtos.QueryState getQueryState() {
-    return queryState;
-  }
-
-  public void setQueryState(TajoProtos.QueryState queryState) {
-    this.queryState = queryState;
-  }
-
-  public long getStartTime() {
-    return startTime;
-  }
-
-  public void setStartTime(long startTime) {
-    this.startTime = startTime;
-  }
-
-  public long getFinishTime() {
-    return finishTime;
-  }
-
-  public void setFinishTime(long finishTime) {
-    this.finishTime = finishTime;
-  }
-
-  public String getLastMessage() {
-    return lastMessage;
-  }
-
-  public void setLastMessage(String lastMessage) {
-    this.lastMessage = lastMessage;
-  }
-
-  public float getProgress() {
-    return progress;
-  }
-
-  public void setProgress(float progress) {
-    this.progress = progress;
-  }
-
-  public void setResultDesc(TableDesc result) {
-    this.resultDesc = result;
-  }
-
-  /** @return true when a result table description has been set */
-  public boolean hasResultDesc() {
-    return resultDesc != null;
-  }
-
-  /**
-   * Same as {@link #hasResultDesc()}; kept under its original spelling so existing
-   * callers keep compiling.
-   */
-  public boolean hasResultdesc() {
-    return hasResultDesc();
-  }
-
-  public TableDesc getResultDesc() {
-    return resultDesc;
-  }
-
-  @Override
-  public String toString() {
-    return queryId.toString() + ",state=" + queryState +",progress=" + progress + ", queryMaster="
-        + getQueryMasterHost();
-  }
-
-  public String getJsonExpr() {
-    return jsonExpr;
-  }
-
-  @Override
-  public String toJson() {
-    return CoreGsonHelper.toJson(this, QueryInfo.class);
-  }
-
-  @Override
-  public HistoryType getHistoryType() {
-    return HistoryType.QUERY_SUMMARY;
-  }
-
-  /**
-   * Deserializes a {@code QueryInfo} and restores its {@code queryId} by parsing the
-   * deserialized {@code queryIdStr}.
-   *
-   * @param json JSON text to deserialize
-   * @return the deserialized query info
-   */
-  public static QueryInfo fromJson(String json) {
-    QueryInfo queryInfo = CoreGsonHelper.fromJson(json, QueryInfo.class);
-    queryInfo.queryId = TajoIdUtils.parseQueryId(queryInfo.queryIdStr);
-    return queryInfo;
-  }
-
-  public String getQueryIdStr() {
-    return queryIdStr;
-  }
-
-  /**
-   * Builds the protobuf form of this query info. Reference-typed fields that are unset
-   * (context, result description, SQL, last message, query master host) are left out of
-   * the message instead of being passed to the builder as null.
-   *
-   * @return the populated {@link QueryInfoProto}
-   */
-  public QueryInfoProto getProto() {
-    QueryInfoProto.Builder builder = QueryInfoProto.newBuilder();
-
-    builder.setQueryId(queryId.toString())
-        .setQueryState(queryState)
-        .setProgress(progress)
-        .setStartTime(startTime)
-        .setFinishTime(finishTime)
-        .setQueryMasterPort(queryMasterPort)
-        .setQueryMasterClientPort(queryMasterClientPort)
-        .setQueryMasterInfoPort(queryMasterInfoPort);
-
-    // The single-argument constructor leaves the context null; calling getProto() on it
-    // used to fail with a NullPointerException here.
-    // NOTE(review): this assumes contextVars is an optional proto field -- confirm against
-    // the QueryInfoProto definition.
-    if (context != null) {
-      builder.setContextVars(context.getProto());
-    }
-
-    // resultDesc is volatile: read it once so the null check and the use see the same value.
-    TableDesc currentResultDesc = resultDesc;
-    if (currentResultDesc != null) {
-      builder.setResultDesc(currentResultDesc.getProto());
-    }
-
-    if (sql != null) {
-      builder.setSql(sql);
-    }
-
-    if (lastMessage != null) {
-      builder.setLastMessage(lastMessage);
-    }
-
-    if (hostNameOfQM != null) {
-      builder.setHostNameOfQM(hostNameOfQM);
-    }
-
-    return builder.build();
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
deleted file mode 100644
index ce30ec7..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import org.apache.hadoop.yarn.event.AbstractEvent;
-
-/**
- * Event that pairs a {@link Type} with the {@link QueryInfo} of the query it concerns.
- */
-public class QueryJobEvent extends AbstractEvent<QueryJobEvent.Type> {
-
-  /** The kinds of query job events. */
-  public enum Type {
-    QUERY_JOB_START,
-    QUERY_JOB_HEARTBEAT,
-    QUERY_JOB_FINISH,
-    QUERY_JOB_STOP,
-    QUERY_MASTER_START,
-    QUERY_MASTER_STOP,
-    QUERY_JOB_KILL
-  }
-
-  /** Information about the query this event refers to. */
-  private QueryInfo queryInfo;
-
-  /**
-   * @param type      kind of the event
-   * @param queryInfo information about the query the event refers to
-   */
-  public QueryJobEvent(Type type, QueryInfo queryInfo) {
-    super(type);
-    this.queryInfo = queryInfo;
-  }
-
-  /** @return the query information supplied at construction time */
-  public QueryInfo getQueryInfo() {
-    return queryInfo;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
deleted file mode 100644
index 13f6456..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.master.session.Session;
-import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.scheduler.SimpleFifoScheduler;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Service that tracks queries from submission until they stop. A query is first kept in
- * the submitted map, moved to the running map by {@link #startQueryJob(QueryId)}, and
- * removed from both by {@link #stopQuery(QueryId)}, which also updates the min / max /
- * average execution time statistics.
- */
-public class QueryJobManager extends CompositeService {
-  private static final Log LOG = LogFactory.getLog(QueryJobManager.class.getName());
-
-  // TajoMaster Context
-  private final TajoMaster.MasterContext masterContext;
-
-  private AsyncDispatcher dispatcher;
-
-  private SimpleFifoScheduler scheduler;
-
-  private final Map<QueryId, QueryInProgress> submittedQueries = Maps.newConcurrentMap();
-
-  private final Map<QueryId, QueryInProgress> runningQueries = Maps.newConcurrentMap();
-
-  // Guards the compound read-then-write updates of the four statistics below.
-  private final Object statsLock = new Object();
-  private final AtomicLong minExecutionTime = new AtomicLong(Long.MAX_VALUE);
-  private final AtomicLong maxExecutionTime = new AtomicLong();
-  private final AtomicLong avgExecutionTime = new AtomicLong();
-  private final AtomicLong executedQuerySize = new AtomicLong();
-
-  public QueryJobManager(final TajoMaster.MasterContext masterContext) {
-    super(QueryJobManager.class.getName());
-    this.masterContext = masterContext;
-  }
-
-  /**
-   * Creates the dispatcher (registered as a child service), registers the
-   * {@link QueryJobEvent} handler and creates the scheduler, then initializes the
-   * composite service. A failure while setting these up is logged, not rethrown.
-   */
-  @Override
-  public void init(Configuration conf) {
-    try {
-      this.dispatcher = new AsyncDispatcher();
-      addService(this.dispatcher);
-
-      this.dispatcher.register(QueryJobEvent.Type.class, new QueryJobManagerEventHandler());
-
-      this.scheduler = new SimpleFifoScheduler(this);
-    } catch (Exception e) {
-      catchException(null, e);
-    }
-
-    super.init(conf);
-  }
-
-  /** Stops every running query, then the scheduler, then the composite service. */
-  @Override
-  public void stop() {
-    synchronized(runningQueries) {
-      for(QueryInProgress eachQueryInProgress: runningQueries.values()) {
-        eachQueryInProgress.stop();
-      }
-    }
-    // The scheduler is still null when init() failed before creating it.
-    if (this.scheduler != null) {
-      this.scheduler.stop();
-    }
-    super.stop();
-  }
-
-  @Override
-  public void start() {
-    this.scheduler.start();
-    super.start();
-  }
-
-  public EventHandler getEventHandler() {
-    return dispatcher.getEventHandler();
-  }
-
-  /** @return an unmodifiable view of the queries that were submitted but not yet started */
-  public Collection<QueryInProgress> getSubmittedQueries() {
-    synchronized (submittedQueries){
-      return Collections.unmodifiableCollection(submittedQueries.values());
-    }
-  }
-
-  /** @return an unmodifiable view of the queries that were started and not yet stopped */
-  public Collection<QueryInProgress> getRunningQueries() {
-    synchronized (runningQueries){
-      return Collections.unmodifiableCollection(runningQueries.values());
-    }
-  }
-
-  /**
-   * Reads the finished queries from the master's history reader.
-   *
-   * @return the finished queries, or an empty list when the history cannot be read
-   */
-  public synchronized Collection<QueryInfo> getFinishedQueries() {
-    try {
-      return this.masterContext.getHistoryReader().getQueries(null);
-    } catch (Throwable e) {
-      // Pass the throwable as the cause argument, as catchException() does.
-      LOG.error(e.getMessage(), e);
-      return Lists.newArrayList();
-    }
-  }
-
-
-  /**
-   * Reads one finished query from the master's history reader.
-   *
-   * @param queryId id of the query to look up
-   * @return the query info, or null when the lookup fails
-   */
-  public synchronized QueryInfo getFinishedQuery(QueryId queryId) {
-    try {
-      return this.masterContext.getHistoryReader().getQueryInfo(queryId.toString());
-    } catch (Throwable e) {
-      LOG.error(e.getMessage(), e);
-      return null;
-    }
-  }
-
-  /**
-   * Allocates a new query id, registers the query as submitted and hands it to the
-   * scheduler.
-   *
-   * @return the info object of the newly submitted query
-   */
-  public QueryInfo createNewQueryJob(Session session, QueryContext queryContext, String sql,
-                                     String jsonExpr, LogicalRootNode plan)
-      throws Exception {
-    QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
-    QueryInProgress queryInProgress = new QueryInProgress(masterContext, session, queryContext, queryId, sql,
-        jsonExpr, plan);
-
-    synchronized (submittedQueries) {
-      queryInProgress.getQueryInfo().setQueryMaster("");
-      submittedQueries.put(queryInProgress.getQueryId(), queryInProgress);
-    }
-
-    scheduler.addQuery(queryInProgress);
-    return queryInProgress.getQueryInfo();
-  }
-
-  /**
-   * Moves a submitted query to the running map, starts it as a child service and starts
-   * its query master; the query is stopped again when the query master fails to start.
-   *
-   * @param queryId id of a query previously registered by {@code createNewQueryJob}
-   * @return the info object of the started query
-   * @throws IllegalStateException when the query is not (or no longer) in the submitted map
-   */
-  public QueryInfo startQueryJob(QueryId queryId) throws Exception {
-
-    QueryInProgress queryInProgress;
-
-    synchronized (submittedQueries) {
-      queryInProgress = submittedQueries.remove(queryId);
-    }
-
-    // stopQuery() also removes entries from the submitted map, so the query may be gone
-    // by now. Fail with a descriptive error instead of a bare NullPointerException.
-    if (queryInProgress == null) {
-      throw new IllegalStateException("No submitted query to start: " + queryId);
-    }
-
-    synchronized (runningQueries) {
-      runningQueries.put(queryInProgress.getQueryId(), queryInProgress);
-    }
-
-    addService(queryInProgress);
-    queryInProgress.init(getConfig());
-    queryInProgress.start();
-
-    if (!queryInProgress.startQueryMaster()) {
-      stopQuery(queryId);
-    }
-
-    return queryInProgress.getQueryInfo();
-  }
-
-  public TajoMaster.MasterContext getMasterContext() {
-    return masterContext;
-  }
-
-  /**
-   * Routes {@link QueryJobEvent}s: a stop event stops the query, events for an already
-   * started query are forwarded to that query's own handler, and a kill event for a query
-   * that has not started removes it from the scheduler, marks it killed and stops it.
-   */
-  class QueryJobManagerEventHandler implements EventHandler<QueryJobEvent> {
-    @Override
-    public void handle(QueryJobEvent event) {
-      QueryInProgress queryInProgress = getQueryInProgress(event.getQueryInfo().getQueryId());
-      if(queryInProgress == null) {
-        LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]");
-        return;
-      }
-
-      if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) {
-        stopQuery(event.getQueryInfo().getQueryId());
-      } else if (queryInProgress.isStarted()) {
-        queryInProgress.getEventHandler().handle(event);
-      } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
-        scheduler.removeQuery(queryInProgress.getQueryId());
-        queryInProgress.getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
-
-        stopQuery(queryInProgress.getQueryId());
-      }
-    }
-  }
-
-  /**
-   * Looks a query up in the submitted map first and in the running map second.
-   *
-   * @return the query, or null when it is in neither map
-   */
-  public QueryInProgress getQueryInProgress(QueryId queryId) {
-    QueryInProgress queryInProgress;
-    synchronized (submittedQueries) {
-      queryInProgress = submittedQueries.get(queryId);
-    }
-
-    if (queryInProgress == null) {
-      synchronized (runningQueries) {
-        queryInProgress = runningQueries.get(queryId);
-      }
-    }
-    return queryInProgress;
-  }
-
-  /**
-   * Stops a query, removes it from both maps, records its execution time in the
-   * statistics and unregisters it as a child service. Logs a warning when the query is
-   * unknown.
-   */
-  public void stopQuery(QueryId queryId) {
-    LOG.info("Stop QueryInProgress:" + queryId);
-    QueryInProgress queryInProgress = getQueryInProgress(queryId);
-    if(queryInProgress != null) {
-      queryInProgress.stop();
-      synchronized(submittedQueries) {
-        submittedQueries.remove(queryId);
-      }
-
-      synchronized(runningQueries) {
-        runningQueries.remove(queryId);
-      }
-
-      QueryInfo queryInfo = queryInProgress.getQueryInfo();
-      // A query stopped before a finish time was recorded would yield a negative
-      // duration, which would then stick as the minimum; clamp it to zero.
-      long executionTime = Math.max(0L, queryInfo.getFinishTime() - queryInfo.getStartTime());
-      recordExecutionTime(executionTime);
-      removeService(queryInProgress);
-    } else {
-      LOG.warn("No QueryInProgress while query stopping: " + queryId);
-    }
-  }
-
-  /**
-   * Folds one execution time into the min / max / average statistics and increments the
-   * executed-query counter. The whole update runs under one lock because each step is a
-   * read followed by a dependent write.
-   */
-  private void recordExecutionTime(long executionTime) {
-    synchronized (statsLock) {
-      if (executionTime < minExecutionTime.get()) {
-        minExecutionTime.set(executionTime);
-      }
-
-      if (executionTime > maxExecutionTime.get()) {
-        maxExecutionTime.set(executionTime);
-      }
-
-      // Always divide by the new count: with zero executed queries this yields
-      // executionTime itself, and it stays correct when the previous average was 0.
-      long executed = executedQuerySize.get();
-      long totalExecutionTime = executed * avgExecutionTime.get();
-      avgExecutionTime.set((totalExecutionTime + executionTime) / (executed + 1));
-      executedQuerySize.incrementAndGet();
-    }
-  }
-
-  /** @return the shortest recorded execution time, or 0 when no query has been recorded */
-  public long getMinExecutionTime() {
-    if (getExecutedQuerySize() == 0) return 0;
-    return minExecutionTime.get();
-  }
-
-  public long getMaxExecutionTime() {
-    return maxExecutionTime.get();
-  }
-
-  public long getAvgExecutionTime() {
-    return avgExecutionTime.get();
-  }
-
-  public long getExecutedQuerySize() {
-    return executedQuerySize.get();
-  }
-
-  /**
-   * Logs an exception and, when it belongs to a known running query, passes it on to
-   * that query.
-   *
-   * @param queryId id of the affected query, or null when the failure is not tied to one
-   * @param e       the exception to report
-   */
-  private void catchException(QueryId queryId, Exception e) {
-    LOG.error(e.getMessage(), e);
-    // init() calls this with a null id; a concurrent map rejects null keys, so looking
-    // it up would replace the real failure with a NullPointerException.
-    if (queryId == null) {
-      return;
-    }
-    QueryInProgress queryInProgress = runningQueries.get(queryId);
-    if (queryInProgress != null) {
-      queryInProgress.catchException(e);
-    }
-  }
-
-  /**
-   * Turns a heartbeat into a {@code QUERY_JOB_HEARTBEAT} event when the query is known.
-   *
-   * @return always null
-   */
-  public synchronized TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand queryHeartbeat(
-      TajoMasterProtocol.TajoHeartbeat queryHeartbeat) {
-    QueryInProgress queryInProgress = getQueryInProgress(new QueryId(queryHeartbeat.getQueryId()));
-    if(queryInProgress == null) {
-      return null;
-    }
-
-    QueryInfo queryInfo = makeQueryInfoFromHeartbeat(queryHeartbeat);
-    getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_HEARTBEAT, queryInfo));
-
-    return null;
-  }
-
-  /**
-   * Copies the heartbeat's connection info, status message, state, progress and, when
-   * present, finish time and result description into a fresh {@link QueryInfo}.
-   */
-  private QueryInfo makeQueryInfoFromHeartbeat(TajoMasterProtocol.TajoHeartbeat queryHeartbeat) {
-    QueryInfo queryInfo = new QueryInfo(new QueryId(queryHeartbeat.getQueryId()));
-    WorkerConnectionInfo connectionInfo = new WorkerConnectionInfo(queryHeartbeat.getConnectionInfo());
-
-    queryInfo.setQueryMaster(connectionInfo.getHost());
-    queryInfo.setQueryMasterPort(connectionInfo.getQueryMasterPort());
-    queryInfo.setQueryMasterclientPort(connectionInfo.getClientPort());
-    queryInfo.setLastMessage(queryHeartbeat.getStatusMessage());
-    queryInfo.setQueryState(queryHeartbeat.getState());
-    queryInfo.setProgress(queryHeartbeat.getQueryProgress());
-
-    if (queryHeartbeat.hasQueryFinishTime()) {
-      queryInfo.setFinishTime(queryHeartbeat.getQueryFinishTime());
-    }
-
-    if (queryHeartbeat.hasResultDesc()) {
-      queryInfo.setResultDesc(new TableDesc(queryHeartbeat.getResultDesc()));
-    }
-
-    return queryInfo;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
deleted file mode 100644
index 641de78..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ /dev/null
@@ -1,631 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.Event;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.tajo.*;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.global.GlobalPlanner;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.event.QueryStartEvent;
-import org.apache.tajo.master.event.QueryStopEvent;
-import org.apache.tajo.rpc.CallFuture;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcConnectionPool;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.util.history.QueryHistory;
-import org.apache.tajo.worker.TajoWorker;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeat;
-import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse;
-
-// TODO - when exception, send error status to QueryJobManager
-public class QueryMaster extends CompositeService implements EventHandler {
-  private static final Log LOG = LogFactory.getLog(QueryMaster.class.getName());
-
-  private int querySessionTimeout;
-
-  private Clock clock;
-
-  private AsyncDispatcher dispatcher;
-
-  private GlobalPlanner globalPlanner;
-
-  private TajoConf systemConf;
-
-  private Map<QueryId, QueryMasterTask> queryMasterTasks = Maps.newConcurrentMap();
-
-  private Map<QueryId, QueryMasterTask> finishedQueryMasterTasks = Maps.newConcurrentMap();
-
-  private ClientSessionTimeoutCheckThread clientSessionTimeoutCheckThread;
-
-  private AtomicBoolean queryMasterStop = new AtomicBoolean(false);
-
-  private QueryMasterContext queryMasterContext;
-
-  private QueryContext queryContext;
-
-  private QueryHeartbeatThread queryHeartbeatThread;
-
-  private FinishedQueryMasterTaskCleanThread finishedQueryMasterTaskCleanThread;
-
-  private TajoWorker.WorkerContext workerContext;
-
-  private RpcConnectionPool connPool;
-
-  private ExecutorService eventExecutor;
-
-  /**
-   * Creates a query master service named after this class.
-   *
-   * @param workerContext context of the worker this query master belongs to
-   */
-  public QueryMaster(TajoWorker.WorkerContext workerContext) {
-    super(QueryMaster.class.getName());
-
-    this.workerContext = workerContext;
-  }
-
-  /**
-   * Sets up the RPC connection pool, session timeout, query master context, clock,
-   * dispatcher (with the query start / stop handlers) and global planner, then
-   * initializes the composite service.
-   *
-   * @param conf the configuration; it is cast to {@link TajoConf}
-   * @throws RuntimeException wrapping any failure raised during the setup above
-   */
-  @Override
-  public void init(Configuration conf) {
-    LOG.info("QueryMaster init");
-    try {
-      this.systemConf = (TajoConf)conf;
-      this.connPool = RpcConnectionPool.getPool(systemConf);
-
-      querySessionTimeout = systemConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT);
-      queryMasterContext = new QueryMasterContext(systemConf);
-
-      clock = new SystemClock();
-
-      this.dispatcher = new AsyncDispatcher();
-      addIfService(dispatcher);
-
-      globalPlanner = new GlobalPlanner(systemConf, workerContext);
-
-      dispatcher.register(QueryStartEvent.EventType.class, new QueryStartEventHandler());
-      dispatcher.register(QueryStopEvent.EventType.class, new QueryStopEventHandler());
-
-    } catch (Throwable t) {
-      LOG.error(t.getMessage(), t);
-      throw new RuntimeException(t);
-    }
-    super.init(conf);
-  }
-
-  /**
-   * Creates and starts the heartbeat, client-session-timeout and finished-task-cleaner
-   * threads, creates the single-threaded event executor, and then starts the composite
-   * service.
-   */
-  @Override
-  public void start() {
-    LOG.info("QueryMaster start");
-
-    queryHeartbeatThread = new QueryHeartbeatThread();
-    queryHeartbeatThread.start();
-
-    clientSessionTimeoutCheckThread = new ClientSessionTimeoutCheckThread();
-    clientSessionTimeoutCheckThread.start();
-
-    finishedQueryMasterTaskCleanThread = new FinishedQueryMasterTaskCleanThread();
-    finishedQueryMasterTaskCleanThread.start();
-
-    // Created before super.start(), i.e. before the child services are started.
-    eventExecutor = Executors.newSingleThreadExecutor();
-    super.start();
-  }
-
-  /**
-   * Stops the query master once: interrupts the three background threads, shuts the
-   * event executor down, stops the composite service and, in YARN container mode, stops
-   * the worker as well. Further calls return immediately.
-   */
-  @Override
-  public void stop() {
-    // getAndSet makes only the first caller proceed.
-    if(queryMasterStop.getAndSet(true)){
-      return;
-    }
-
-    if(queryHeartbeatThread != null) {
-      queryHeartbeatThread.interrupt();
-    }
-
-    if(clientSessionTimeoutCheckThread != null) {
-      clientSessionTimeoutCheckThread.interrupt();
-    }
-
-    if(finishedQueryMasterTaskCleanThread != null) {
-      finishedQueryMasterTaskCleanThread.interrupt();
-    }
-
-    if(eventExecutor != null){
-      eventExecutor.shutdown();
-    }
-
-    super.stop();
-
-    LOG.info("QueryMaster stop");
-    // queryMasterContext is assigned inside init()'s try block, so it is still null when
-    // init() failed early; guard it like the other fields above.
-    if(queryMasterContext != null && queryMasterContext.getWorkerContext().isYarnContainerMode()) {
-      queryMasterContext.getWorkerContext().stopWorker(true);
-    }
-  }
-
-  /**
-   * Asks every worker returned by {@link #getAllWorker()} to clean up the given execution
-   * blocks. The request is best effort: a worker that cannot be reached is logged and
-   * skipped.
-   *
-   * @param executionBlockIds ids of the execution blocks to clean up
-   */
-  protected void cleanupExecutionBlock(List<TajoIdProtos.ExecutionBlockIdProto> executionBlockIds) {
-    StringBuilder cleanupMessage = new StringBuilder();
-    String prefix = "";
-    for (TajoIdProtos.ExecutionBlockIdProto eachEbId: executionBlockIds) {
-      cleanupMessage.append(prefix).append(new ExecutionBlockId(eachEbId).toString());
-      prefix = ",";
-    }
-    LOG.info("cleanup executionBlocks: " + cleanupMessage);
-    List<TajoMasterProtocol.WorkerResourceProto> workers = getAllWorker();
-    TajoWorkerProtocol.ExecutionBlockListProto.Builder builder = TajoWorkerProtocol.ExecutionBlockListProto.newBuilder();
-    builder.addAllExecutionBlockId(Lists.newArrayList(executionBlockIds));
-    TajoWorkerProtocol.ExecutionBlockListProto executionBlockListProto = builder.build();
-
-    for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
-      // Scoped to one iteration: with a shared variable, a failed getConnection() made the
-      // finally block release the previous worker's connection a second time.
-      NettyClientBase rpc = null;
-      try {
-        TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo();
-        rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()),
-            TajoWorkerProtocol.class, true);
-        TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub();
-
-        tajoWorkerProtocolService.cleanupExecutionBlocks(null, executionBlockListProto, NullCallback.get());
-      } catch (Exception e) {
-        // Still best effort, but no longer silent.
-        LOG.warn(e.getMessage(), e);
-      } finally {
-        connPool.releaseConnection(rpc);
-      }
-    }
-  }
-
-  /**
-   * Asks every worker returned by {@link #getAllWorker()} to clean up the resources of
-   * one query. A worker that cannot be reached is logged and skipped.
-   *
-   * @param queryId id of the query whose resources should be cleaned up
-   */
-  private void cleanup(QueryId queryId) {
-    LOG.info("cleanup query resources : " + queryId);
-    List<TajoMasterProtocol.WorkerResourceProto> workers = getAllWorker();
-
-    for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
-      // Scoped to one iteration: with a shared variable, a failed getConnection() made the
-      // finally block release the previous worker's connection a second time.
-      NettyClientBase rpc = null;
-      try {
-        TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo();
-        rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()),
-            TajoWorkerProtocol.class, true);
-        TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub();
-
-        tajoWorkerProtocolService.cleanup(null, queryId.getProto(), NullCallback.get());
-      } catch (Exception e) {
-        // Pass the exception itself so the stack trace is not lost.
-        LOG.error(e.getMessage(), e);
-      } finally {
-        connPool.releaseConnection(rpc);
-      }
-    }
-  }
-
-  /**
-   * Fetches the resources of all workers from the TajoMaster, waiting at most two seconds
-   * for the answer.
-   *
-   * @return the worker resources, or an empty list when the request fails (the failure is
-   *         logged)
-   */
-  public List<TajoMasterProtocol.WorkerResourceProto> getAllWorker() {
-    NettyClientBase masterClient = null;
-    try {
-      final boolean haEnabled = systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE);
-      try {
-        masterClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-            TajoMasterProtocol.class, true);
-      } catch (Exception connectFailure) {
-        if (!haEnabled) {
-          // Without HA there is no other master to fall back to.
-          throw connectFailure;
-        }
-        // With HA enabled the master known to this worker may no longer be the active
-        // one: refresh both master addresses in the worker context and connect again.
-        queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
-            HAServiceUtil.getResourceTrackerAddress(systemConf));
-        queryMasterContext.getWorkerContext().setTajoMasterAddress(
-            HAServiceUtil.getMasterUmbilicalAddress(systemConf));
-        masterClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-            TajoMasterProtocol.class, true);
-      }
-
-      TajoMasterProtocol.TajoMasterProtocolService masterService = masterClient.getStub();
-
-      CallFuture<TajoMasterProtocol.WorkerResourcesRequest> future =
-          new CallFuture<TajoMasterProtocol.WorkerResourcesRequest>();
-      masterService.getAllWorkerResource(future.getController(),
-          PrimitiveProtos.NullProto.getDefaultInstance(), future);
-
-      return future.get(2, TimeUnit.SECONDS).getWorkerResourcesList();
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-      return new ArrayList<TajoMasterProtocol.WorkerResourceProto>();
-    } finally {
-      connPool.releaseConnection(masterClient);
-    }
-  }
-
-  /**
-   * Sends a heartbeat carrying this worker's connection info, the given state and the
-   * query id to the TajoMaster. The call is not awaited: the response future is never
-   * read. Failures are logged and not propagated.
-   *
-   * @param queryId id of the query being reported
-   * @param state   state to report for the query
-   */
-  public void reportQueryStatusToQueryMaster(QueryId queryId, TajoProtos.QueryState state) {
-    // The reported state is an argument, so log it rather than implying it is always "Ready".
-    LOG.info("Send QueryMaster Ready to QueryJobManager:" + queryId + ", state=" + state);
-    NettyClientBase tmClient = null;
-    try {
-      // In TajoMaster HA mode the master known to this worker may no longer be the active
-      // one. If the connection fails, refresh the master addresses in the worker context
-      // and connect once more.
-      if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-        try {
-          tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
-        } catch (Exception e) {
-          queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
-              HAServiceUtil.getResourceTrackerAddress(systemConf));
-          queryMasterContext.getWorkerContext().setTajoMasterAddress(
-              HAServiceUtil.getMasterUmbilicalAddress(systemConf));
-          tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
-        }
-      } else {
-        tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-            TajoMasterProtocol.class, true);
-      }
-
-      TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
-
-      TajoHeartbeat.Builder queryHeartbeatBuilder = TajoHeartbeat.newBuilder()
-          .setConnectionInfo(workerContext.getConnectionInfo().getProto())
-          .setState(state)
-          .setQueryId(queryId.getProto());
-
-      CallFuture<TajoHeartbeatResponse> callBack =
-          new CallFuture<TajoHeartbeatResponse>();
-
-      masterClientService.heartbeat(callBack.getController(), queryHeartbeatBuilder.build(), callBack);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-    } finally {
-      connPool.releaseConnection(tmClient);
-    }
-  }
-
-  @Override
-  public void handle(Event event) {
-    dispatcher.getEventHandler().handle(event);
-  }
-
-  public Query getQuery(QueryId queryId) {
-    return queryMasterTasks.get(queryId).getQuery();
-  }
-
-  public QueryMasterTask getQueryMasterTask(QueryId queryId) {
-    return queryMasterTasks.get(queryId);
-  }
-
-  public QueryMasterTask getQueryMasterTask(QueryId queryId, boolean includeFinished) {
-    QueryMasterTask queryMasterTask =  queryMasterTasks.get(queryId);
-    if(queryMasterTask != null) {
-      return queryMasterTask;
-    } else {
-      if(includeFinished) {
-        return finishedQueryMasterTasks.get(queryId);
-      } else {
-        return null;
-      }
-    }
-  }
-
-  public QueryMasterContext getContext() {
-    return this.queryMasterContext;
-  }
-
-  public Collection<QueryMasterTask> getQueryMasterTasks() {
-    return queryMasterTasks.values();
-  }
-
-  public Collection<QueryMasterTask> getFinishedQueryMasterTasks() {
-    return finishedQueryMasterTasks.values();
-  }
-
-  public class QueryMasterContext {
-    private TajoConf conf;
-
-    public QueryMasterContext(TajoConf conf) {
-      this.conf = conf;
-    }
-
-    public TajoConf getConf() {
-      return conf;
-    }
-
-    public ExecutorService getEventExecutor(){
-      return eventExecutor;
-    }
-
-    public AsyncDispatcher getDispatcher() {
-      return dispatcher;
-    }
-
-    public Clock getClock() {
-      return clock;
-    }
-
-    public QueryMaster getQueryMaster() {
-      return QueryMaster.this;
-    }
-
-    public GlobalPlanner getGlobalPlanner() {
-      return globalPlanner;
-    }
-
-    public TajoWorker.WorkerContext getWorkerContext() {
-      return workerContext;
-    }
-
-    public EventHandler getEventHandler() {
-      return dispatcher.getEventHandler();
-    }
-
-    public void stopQuery(QueryId queryId) {
-      QueryMasterTask queryMasterTask = queryMasterTasks.remove(queryId);
-      if(queryMasterTask == null) {
-        LOG.warn("No query info:" + queryId);
-        return;
-      }
-
-      finishedQueryMasterTasks.put(queryId, queryMasterTask);
-
-      TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(queryMasterTask);
-      CallFuture<TajoHeartbeatResponse> future = new CallFuture<TajoHeartbeatResponse>();
-
-      NettyClientBase tmClient = null;
-      try {
-        // In TajoMaster HA mode, if backup master be active status,
-        // worker may fail to connect existing active master. Thus,
-        // if worker can't connect the master, worker should try to connect another master and
-        // update master address in worker context.
-        if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-          try {
-            tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-                TajoMasterProtocol.class, true);
-          } catch (Exception e) {
-            queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf));
-            queryMasterContext.getWorkerContext().setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf));
-            tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-                TajoMasterProtocol.class, true);
-          }
-        } else {
-          tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
-        }
-
-        TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
-        masterClientService.heartbeat(future.getController(), queryHeartbeat, future);
-      }  catch (Exception e) {
-        //this function will be closed in new thread.
-        //When tajo do stop cluster, tajo master maybe throw closed connection exception
-
-        LOG.error(e.getMessage(), e);
-      } finally {
-        connPool.releaseConnection(tmClient);
-      }
-
-      try {
-        queryMasterTask.stop();
-        if (!queryContext.getBool(SessionVars.DEBUG_ENABLED)) {
-          cleanup(queryId);
-        }
-      } catch (Exception e) {
-        LOG.error(e.getMessage(), e);
-      }
-      Query query = queryMasterTask.getQuery();
-      if (query != null) {
-        QueryHistory queryHisory = query.getQueryHistory();
-        if (queryHisory != null) {
-          query.context.getQueryMasterContext().getWorkerContext().
-              getTaskHistoryWriter().appendHistory(queryHisory);
-        }
-      }
-      if(workerContext.isYarnContainerMode()) {
-        stop();
-      }
-    }
-  }
-
-  private TajoHeartbeat buildTajoHeartBeat(QueryMasterTask queryMasterTask) {
-    TajoHeartbeat.Builder builder = TajoHeartbeat.newBuilder();
-
-    builder.setConnectionInfo(workerContext.getConnectionInfo().getProto());
-    builder.setQueryId(queryMasterTask.getQueryId().getProto());
-    builder.setState(queryMasterTask.getState());
-    if (queryMasterTask.getQuery() != null) {
-      if (queryMasterTask.getQuery().getResultDesc() != null) {
-        builder.setResultDesc(queryMasterTask.getQuery().getResultDesc().getProto());
-      }
-      builder.setQueryProgress(queryMasterTask.getQuery().getProgress());
-      builder.setQueryFinishTime(queryMasterTask.getQuery().getFinishTime());
-    }
-    return builder.build();
-  }
-
-  private class QueryStartEventHandler implements EventHandler<QueryStartEvent> {
-    @Override
-    public void handle(QueryStartEvent event) {
-      LOG.info("Start QueryStartEventHandler:" + event.getQueryId());
-      QueryMasterTask queryMasterTask = new QueryMasterTask(queryMasterContext,
-          event.getQueryId(), event.getSession(), event.getQueryContext(), event.getJsonExpr(), event.getLogicalPlanJson());
-
-      synchronized(queryMasterTasks) {
-        queryMasterTasks.put(event.getQueryId(), queryMasterTask);
-      }
-
-      queryMasterTask.init(systemConf);
-      if (!queryMasterTask.isInitError()) {
-        queryMasterTask.start();
-      }
-
-      queryContext = event.getQueryContext();
-
-      if (queryMasterTask.isInitError()) {
-        queryMasterContext.stopQuery(queryMasterTask.getQueryId());
-        return;
-      }
-    }
-  }
-
-  private class QueryStopEventHandler implements EventHandler<QueryStopEvent> {
-    @Override
-    public void handle(QueryStopEvent event) {
-      queryMasterContext.stopQuery(event.getQueryId());
-    }
-  }
-
-  class QueryHeartbeatThread extends Thread {
-    public QueryHeartbeatThread() {
-      super("QueryHeartbeatThread");
-    }
-
-    @Override
-    public void run() {
-      LOG.info("Start QueryMaster heartbeat thread");
-      while(!queryMasterStop.get()) {
-        List<QueryMasterTask> tempTasks = new ArrayList<QueryMasterTask>();
-        synchronized(queryMasterTasks) {
-          tempTasks.addAll(queryMasterTasks.values());
-        }
-        synchronized(queryMasterTasks) {
-          for(QueryMasterTask eachTask: tempTasks) {
-            NettyClientBase tmClient;
-            try {
-              // In TajoMaster HA mode, if backup master be active status,
-              // worker may fail to connect existing active master. Thus,
-              // if worker can't connect the master, worker should try to connect another master and
-              // update master address in worker context.
-              if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-                try {
-                  tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-                      TajoMasterProtocol.class, true);
-                } catch (Exception e) {
-                  queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
-                      HAServiceUtil.getResourceTrackerAddress(systemConf));
-                  queryMasterContext.getWorkerContext().setTajoMasterAddress(
-                      HAServiceUtil.getMasterUmbilicalAddress(systemConf));
-                  tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-                      TajoMasterProtocol.class, true);
-                }
-              } else {
-                tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-                    TajoMasterProtocol.class, true);
-              }
-
-              TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
-
-              CallFuture<TajoHeartbeatResponse> callBack =
-                  new CallFuture<TajoHeartbeatResponse>();
-
-              TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(eachTask);
-              masterClientService.heartbeat(callBack.getController(), queryHeartbeat, callBack);
-            } catch (Throwable t) {
-              t.printStackTrace();
-            }
-          }
-        }
-        synchronized(queryMasterStop) {
-          try {
-            queryMasterStop.wait(2000);
-          } catch (InterruptedException e) {
-            break;
-          }
-        }
-      }
-      LOG.info("QueryMaster heartbeat thread stopped");
-    }
-  }
-
-  class ClientSessionTimeoutCheckThread extends Thread {
-    public void run() {
-      LOG.info("ClientSessionTimeoutCheckThread started");
-      while(!queryMasterStop.get()) {
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {
-          break;
-        }
-        List<QueryMasterTask> tempTasks = new ArrayList<QueryMasterTask>();
-        synchronized(queryMasterTasks) {
-          tempTasks.addAll(queryMasterTasks.values());
-        }
-
-        for(QueryMasterTask eachTask: tempTasks) {
-          if(!eachTask.isStopped()) {
-            try {
-              long lastHeartbeat = eachTask.getLastClientHeartbeat();
-              long time = System.currentTimeMillis() - lastHeartbeat;
-              if(lastHeartbeat > 0 && time > querySessionTimeout * 1000) {
-                LOG.warn("Query " + eachTask.getQueryId() + " stopped cause query session timeout: " + time + " ms");
-                eachTask.expireQuerySession();
-              }
-            } catch (Exception e) {
-              LOG.error(eachTask.getQueryId() + ":" + e.getMessage(), e);
-            }
-          }
-        }
-      }
-    }
-  }
-
-  class FinishedQueryMasterTaskCleanThread extends Thread {
-    public void run() {
-      int expireIntervalTime = systemConf.getIntVar(TajoConf.ConfVars.WORKER_HISTORY_EXPIRE_PERIOD);
-      LOG.info("FinishedQueryMasterTaskCleanThread started: expire interval minutes = " + expireIntervalTime);
-      while(!queryMasterStop.get()) {
-        try {
-          Thread.sleep(60 * 1000 * 60);   // hourly
-        } catch (InterruptedException e) {
-          break;
-        }
-        try {
-          long expireTime = System.currentTimeMillis() - expireIntervalTime * 60 * 1000;
-          cleanExpiredFinishedQueryMasterTask(expireTime);
-        } catch (Exception e) {
-          LOG.error(e.getMessage(), e);
-        }
-      }
-    }
-
-    private void cleanExpiredFinishedQueryMasterTask(long expireTime) {
-      synchronized(finishedQueryMasterTasks) {
-        List<QueryId> expiredQueryIds = new ArrayList<QueryId>();
-        for(Map.Entry<QueryId, QueryMasterTask> entry: finishedQueryMasterTasks.entrySet()) {
-          if(entry.getValue().getStartTime() < expireTime) {
-            expiredQueryIds.add(entry.getKey());
-          }
-        }
-
-        for(QueryId eachId: expiredQueryIds) {
-          finishedQueryMasterTasks.remove(eachId);
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
deleted file mode 100644
index 9f7d3f8..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.tajo.*;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.QueryMasterProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.DefaultTaskScheduler;
-import org.apache.tajo.master.container.TajoContainerId;
-import org.apache.tajo.master.event.*;
-import org.apache.tajo.master.session.Session;
-import org.apache.tajo.rpc.AsyncRpcServer;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.worker.TajoWorker;
-
-import java.net.InetSocketAddress;
-
-public class QueryMasterManagerService extends CompositeService
-    implements QueryMasterProtocol.QueryMasterProtocolService.Interface {
-  private static final Log LOG = LogFactory.getLog(QueryMasterManagerService.class.getName());
-
-  private AsyncRpcServer rpcServer;
-  private InetSocketAddress bindAddr;
-  private String addr;
-  private int port;
-
-  private QueryMaster queryMaster;
-
-  private TajoWorker.WorkerContext workerContext;
-
-  public QueryMasterManagerService(TajoWorker.WorkerContext workerContext, int port) {
-    super(QueryMasterManagerService.class.getName());
-    this.workerContext = workerContext;
-    this.port = port;
-  }
-
-  public QueryMaster getQueryMaster() {
-    return queryMaster;
-  }
-
-  @Override
-  public void init(Configuration conf) {
-    Preconditions.checkArgument(conf instanceof TajoConf);
-    TajoConf tajoConf = (TajoConf) conf;
-    try {
-      // Setup RPC server
-      InetSocketAddress initIsa =
-          new InetSocketAddress("0.0.0.0", port);
-      if (initIsa.getAddress() == null) {
-        throw new IllegalArgumentException("Failed resolve of " + initIsa);
-      }
-
-      int workerNum = tajoConf.getIntVar(TajoConf.ConfVars.QUERY_MASTER_RPC_SERVER_WORKER_THREAD_NUM);
-      this.rpcServer = new AsyncRpcServer(QueryMasterProtocol.class, this, initIsa, workerNum);
-      this.rpcServer.start();
-
-      this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
-      this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
-
-      this.port = bindAddr.getPort();
-
-      queryMaster = new QueryMaster(workerContext);
-      addService(queryMaster);
-
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-    }
-    // Get the master address
-    LOG.info("QueryMasterManagerService is bind to " + addr);
-    ((TajoConf)conf).setVar(TajoConf.ConfVars.WORKER_QM_RPC_ADDRESS, addr);
-
-    super.init(conf);
-  }
-
-  @Override
-  public void start() {
-    super.start();
-  }
-
-  @Override
-  public void stop() {
-    if(rpcServer != null) {
-      rpcServer.shutdown();
-    }
-    LOG.info("QueryMasterManagerService stopped");
-    super.stop();
-  }
-
-  public InetSocketAddress getBindAddr() {
-    return bindAddr;
-  }
-
-  public String getHostAndPort() {
-    return bindAddr.getHostName() + ":" + bindAddr.getPort();
-  }
-
-  @Override
-  public void getTask(RpcController controller, TajoWorkerProtocol.GetTaskRequestProto request,
-                      RpcCallback<TajoWorkerProtocol.TaskRequestProto> done) {
-    try {
-      ExecutionBlockId ebId = new ExecutionBlockId(request.getExecutionBlockId());
-      QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(ebId.getQueryId());
-
-      if(queryMasterTask == null || queryMasterTask.isStopped()) {
-        done.run(DefaultTaskScheduler.stopTaskRunnerReq);
-      } else {
-        TajoContainerId cid =
-            queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId());
-        LOG.debug("getTask:" + cid + ", ebId:" + ebId);
-        queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(request.getWorkerId(), cid, ebId, done));
-      }
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-    }
-  }
-
-  @Override
-  public void statusUpdate(RpcController controller, TajoWorkerProtocol.TaskStatusProto request,
-                           RpcCallback<PrimitiveProtos.BoolProto> done) {
-    try {
-      QueryId queryId = new QueryId(request.getId().getTaskId().getExecutionBlockId().getQueryId());
-      TaskAttemptId attemptId = new TaskAttemptId(request.getId());
-      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
-      if (queryMasterTask == null) {
-        queryMasterTask = queryMaster.getQueryMasterTask(queryId, true);
-      }
-      Stage sq = queryMasterTask.getQuery().getStage(attemptId.getTaskId().getExecutionBlockId());
-      Task task = sq.getTask(attemptId.getTaskId());
-      TaskAttempt attempt = task.getAttempt(attemptId.getId());
-
-      if(LOG.isDebugEnabled()){
-        LOG.debug(String.format("Task State: %s, Attempt State: %s", task.getState().name(), attempt.getState().name()));
-      }
-
-      if (request.getState() == TajoProtos.TaskAttemptState.TA_KILLED) {
-        LOG.warn(attemptId + " Killed");
-        attempt.handle(
-            new TaskAttemptEvent(new TaskAttemptId(request.getId()), TaskAttemptEventType.TA_LOCAL_KILLED));
-      } else {
-        queryMasterTask.getEventHandler().handle(
-            new TaskAttemptStatusUpdateEvent(new TaskAttemptId(request.getId()), request));
-      }
-      done.run(TajoWorker.TRUE_PROTO);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-      done.run(TajoWorker.FALSE_PROTO);
-    }
-  }
-
-  @Override
-  public void ping(RpcController controller,
-                   TajoIdProtos.ExecutionBlockIdProto requestProto,
-                   RpcCallback<PrimitiveProtos.BoolProto> done) {
-    done.run(TajoWorker.TRUE_PROTO);
-  }
-
-  @Override
-  public void fatalError(RpcController controller, TajoWorkerProtocol.TaskFatalErrorReport report,
-                         RpcCallback<PrimitiveProtos.BoolProto> done) {
-    try {
-      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
-          new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId()));
-      if (queryMasterTask != null) {
-        queryMasterTask.handleTaskFailed(report);
-      } else {
-        LOG.warn("No QueryMasterTask: " + new TaskAttemptId(report.getId()));
-      }
-      done.run(TajoWorker.TRUE_PROTO);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-      done.run(TajoWorker.FALSE_PROTO);
-    }
-  }
-
-  @Override
-  public void done(RpcController controller, TajoWorkerProtocol.TaskCompletionReport report,
-                   RpcCallback<PrimitiveProtos.BoolProto> done) {
-    try {
-      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
-          new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId()));
-      if (queryMasterTask != null) {
-        queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report));
-      }
-      done.run(TajoWorker.TRUE_PROTO);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-      done.run(TajoWorker.FALSE_PROTO);
-    }
-  }
-
-  @Override
-  public void doneExecutionBlock(
-      RpcController controller, TajoWorkerProtocol.ExecutionBlockReport request,
-      RpcCallback<PrimitiveProtos.BoolProto> done) {
-    QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(new QueryId(request.getEbId().getQueryId()));
-    if (queryMasterTask != null) {
-      ExecutionBlockId ebId = new ExecutionBlockId(request.getEbId());
-      queryMasterTask.getQuery().getStage(ebId).receiveExecutionBlockReport(request);
-    }
-    done.run(TajoWorker.TRUE_PROTO);
-  }
-
-  @Override
-  public void killQuery(RpcController controller, TajoIdProtos.QueryIdProto request,
-                        RpcCallback<PrimitiveProtos.BoolProto> done) {
-    QueryId queryId = new QueryId(request);
-    QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
-    if (queryMasterTask != null) {
-      Query query = queryMasterTask.getQuery();
-      if (query != null) {
-        query.handle(new QueryEvent(queryId, QueryEventType.KILL));
-      }
-    }
-  }
-
-  @Override
-  public void executeQuery(RpcController controller,
-                           TajoWorkerProtocol.QueryExecutionRequestProto request,
-                           RpcCallback<PrimitiveProtos.BoolProto> done) {
-    try {
-      workerContext.getWorkerSystemMetrics().counter("querymaster", "numQuery").inc();
-
-      QueryId queryId = new QueryId(request.getQueryId());
-      LOG.info("Receive executeQuery request:" + queryId);
-      queryMaster.handle(new QueryStartEvent(queryId,
-          new Session(request.getSession()),
-          new QueryContext(workerContext.getQueryMaster().getContext().getConf(),
-              request.getQueryContext()), request.getExprInJson().getValue(),
-          request.getLogicalPlanJson().getValue()));
-      done.run(TajoWorker.TRUE_PROTO);
-    } catch (Exception e) {
-      workerContext.getWorkerSystemMetrics().counter("querymaster", "errorQuery").inc();
-      LOG.error(e.getMessage(), e);
-      done.run(TajoWorker.FALSE_PROTO);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
deleted file mode 100644
index 56dd789..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.util.TajoIdUtils;
-
-import java.io.PrintWriter;
-import java.lang.management.ManagementFactory;
-import java.lang.management.ThreadInfo;
-import java.lang.management.ThreadMXBean;
-
-@Deprecated
-public class QueryMasterRunner extends AbstractService {
-  private static final Log LOG = LogFactory.getLog(QueryMasterRunner.class);
-  private TajoConf systemConf;
-  private QueryMaster queryMaster;
-  private QueryId queryId;
-  private String queryMasterManagerAddress;
-
-  public QueryMasterRunner(QueryId queryId, String queryMasterManagerAddress) {
-    super(QueryMasterRunner.class.getName());
-    this.queryId = queryId;
-    this.queryMasterManagerAddress = queryMasterManagerAddress;
-  }
-
-  private class ShutdownHook implements Runnable {
-    @Override
-    public void run() {
-      LOG.info("============================================");
-      LOG.info("QueryMaster received SIGINT Signal");
-      LOG.info("============================================");
-      stop();
-    }
-  }
-
-  @Override
-  public void init(Configuration conf) {
-    this.systemConf = (TajoConf)conf;
-    RackResolver.init(systemConf);
-    Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
-    super.init(conf);
-  }
-
-  @Override
-  public void start() {
-    //create QueryMaster
-    QueryMaster query = new QueryMaster(null);
-
-    query.init(systemConf);
-    query.start();
-  }
-
-  @Override
-  public void stop() {
-  }
-
-  public static void main(String[] args) throws Exception {
-    LOG.info("QueryMasterRunner started");
-
-    final TajoConf conf = new TajoConf();
-    conf.addResource(new Path(TajoConstants.SYSTEM_CONF_FILENAME));
-
-    UserGroupInformation.setConfiguration(conf);
-
-    final QueryId queryId = TajoIdUtils.parseQueryId(args[0]);
-    final String queryMasterManagerAddr = args[1];
-
-    LOG.info("Received QueryId:" + queryId);
-
-    QueryMasterRunner queryMasterRunner = new QueryMasterRunner(queryId, queryMasterManagerAddr);
-    queryMasterRunner.init(conf);
-    queryMasterRunner.start();
-
-    synchronized(queryId) {
-      queryId.wait();
-    }
-
-    System.exit(0);
-  }
-
-  public static void printThreadInfo(PrintWriter stream, String title) {
-    ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
-    final int STACK_DEPTH = 60;
-    boolean contention = threadBean.isThreadContentionMonitoringEnabled();
-    long[] threadIds = threadBean.getAllThreadIds();
-    stream.println("Process Thread Dump: " + title);
-    stream.println(threadIds.length + " active threads");
-    for (long tid : threadIds) {
-      ThreadInfo info = threadBean.getThreadInfo(tid, STACK_DEPTH);
-      if (info == null) {
-        stream.println("  Inactive");
-        continue;
-      }
-      stream.println("Thread " + getTaskName(info.getThreadId(), info.getThreadName()) + ":");
-      Thread.State state = info.getThreadState();
-      stream.println("  State: " + state);
-      stream.println("  Blocked count: " + info.getBlockedCount());
-      stream.println("  Waited count: " + info.getWaitedCount());
-      if (contention) {
-        stream.println("  Blocked time: " + info.getBlockedTime());
-        stream.println("  Waited time: " + info.getWaitedTime());
-      }
-      if (state == Thread.State.WAITING) {
-        stream.println("  Waiting on " + info.getLockName());
-      } else if (state == Thread.State.BLOCKED) {
-        stream.println("  Blocked on " + info.getLockName());
-        stream.println("  Blocked by " + getTaskName(info.getLockOwnerId(), info.getLockOwnerName()));
-      }
-      stream.println("  Stack:");
-      for (StackTraceElement frame : info.getStackTrace()) {
-        stream.println("    " + frame.toString());
-      }
-    }
-    stream.flush();
-  }
-
-  private static String getTaskName(long id, String name) {
-    if (name == null) {
-      return Long.toString(id);
-    }
-    return id + " (" + name + ")";
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
deleted file mode 100644
index 9c789a5..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ /dev/null
@@ -1,638 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.tajo.*;
-import org.apache.tajo.algebra.Expr;
-import org.apache.tajo.algebra.JsonHelper;
-import org.apache.tajo.catalog.CatalogService;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.exception.UnimplementedException;
-import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.TajoContainerProxy;
-import org.apache.tajo.master.event.*;
-import org.apache.tajo.master.rm.TajoWorkerResourceManager;
-import org.apache.tajo.master.session.Session;
-import org.apache.tajo.plan.LogicalOptimizer;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.LogicalPlanner;
-import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.plan.logical.NodeType;
-import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
-import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.plan.verifier.VerifyException;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.RpcConnectionPool;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.StorageProperty;
-import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.util.metrics.TajoMetrics;
-import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter;
-import org.apache.tajo.worker.AbstractResourceAllocator;
-import org.apache.tajo.worker.TajoResourceAllocator;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.tajo.TajoProtos.QueryState;
-
-public class QueryMasterTask extends CompositeService {
-  private static final Log LOG = LogFactory.getLog(QueryMasterTask.class.getName());
-
-  // query submission directory is private!
-  final public static FsPermission STAGING_DIR_PERMISSION =
-      FsPermission.createImmutable((short) 0700); // rwx--------
-
-  // Path component placed between a table's output path and the query id
-  // when the staging directory is created under the output path.
-  public static final String TMP_STAGING_DIR_PREFIX = ".staging";
-
-  private QueryId queryId;
-
-  private Session session;
-
-  private QueryContext queryContext;
-
-  // Created in init(); hands this task's collaborators to other components.
-  private QueryMasterTaskContext queryTaskContext;
-
-  private QueryMaster.QueryMasterContext queryMasterContext;
-
-  // Remains null until startQuery() has built the plan successfully.
-  private Query query;
-
-  // NOTE(review): never assigned in this class; startQuery() builds its plan
-  // in a local variable of the same name.
-  private MasterPlan masterPlan;
-
-  // JSON form of the query expression; parsed and then cleared by startQuery().
-  private String jsonExpr;
-
-  // Stored by the constructor; not read anywhere else in this class.
-  private String logicalPlanJson;
-
-  private AsyncDispatcher dispatcher;
-
-  // Wall-clock time (System.currentTimeMillis()) taken in the constructor.
-  private final long querySubmitTime;
-
-  // Table descriptors of every scan node in the plan, keyed by the scan's
-  // canonical name; populated by startQuery().
-  private Map<String, TableDesc> tableDescMap = new HashMap<String, TableDesc>();
-
-  private TajoConf systemConf;
-
-  // -1 until touchSessionTime() records the first heartbeat.
-  private AtomicLong lastClientHeartbeat = new AtomicLong(-1);
-
-  private AbstractResourceAllocator resourceAllocator;
-
-  // Flipped to true by the first call to stop(); later calls return early.
-  private AtomicBoolean stopped = new AtomicBoolean(false);
-
-  private TajoMetrics queryMetrics;
-
-  // Failure captured by init() or startQuery(); null when both succeeded.
-  private Throwable initError;
-
-  // Fatal task error reports; handleTaskFailed() keeps at most the first 10.
-  // All access is synchronized on the list itself.
-  private final List<TajoWorkerProtocol.TaskFatalErrorReport> diagnostics =
-      new ArrayList<TajoWorkerProtocol.TaskFatalErrorReport>();
-
-  /**
-   * Creates a task for a single query. The submit time is taken from the
-   * system clock at construction; nothing is started until {@link #init} and
-   * {@link #start} are called.
-   *
-   * @param queryMasterContext context of the owning query master
-   * @param queryId            id of the query this task runs
-   * @param session            session the query belongs to
-   * @param queryContext       per-query settings
-   * @param jsonExpr           the query expression serialized as JSON
-   * @param logicalPlanJson    the logical plan serialized as JSON
-   */
-  public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
-                         QueryId queryId, Session session, QueryContext queryContext, String jsonExpr,
-                         String logicalPlanJson) {
-    super(QueryMasterTask.class.getName());
-
-    this.querySubmitTime = System.currentTimeMillis();
-
-    // identity of the query
-    this.queryId = queryId;
-    this.session = session;
-
-    // execution environment
-    this.queryMasterContext = queryMasterContext;
-    this.queryContext = queryContext;
-
-    // serialized plan inputs
-    this.jsonExpr = jsonExpr;
-    this.logicalPlanJson = logicalPlanJson;
-  }
-
-  /**
-   * Wires up the resource allocator, the event dispatcher with its handlers,
-   * the staging directory and the query metrics.
-   *
-   * Failures are not propagated: any throwable is logged and remembered so
-   * that {@link #isInitError()} / {@link #getInitError()} can report it.
-   *
-   * @param conf the configuration; cast to {@link TajoConf}
-   */
-  @Override
-  public void init(Configuration conf) {
-    systemConf = (TajoConf)conf;
-
-    try {
-      queryTaskContext = new QueryMasterTaskContext();
-
-      // Only the Tajo worker resource manager is supported.
-      String rmClassName = systemConf.getVar(TajoConf.ConfVars.RESOURCE_MANAGER_CLASS);
-      if (!rmClassName.contains(TajoWorkerResourceManager.class.getName())) {
-        throw new UnimplementedException(rmClassName + " is not supported yet");
-      }
-      resourceAllocator = new TajoResourceAllocator(queryTaskContext);
-      addService(resourceAllocator);
-
-      dispatcher = new AsyncDispatcher();
-      addService(dispatcher);
-
-      // Route each event family to its handler.
-      dispatcher.register(StageEventType.class, new StageEventDispatcher());
-      dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
-      dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
-      dispatcher.register(QueryMasterQueryCompletedEvent.EventType.class, new QueryFinishEventHandler());
-      dispatcher.register(TaskSchedulerEvent.EventType.class, new TaskSchedulerDispatcher());
-      dispatcher.register(LocalTaskEventType.class, new LocalTaskEventHandler());
-
-      initStagingDir();
-
-      queryMetrics = new TajoMetrics(queryId.toString());
-
-      super.init(systemConf);
-    } catch (Throwable failure) {
-      LOG.error(failure.getMessage(), failure);
-      initError = failure;
-    }
-  }
-
-  /**
-   * @return true once {@link #stop()} has been called at least once
-   */
-  public boolean isStopped() {
-    return stopped.get();
-  }
-
-  /**
-   * Plans and launches the query via {@link #startQuery()}, then starts the
-   * services registered on this composite service.
-   */
-  @Override
-  public void start() {
-    startQuery();
-    super.start();
-  }
-
-  /**
-   * Stops this task. Only the first call does any work; later calls return
-   * immediately.
-   *
-   * The resource allocator is stopped first, then a connection to the Tajo
-   * master is obtained and released (in HA mode the master addresses in the
-   * worker context are refreshed when the first attempt fails), the child
-   * services are stopped and finally the query metrics are reported to the
-   * console.
-   */
-  @Override
-  public void stop() {
-
-    if(stopped.getAndSet(true)) {
-      return;
-    }
-
-    LOG.info("Stopping QueryMasterTask:" + queryId);
-
-    // init() can fail before the allocator is created; skip it in that case
-    // instead of logging a NullPointerException as fatal.
-    if (resourceAllocator != null) {
-      try {
-        resourceAllocator.stop();
-      } catch (Throwable t) {
-        LOG.fatal(t.getMessage(), t);
-      }
-    }
-
-    RpcConnectionPool connPool = RpcConnectionPool.getPool(queryMasterContext.getConf());
-    NettyClientBase tmClient = null;
-    try {
-      // In TajoMaster HA mode, if backup master be active status,
-      // worker may fail to connect existing active master. Thus,
-      // if worker can't connect the master, worker should try to connect another master and
-      // update master address in worker context.
-      if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-        try {
-          tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
-        } catch (Exception e) {
-          queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
-              HAServiceUtil.getResourceTrackerAddress(systemConf));
-          queryMasterContext.getWorkerContext().setTajoMasterAddress(
-              HAServiceUtil.getMasterUmbilicalAddress(systemConf));
-          tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
-        }
-      } else {
-        tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-            TajoMasterProtocol.class, true);
-      }
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-    } finally {
-      // No connection was obtained when every attempt above failed.
-      if (tmClient != null) {
-        connPool.releaseConnection(tmClient);
-      }
-    }
-
-    super.stop();
-
-    //TODO change report to tajo master
-    if (queryMetrics != null) {
-      queryMetrics.report(new MetricsConsoleReporter());
-    }
-
-    LOG.info("Stopped QueryMasterTask:" + queryId);
-  }
-
-  /**
-   * Forwards a task request to the stage that owns the event's execution block.
-   *
-   * @param event the task request to deliver
-   */
-  public void handleTaskRequestEvent(TaskRequestEvent event) {
-    query.getStage(event.getExecutionBlockId()).handleTaskRequestEvent(event);
-  }
-
-  /**
-   * Records a fatal task error and publishes it as a {@link TaskFatalErrorEvent}.
-   * Only the first ten reports are retained as diagnostics; the event is
-   * published for every report.
-   *
-   * @param report the error report received from a task
-   */
-  public void handleTaskFailed(TajoWorkerProtocol.TaskFatalErrorReport report) {
-    final int retainedReportLimit = 10;
-
-    synchronized (diagnostics) {
-      boolean hasRoom = diagnostics.size() < retainedReportLimit;
-      if (hasRoom) {
-        diagnostics.add(report);
-      }
-    }
-
-    TaskFatalErrorEvent errorEvent = new TaskFatalErrorEvent(report);
-    getEventHandler().handle(errorEvent);
-  }
-
-  /**
-   * Returns the fatal error reports retained so far.
-   *
-   * The result is a read-only snapshot taken under the list's lock, so it can
-   * be iterated safely while {@link #handleTaskFailed} keeps adding reports;
-   * reports added after this call are not reflected in it.
-   *
-   * @return an unmodifiable copy of the retained reports, never null
-   */
-  public Collection<TajoWorkerProtocol.TaskFatalErrorReport> getDiagnostics() {
-    synchronized(diagnostics) {
-      // Copy while holding the lock: an unmodifiable *view* would still read
-      // the live list after the lock has been released.
-      return Collections.unmodifiableCollection(
-          new ArrayList<TajoWorkerProtocol.TaskFatalErrorReport>(diagnostics));
-    }
-  }
-
-  /** Delivers each stage event to the stage named by the event's stage id. */
-  private class StageEventDispatcher implements EventHandler<StageEvent> {
-    public void handle(StageEvent event) {
-      ExecutionBlockId stageId = event.getStageId();
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("StageEventDispatcher:" + stageId + "," + event.getType());
-      }
-
-      Stage target = query.getStage(stageId);
-      target.handle(event);
-    }
-  }
-
-  /** Delivers each task event to the task it names, looked up through its stage. */
-  private class TaskEventDispatcher implements EventHandler<TaskEvent> {
-    public void handle(TaskEvent event) {
-      TaskId targetId = event.getTaskId();
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("TaskEventDispatcher>" + targetId + "," + event.getType());
-      }
-
-      Stage owningStage = query.getStage(targetId.getExecutionBlockId());
-      owningStage.getTask(targetId).handle(event);
-    }
-  }
-
-  /** Resolves stage, task and attempt from the attempt id and delivers the event. */
-  private class TaskAttemptEventDispatcher implements EventHandler<TaskAttemptEvent> {
-    public void handle(TaskAttemptEvent event) {
-      TaskAttemptId attemptId = event.getTaskAttemptId();
-      TaskId owningTaskId = attemptId.getTaskId();
-
-      query.getStage(owningTaskId.getExecutionBlockId())
-          .getTask(attemptId.getTaskId())
-          .getAttempt(attemptId)
-          .handle(event);
-    }
-  }
-
-  /** Hands scheduler events to the task scheduler of the event's stage. */
-  private class TaskSchedulerDispatcher implements EventHandler<TaskSchedulerEvent> {
-    public void handle(TaskSchedulerEvent event) {
-      query.getStage(event.getExecutionBlockId()).getTaskScheduler().handle(event);
-    }
-  }
-
-  /**
-   * Asks the container named in the event to kill the given task attempt.
-   * Events for containers the allocator does not know are ignored.
-   */
-  private class LocalTaskEventHandler implements EventHandler<LocalTaskEvent> {
-    @Override
-    public void handle(LocalTaskEvent event) {
-      TajoContainerProxy container =
-          (TajoContainerProxy) resourceAllocator.getContainers().get(event.getContainerId());
-      if (container == null) {
-        return;
-      }
-      container.killTaskAttempt(event.getTaskAttemptId());
-    }
-  }
-
-  /**
-   * Reacts to the query-completed notification: waits until the query has
-   * reached a terminal state, then asks the query master to stop this query.
-   */
-  private class QueryFinishEventHandler implements EventHandler<QueryMasterQueryCompletedEvent> {
-    @Override
-    public void handle(QueryMasterQueryCompletedEvent event) {
-      QueryId finishedQueryId = event.getQueryId();
-      LOG.info("Query completion notified from " + finishedQueryId);
-
-      // Poll the state every 10 ms until it is terminal.
-      // NOTE(review): an interrupt is only logged and polling continues.
-      for (;;) {
-        if (isTerminatedState(query.getSynchronizedState())) {
-          break;
-        }
-        try {
-          synchronized (this) {
-            wait(10);
-          }
-        } catch (InterruptedException e) {
-          LOG.error(e);
-        }
-      }
-      LOG.info("Query final state: " + query.getSynchronizedState());
-
-      QueryStopEvent stopRequest = new QueryStopEvent(finishedQueryId);
-      queryMasterContext.getEventHandler().handle(stopRequest);
-    }
-  }
-
-  /**
-   * @param state the query state to classify
-   * @return true for the succeeded, failed, killed and error states
-   */
-  private static boolean isTerminatedState(QueryState state) {
-    if (state == QueryState.QUERY_SUCCEEDED) {
-      return true;
-    }
-    if (state == QueryState.QUERY_FAILED) {
-      return true;
-    }
-    if (state == QueryState.QUERY_KILLED) {
-      return true;
-    }
-    return state == QueryState.QUERY_ERROR;
-  }
-
-  /**
-   * Plans the query and hands it to the dispatcher.
-   *
-   * The JSON expression is parsed into a logical plan, storage-specific
-   * rewrite rules are added for storages that require sorted insert, the plan
-   * is optimized, the table descriptors of all scan nodes are recorded, the
-   * global plan is built and finally a START event is posted for the new
-   * {@link Query}. Calling it again after a successful start only logs a
-   * warning.
-   *
-   * Failures are not propagated: the throwable is logged and stored as the
-   * init error, and the storage manager is asked to roll back its output
-   * commit when planning had got far enough to have one.
-   */
-  public synchronized void startQuery() {
-    StorageManager sm = null;
-    LogicalPlan plan = null;
-    try {
-      if (query != null) {
-        LOG.warn("Query already started");
-        return;
-      }
-      CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog();
-      LogicalPlanner planner = new LogicalPlanner(catalog);
-      LogicalOptimizer optimizer = new LogicalOptimizer(systemConf);
-      Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class);
-      jsonExpr = null; // remove the possible OOM
-      plan = planner.createPlan(queryContext, expr);
-
-      StoreType storeType = PlannerUtil.getStoreType(plan);
-      if (storeType != null) {
-        sm = StorageManager.getStorageManager(systemConf, storeType);
-        StorageProperty storageProperty = sm.getStorageProperty();
-        if (storageProperty.isSortedInsert()) {
-          String tableName = PlannerUtil.getStoreTableName(plan);
-          LogicalRootNode rootNode = plan.getRootBlock().getRoot();
-          TableDesc tableDesc =  PlannerUtil.getTableDesc(catalog, rootNode.getChild());
-          if (tableDesc == null) {
-            throw new VerifyException("Can't get table meta data from catalog: " + tableName);
-          }
-          List<LogicalPlanRewriteRule> storageSpecifiedRewriteRules = sm.getRewriteRules(
-              getQueryTaskContext().getQueryContext(), tableDesc);
-          if (storageSpecifiedRewriteRules != null) {
-            for (LogicalPlanRewriteRule eachRule: storageSpecifiedRewriteRules) {
-              optimizer.addRuleAfterToJoinOpt(eachRule);
-            }
-          }
-        }
-      }
-
-      optimizer.optimize(queryContext, plan);
-
-      for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
-        registerScannedTables(block.getRoot(), NodeType.SCAN);
-        registerScannedTables(block.getRoot(), NodeType.PARTITIONS_SCAN);
-      }
-
-      // Local on purpose: named differently so it no longer shadows the field.
-      MasterPlan globalPlan = new MasterPlan(queryId, queryContext, plan);
-      queryMasterContext.getGlobalPlanner().build(globalPlan);
-
-      query = new Query(queryTaskContext, queryId, querySubmitTime,
-          "", queryTaskContext.getEventHandler(), globalPlan);
-
-      dispatcher.register(QueryEventType.class, query);
-      queryTaskContext.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.START));
-    } catch (Throwable t) {
-      LOG.error(t.getMessage(), t);
-      initError = t;
-
-      if (plan != null && sm != null) {
-        LogicalRootNode rootNode = plan.getRootBlock().getRoot();
-        try {
-          sm.rollbackOutputCommit(rootNode.getChild());
-        } catch (IOException e) {
-          // Use the queryId field: 'query' is still null for any failure that
-          // happened before the Query object was created.
-          LOG.warn(queryId + ", failed processing cleanup storage when query failed:" + e.getMessage(), e);
-        }
-      }
-    }
-  }
-
-  /**
-   * Records the table descriptor of every node of the given type found under
-   * {@code root}, keyed by the scan node's canonical name.
-   */
-  private void registerScannedTables(LogicalNode root, NodeType scanType) {
-    LogicalNode[] scanNodes = PlannerUtil.findAllNodes(root, scanType);
-    if (scanNodes == null) {
-      return;
-    }
-    for (LogicalNode eachScanNode : scanNodes) {
-      ScanNode scanNode = (ScanNode) eachScanNode;
-      tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
-    }
-  }
-
-  /**
-   * Creates the staging directory for this query and stores its path in the
-   * query context.
-   *
-   * @throws IOException rethrown unchanged when creating the directory fails
-   */
-  private void initStagingDir() throws IOException {
-    Path stagingDir = null;
-    // File system of the warehouse directory; only used for the cleanup below.
-    // NOTE(review): the static initStagingDir resolves the file system from
-    // the staging path itself, which may not be this one - confirm.
-    FileSystem defaultFS = TajoConf.getWarehouseDir(systemConf).getFileSystem(systemConf);
-
-    try {
-
-      stagingDir = initStagingDir(systemConf, queryId.toString(), queryContext);
-
-      // Create a subdirectories
-      LOG.info("The staging dir '" + stagingDir + "' is created.");
-      queryContext.setStagingDir(stagingDir);
-    } catch (IOException ioe) {
-      // NOTE(review): stagingDir is only assigned when the call above returns
-      // normally, so it looks like it is still null whenever that call is the
-      // one that throws, and this cleanup is then skipped - confirm.
-      if (stagingDir != null && defaultFS.exists(stagingDir)) {
-        try {
-          defaultFS.delete(stagingDir, true);
-          LOG.info("The staging directory '" + stagingDir + "' is deleted");
-        } catch (Exception e) {
-          // best effort: a failed cleanup must not mask the original error
-          LOG.warn(e.getMessage());
-        }
-      }
-
-      throw ioe;
-    }
-  }
-
-  /**
-   * Creates the staging directory of a query together with its result
-   * sub-directory.
-   *
-   * For CREATE TABLE / INSERT queries that carry an output table path the
-   * directory is placed under that path (below {@link #TMP_STAGING_DIR_PREFIX});
-   * in every other case it is placed under the default root staging directory.
-   *
-   * @param conf    the system configuration
-   * @param queryId the query id, used as the directory name
-   * @param context the query context consulted for the output settings
-   * @return the path of the created staging directory
-   * @throws IOException if the directory already exists or is owned by
-   *                     neither the current user nor the login user
-   */
-  public static Path initStagingDir(TajoConf conf, String queryId, QueryContext context) throws IOException {
-    final String realUser = UserGroupInformation.getLoginUser().getShortUserName();
-    final String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
-
-    ////////////////////////////////////////////
-    // Choose the location
-    ////////////////////////////////////////////
-
-    final String outputPath = context.get(QueryVars.OUTPUT_TABLE_PATH, "");
-    final boolean storesTable = context.isCreateTable() || context.isInsert();
-    final boolean hasOutputPath = outputPath != null && !outputPath.isEmpty();
-
-    final Path stagingDir;
-    if (storesTable && hasOutputPath) {
-      stagingDir = StorageUtil.concatPath(context.getOutputPath(), TMP_STAGING_DIR_PREFIX, queryId);
-    } else {
-      // plain queries, and stores without an output path (e.g. hbase)
-      stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId);
-    }
-
-    ////////////////////////////////////////////
-    // Create and verify the directory
-    ////////////////////////////////////////////
-
-    final FileSystem fs = stagingDir.getFileSystem(conf);
-
-    if (fs.exists(stagingDir)) {
-      throw new IOException("The staging directory '" + stagingDir + "' already exists");
-    }
-    fs.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
-
-    final FileStatus dirStatus = fs.getFileStatus(stagingDir);
-    final String owner = dirStatus.getOwner();
-
-    if (!owner.isEmpty() && !owner.equals(currentUser) && !owner.equals(realUser)) {
-      throw new IOException("The ownership on the user's query " +
-          "directory " + stagingDir + " is not as expected. " +
-          "It is owned by " + owner + ". The directory must " +
-          "be owned by the submitter " + currentUser + " or " +
-          "by " + realUser);
-    }
-
-    final boolean permissionMatches = dirStatus.getPermission().equals(STAGING_DIR_PERMISSION);
-    if (!permissionMatches) {
-      LOG.info("Permissions on staging directory " + stagingDir + " are " +
-          "incorrect: " + dirStatus.getPermission() + ". Fixing permissions " +
-          "to correct value " + STAGING_DIR_PERMISSION);
-      fs.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
-    }
-
-    fs.mkdirs(new Path(stagingDir, TajoConstants.RESULT_DIR_NAME));
-
-    return stagingDir;
-  }
-
-  /**
-   * @return the running query, or null while {@link #startQuery()} has not
-   *         yet created it
-   */
-  public Query getQuery() {
-    return query;
-  }
-
-  /**
-   * Kills the query unless it has already terminated or is already waiting to
-   * be killed. Does nothing when the query was never started.
-   */
-  protected void expireQuerySession() {
-    Query current = query;
-    if (current == null) {
-      // startQuery() has not run yet or failed; there is nothing to kill
-      return;
-    }
-
-    // Read the state once so both checks see the same value.
-    QueryState state = current.getState();
-    if (!isTerminatedState(state) && state != QueryState.QUERY_KILL_WAIT) {
-      current.handle(new QueryEvent(queryId, QueryEventType.KILL));
-    }
-  }
-
-  /**
-   * @return the context created by {@link #init}; null before init has run
-   */
-  public QueryMasterTaskContext getQueryTaskContext() {
-    return queryTaskContext;
-  }
-
-  /**
-   * @return the event handler exposed by the task context
-   */
-  public EventHandler getEventHandler() {
-    return queryTaskContext.getEventHandler();
-  }
-
-  /** Records the current time as the moment of the last client heartbeat. */
-  public void touchSessionTime() {
-    this.lastClientHeartbeat.set(System.currentTimeMillis());
-  }
-
-  /**
-   * @return the time of the last client heartbeat in milliseconds, or -1 when
-   *         {@link #touchSessionTime()} has never been called
-   */
-  public long getLastClientHeartbeat() {
-    return this.lastClientHeartbeat.get();
-  }
-
-  /**
-   * @return the id of the query this task runs
-   */
-  public QueryId getQueryId() {
-    return queryId;
-  }
-
-  /**
-   * @return true when {@link #init} or {@link #startQuery()} recorded a failure
-   */
-  public boolean isInitError() {
-    return initError != null;
-  }
-
-  /**
-   * Reports the state of the query. While no query object exists the state
-   * is QUERY_ERROR if a failure was recorded and QUERY_NOT_ASSIGNED otherwise.
-   *
-   * @return the current query state
-   */
-  public QueryState getState() {
-    if (query != null) {
-      return query.getState();
-    }
-    return isInitError() ? QueryState.QUERY_ERROR : QueryState.QUERY_NOT_ASSIGNED;
-  }
-
-  /**
-   * @return the failure recorded by {@link #init} or {@link #startQuery()},
-   *         or null when none was recorded
-   */
-  public Throwable getInitError() {
-    return initError;
-  }
-
-  /**
-   * @return the recorded failure rendered as a stack trace string, or null
-   *         when no failure was recorded
-   */
-  public String getErrorMessage() {
-    return isInitError() ? StringUtils.stringifyException(initError) : null;
-  }
-
-  /**
-   * @return the time this task was constructed, in milliseconds since the epoch
-   */
-  public long getQuerySubmitTime() {
-    return this.querySubmitTime;
-  }
-
-  /**
-   * View onto the enclosing task that is handed to collaborators (allocator,
-   * query, stages). Every accessor reads the enclosing task's current field
-   * value, so results may be null until the task has been initialized and
-   * the query started.
-   */
-  public class QueryMasterTaskContext {
-    /** Cached handler of the task's dispatcher; filled on first request. */
-    EventHandler eventHandler;
-
-    // ---- owning components ----
-
-    public QueryMaster.QueryMasterContext getQueryMasterContext() {
-      return queryMasterContext;
-    }
-
-    public Session getSession() {
-      return session;
-    }
-
-    public TajoConf getConf() {
-      return systemConf;
-    }
-
-    public Clock getClock() {
-      return queryMasterContext.getClock();
-    }
-
-    // ---- query identity and state ----
-
-    public QueryContext getQueryContext() {
-      return queryContext;
-    }
-
-    public Query getQuery() {
-      return query;
-    }
-
-    public QueryId getQueryId() {
-      return queryId;
-    }
-
-    public Path getStagingDir() {
-      return queryContext.getStagingDir();
-    }
-
-    public Stage getStage(ExecutionBlockId id) {
-      return query.getStage(id);
-    }
-
-    public Map<String, TableDesc> getTableDescMap() {
-      return tableDescMap;
-    }
-
-    /** @return the query's progress, or 0 while no query exists */
-    public float getProgress() {
-      return query == null ? 0.0f : query.getProgress();
-    }
-
-    // ---- event plumbing ----
-
-    /** @return the dispatcher's event handler, fetched once and then reused */
-    public synchronized EventHandler getEventHandler() {
-      if (eventHandler != null) {
-        return eventHandler;
-      }
-      eventHandler = dispatcher.getEventHandler();
-      return eventHandler;
-    }
-
-    public AsyncDispatcher getDispatcher() {
-      return dispatcher;
-    }
-
-    // ---- resources and metrics ----
-
-    public AbstractResourceAllocator getResourceAllocator() {
-      return resourceAllocator;
-    }
-
-    public TajoMetrics getQueryMetrics() {
-      return queryMetrics;
-    }
-  }
-}
\ No newline at end of file