You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 13:44:27 UTC

[24/57] [abbrv] [partial] TAJO-752: Escalate sub modules in tajo-core into the top-level modules. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
new file mode 100644
index 0000000..a8f5b31
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -0,0 +1,702 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.*;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoProtos.QueryState;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.logical.CreateTableNode;
+import org.apache.tajo.engine.planner.logical.InsertNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.util.TUtil;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class Query implements EventHandler<QueryEvent> {
+  private static final Log LOG = LogFactory.getLog(Query.class);
+
+  // Facilities for Query
+  private final TajoConf systemConf;
+  private final Clock clock;
+  private String queryStr;
+  private Map<ExecutionBlockId, SubQuery> subqueries;
+  private final EventHandler eventHandler;
+  private final MasterPlan plan;
+  private final AbstractStorageManager sm;
+  QueryMasterTask.QueryMasterTaskContext context;
+  private ExecutionBlockCursor cursor;
+
+  // Query Status
+  private final QueryId id;
+  private long appSubmitTime;
+  private long startTime;
+  private long finishTime;
+  private TableDesc resultDesc;
+  private int completedSubQueryCount = 0;
+  private int successedSubQueryCount = 0;
+  private int killedSubQueryCount = 0;
+  private int failedSubQueryCount = 0;
+  private int erroredSubQueryCount = 0;
+  private final List<String> diagnostics = new ArrayList<String>();
+
+  // Internal Variables
+  private final Lock readLock;
+  private final Lock writeLock;
+  private int priority = 100;
+
+  // State Machine
+  private final StateMachine<QueryState, QueryEventType, QueryEvent> stateMachine;
+
+  // Transition Handler
+  private static final SingleArcTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
+  private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
+  private static final SubQueryCompletedTransition SUBQUERY_COMPLETED_TRANSITION = new SubQueryCompletedTransition();
+  private static final QueryCompletedTransition QUERY_COMPLETED_TRANSITION = new QueryCompletedTransition();
+
+  protected static final StateMachineFactory
+      <Query,QueryState,QueryEventType,QueryEvent> stateMachineFactory =
+      new StateMachineFactory<Query, QueryState, QueryEventType, QueryEvent>
+          (QueryState.QUERY_NEW)
+
+          // Transitions from NEW state
+          .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_RUNNING,
+              QueryEventType.START,
+              new StartTransition())
+          .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_NEW,
+              QueryEventType.DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_KILLED,
+              QueryEventType.KILL,
+              new KillNewQueryTransition())
+          .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_ERROR,
+              QueryEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+
+          // Transitions from RUNNING state
+          .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING,
+              QueryEventType.SUBQUERY_COMPLETED,
+              SUBQUERY_COMPLETED_TRANSITION)
+          .addTransition(QueryState.QUERY_RUNNING,
+              EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED,
+                  QueryState.QUERY_ERROR),
+              QueryEventType.QUERY_COMPLETED,
+              QUERY_COMPLETED_TRANSITION)
+          .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING,
+              QueryEventType.DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_KILL_WAIT,
+              QueryEventType.KILL,
+              new KillSubQueriesTransition())
+          .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_ERROR,
+              QueryEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+
+          // Transitions from QUERY_SUCCEEDED state
+          .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
+              QueryEventType.DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          // ignore-able transitions
+          .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
+              QueryEventType.SUBQUERY_COMPLETED,
+              SUBQUERY_COMPLETED_TRANSITION)
+          .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
+              QueryEventType.KILL)
+          .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_ERROR,
+              QueryEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+
+          // Transitions from KILL_WAIT state
+          .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
+              QueryEventType.SUBQUERY_COMPLETED,
+              SUBQUERY_COMPLETED_TRANSITION)
+          .addTransition(QueryState.QUERY_KILL_WAIT,
+              EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED,
+                  QueryState.QUERY_ERROR),
+              QueryEventType.QUERY_COMPLETED,
+              QUERY_COMPLETED_TRANSITION)
+          .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
+              QueryEventType.DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_ERROR,
+              QueryEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+          // Ignore-able transitions
+          .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
+              EnumSet.of(QueryEventType.KILL))
+
+          // Transitions from FAILED state
+          .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED,
+              QueryEventType.DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_ERROR,
+              QueryEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+          // Ignore-able transitions
+          .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED,
+              QueryEventType.KILL)
+
+          // Transitions from ERROR state
+          .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
+              QueryEventType.DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
+              QueryEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+          // Ignore-able transitions
+          .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
+              QueryEventType.KILL)
+
+          .installTopology();
+
+  public Query(final QueryMasterTask.QueryMasterTaskContext context, final QueryId id,
+               final long appSubmitTime,
+               final String queryStr,
+               final EventHandler eventHandler,
+               final MasterPlan plan) {
+    this.context = context;
+    this.systemConf = context.getConf();
+    this.id = id;
+    this.clock = context.getClock();
+    this.appSubmitTime = appSubmitTime;
+    this.queryStr = queryStr;
+    subqueries = Maps.newHashMap();
+    this.eventHandler = eventHandler;
+    this.plan = plan;
+    this.sm = context.getStorageManager();
+    cursor = new ExecutionBlockCursor(plan);
+
+    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    this.readLock = readWriteLock.readLock();
+    this.writeLock = readWriteLock.writeLock();
+
+    stateMachine = stateMachineFactory.make(this);
+  }
+
+  public float getProgress() {
+    QueryState state = getStateMachine().getCurrentState();
+    if (state == QueryState.QUERY_SUCCEEDED) {
+      return 1.0f;
+    } else {
+      int idx = 0;
+      List<SubQuery> tempSubQueries = new ArrayList<SubQuery>();
+      synchronized(subqueries) {
+        tempSubQueries.addAll(subqueries.values());
+      }
+      float [] subProgresses = new float[tempSubQueries.size()];
+      boolean finished = true;
+      for (SubQuery subquery: tempSubQueries) {
+        if (subquery.getState() != SubQueryState.NEW) {
+          subProgresses[idx] = subquery.getProgress();
+          if (finished && subquery.getState() != SubQueryState.SUCCEEDED) {
+            finished = false;
+          }
+        } else {
+          subProgresses[idx] = 0.0f;
+        }
+        idx++;
+      }
+
+      if (finished) {
+        return 1.0f;
+      }
+
+      float totalProgress = 0;
+      float proportion = 1.0f / (float)(getExecutionBlockCursor().size() - 1); // minus one is due to
+
+      for (int i = 0; i < subProgresses.length; i++) {
+        totalProgress += subProgresses[i] * proportion;
+      }
+
+      return totalProgress;
+    }
+  }
+
+  public long getAppSubmitTime() {
+    return this.appSubmitTime;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime() {
+    startTime = clock.getTime();
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  public void setFinishTime() {
+    finishTime = clock.getTime();
+  }
+
+  public List<String> getDiagnostics() {
+    readLock.lock();
+    try {
+      return diagnostics;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  protected void addDiagnostic(String diag) {
+    diagnostics.add(diag);
+  }
+
+  public TableDesc getResultDesc() {
+    return resultDesc;
+  }
+
+  public void setResultDesc(TableDesc desc) {
+    resultDesc = desc;
+  }
+
+  public MasterPlan getPlan() {
+    return plan;
+  }
+
+  public StateMachine<QueryState, QueryEventType, QueryEvent> getStateMachine() {
+    return stateMachine;
+  }
+  
+  public void addSubQuery(SubQuery subquery) {
+    subqueries.put(subquery.getId(), subquery);
+  }
+  
+  public QueryId getId() {
+    return this.id;
+  }
+
+  public SubQuery getSubQuery(ExecutionBlockId id) {
+    return this.subqueries.get(id);
+  }
+
+  public Collection<SubQuery> getSubQueries() {
+    return this.subqueries.values();
+  }
+
+  public QueryState getState() {
+    readLock.lock();
+    try {
+      return stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public ExecutionBlockCursor getExecutionBlockCursor() {
+    return cursor;
+  }
+
+  public static class StartTransition
+      implements SingleArcTransition<Query, QueryEvent> {
+
+    @Override
+    public void transition(Query query, QueryEvent queryEvent) {
+      query.setStartTime();
+      SubQuery subQuery = new SubQuery(query.context, query.getPlan(),
+          query.getExecutionBlockCursor().nextBlock(), query.sm);
+      subQuery.setPriority(query.priority--);
+      query.addSubQuery(subQuery);
+
+      subQuery.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_INIT));
+      LOG.debug("Schedule unit plan: \n" + subQuery.getBlock().getPlan());
+    }
+  }
+
+  public static class QueryCompletedTransition implements MultipleArcTransition<Query, QueryEvent, QueryState> {
+
+    @Override
+    public QueryState transition(Query query, QueryEvent queryEvent) {
+      QueryCompletedEvent subQueryEvent = (QueryCompletedEvent) queryEvent;
+      QueryState finalState;
+      if (subQueryEvent.getState() == SubQueryState.SUCCEEDED) {
+        finalizeQuery(query, subQueryEvent);
+        finalState = QueryState.QUERY_SUCCEEDED;
+      } else if (subQueryEvent.getState() == SubQueryState.FAILED) {
+        finalState = QueryState.QUERY_FAILED;
+      } else if (subQueryEvent.getState() == SubQueryState.KILLED) {
+        finalState = QueryState.QUERY_KILLED;
+      } else {
+        finalState = QueryState.QUERY_ERROR;
+      }
+      query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
+      query.setFinishTime();
+      return finalState;
+    }
+
+    private void finalizeQuery(Query query, QueryCompletedEvent event) {
+      MasterPlan masterPlan = query.getPlan();
+
+      ExecutionBlock terminal = query.getPlan().getTerminalBlock();
+      DataChannel finalChannel = masterPlan.getChannel(event.getExecutionBlockId(), terminal.getId());
+      Path finalOutputDir = commitOutputData(query);
+
+      QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
+      try {
+        hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(),
+            finalOutputDir);
+      } catch (Exception e) {
+        query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e)));
+      }
+    }
+
+    /**
+     * It moves a result data stored in a staging output dir into a final output dir.
+     */
+    public Path commitOutputData(Query query) {
+      QueryContext queryContext = query.context.getQueryContext();
+      Path stagingResultDir = new Path(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME);
+      Path finalOutputDir;
+      if (queryContext.hasOutputPath()) {
+        finalOutputDir = queryContext.getOutputPath();
+        try {
+          FileSystem fs = stagingResultDir.getFileSystem(query.systemConf);
+
+          if (queryContext.isOutputOverwrite()) { // INSERT OVERWRITE INTO
+
+            // it moves the original table into the temporary location.
+            // Then it moves the new result table into the original table location.
+            // Upon failed, it recovers the original table if possible.
+            boolean movedToOldTable = false;
+            boolean committed = false;
+            Path oldTableDir = new Path(queryContext.getStagingDir(), TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
+            try {
+              if (fs.exists(finalOutputDir)) {
+                fs.rename(finalOutputDir, oldTableDir);
+                movedToOldTable = fs.exists(oldTableDir);
+              } else { // if the parent does not exist, make its parent directory.
+                fs.mkdirs(finalOutputDir.getParent());
+              }
+              fs.rename(stagingResultDir, finalOutputDir);
+              committed = fs.exists(finalOutputDir);
+            } catch (IOException ioe) {
+              // recover the old table
+              if (movedToOldTable && !committed) {
+                fs.rename(oldTableDir, finalOutputDir);
+              }
+            }
+          } else {
+            fs.rename(stagingResultDir, finalOutputDir);
+            LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir);
+          }
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      } else {
+        finalOutputDir = new Path(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME);
+      }
+
+      return finalOutputDir;
+    }
+
+    private static interface QueryHook {
+      boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir);
+      void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query,
+                   ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception;
+    }
+
+    private class QueryHookExecutor {
+      private List<QueryHook> hookList = TUtil.newList();
+      private QueryMaster.QueryMasterContext context;
+
+      public QueryHookExecutor(QueryMaster.QueryMasterContext context) {
+        this.context = context;
+        hookList.add(new MaterializedResultHook());
+        hookList.add(new CreateTableHook());
+        hookList.add(new InsertTableHook());
+      }
+
+      public void execute(QueryContext queryContext, Query query,
+                          ExecutionBlockId finalExecBlockId,
+                          Path finalOutputDir) throws Exception {
+        for (QueryHook hook : hookList) {
+          if (hook.isEligible(queryContext, query, finalExecBlockId, finalOutputDir)) {
+            hook.execute(context, queryContext, query, finalExecBlockId, finalOutputDir);
+          }
+        }
+      }
+    }
+
+    private class MaterializedResultHook implements QueryHook {
+
+      @Override
+      public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId,
+                                Path finalOutputDir) {
+        SubQuery lastStage = query.getSubQuery(finalExecBlockId);
+        NodeType type = lastStage.getBlock().getPlan().getType();
+        return type != NodeType.CREATE_TABLE && type != NodeType.INSERT;
+      }
+
+      @Override
+      public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
+                          Query query, ExecutionBlockId finalExecBlockId,
+                          Path finalOutputDir) throws Exception {
+        SubQuery lastStage = query.getSubQuery(finalExecBlockId);
+        TableMeta meta = lastStage.getTableMeta();
+        TableStats stats = lastStage.getResultStats();
+
+        TableDesc resultTableDesc =
+            new TableDesc(
+                query.getId().toString(),
+                lastStage.getSchema(),
+                meta,
+                finalOutputDir);
+        resultTableDesc.setExternal(true);
+
+        stats.setNumBytes(getTableVolume(query.systemConf, finalOutputDir));
+        resultTableDesc.setStats(stats);
+        query.setResultDesc(resultTableDesc);
+      }
+    }
+
+    private class CreateTableHook implements QueryHook {
+
+      @Override
+      public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId,
+                                Path finalOutputDir) {
+        SubQuery lastStage = query.getSubQuery(finalExecBlockId);
+        return lastStage.getBlock().getPlan().getType() == NodeType.CREATE_TABLE;
+      }
+
+      @Override
+      public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
+                          Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception {
+        CatalogService catalog = context.getWorkerContext().getCatalog();
+        SubQuery lastStage = query.getSubQuery(finalExecBlockId);
+        TableStats stats = lastStage.getResultStats();
+
+        CreateTableNode createTableNode = (CreateTableNode) lastStage.getBlock().getPlan();
+        TableMeta meta = new TableMeta(createTableNode.getStorageType(), createTableNode.getOptions());
+
+        TableDesc tableDescTobeCreated =
+            new TableDesc(
+                createTableNode.getTableName(),
+                createTableNode.getTableSchema(),
+                meta,
+                finalOutputDir);
+        tableDescTobeCreated.setExternal(createTableNode.isExternal());
+
+        if (createTableNode.hasPartition()) {
+          tableDescTobeCreated.setPartitionMethod(createTableNode.getPartitionMethod());
+        }
+
+        stats.setNumBytes(getTableVolume(query.systemConf, finalOutputDir));
+        tableDescTobeCreated.setStats(stats);
+        query.setResultDesc(tableDescTobeCreated);
+
+        catalog.createTable(tableDescTobeCreated);
+      }
+    }
+
+    private class InsertTableHook implements QueryHook {
+
+      @Override
+      public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId,
+                                Path finalOutputDir) {
+        SubQuery lastStage = query.getSubQuery(finalExecBlockId);
+        return lastStage.getBlock().getPlan().getType() == NodeType.INSERT;
+      }
+
+      @Override
+      public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
+                          Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir)
+          throws Exception {
+
+        CatalogService catalog = context.getWorkerContext().getCatalog();
+        SubQuery lastStage = query.getSubQuery(finalExecBlockId);
+        TableMeta meta = lastStage.getTableMeta();
+        TableStats stats = lastStage.getResultStats();
+
+        InsertNode insertNode = (InsertNode) lastStage.getBlock().getPlan();
+
+        TableDesc finalTable;
+        if (insertNode.hasTargetTable()) {
+          String tableName = insertNode.getTableName();
+          finalTable = catalog.getTableDesc(tableName);
+        } else {
+          String tableName = query.getId().toString();
+          finalTable = new TableDesc(tableName, lastStage.getSchema(), meta, finalOutputDir);
+        }
+
+        long volume = getTableVolume(query.systemConf, finalOutputDir);
+        stats.setNumBytes(volume);
+        finalTable.setStats(stats);
+
+        if (insertNode.hasTargetTable()) {
+          catalog.dropTable(insertNode.getTableName());
+          catalog.createTable(finalTable);
+        }
+
+        query.setResultDesc(finalTable);
+      }
+    }
+
+    private long getTableVolume(TajoConf systemConf, Path tablePath) throws IOException {
+      FileSystem fs = tablePath.getFileSystem(systemConf);
+      ContentSummary directorySummary = fs.getContentSummary(tablePath);
+      return directorySummary.getLength();
+    }
+  }
+
+  public static class SubQueryCompletedTransition implements SingleArcTransition<Query, QueryEvent> {
+
+    private boolean hasNext(Query query) {
+      ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
+      ExecutionBlock nextBlock = cursor.peek();
+      return !query.getPlan().isTerminal(nextBlock);
+    }
+
+    private void executeNextBlock(Query query) {
+      ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
+      ExecutionBlock nextBlock = cursor.nextBlock();
+      SubQuery nextSubQuery = new SubQuery(query.context, query.getPlan(), nextBlock, query.sm);
+      nextSubQuery.setPriority(query.priority--);
+      query.addSubQuery(nextSubQuery);
+      nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(), SubQueryEventType.SQ_INIT));
+
+      LOG.info("Scheduling SubQuery:" + nextSubQuery.getId());
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Scheduling SubQuery's Priority: " + nextSubQuery.getPriority());
+        LOG.debug("Scheduling SubQuery's Plan: \n" + nextSubQuery.getBlock().getPlan());
+      }
+    }
+
+    @Override
+    public void transition(Query query, QueryEvent event) {
+      try {
+        query.completedSubQueryCount++;
+        SubQueryCompletedEvent castEvent = (SubQueryCompletedEvent) event;
+
+        if (castEvent.getState() == SubQueryState.SUCCEEDED) {
+          query.successedSubQueryCount++;
+        } else if (castEvent.getState() == SubQueryState.KILLED) {
+          query.killedSubQueryCount++;
+        } else if (castEvent.getState() == SubQueryState.FAILED) {
+          query.failedSubQueryCount++;
+        } else if (castEvent.getState() == SubQueryState.ERROR) {
+          query.erroredSubQueryCount++;
+        } else {
+          LOG.error(String.format("Invalid SubQuery (%s) State %s at %s",
+              castEvent.getExecutionBlockId().toString(), castEvent.getState().name(), query.getState().name()));
+          query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR));
+        }
+
+        // if a subquery is succeeded and a query is running
+        if (castEvent.getState() == SubQueryState.SUCCEEDED &&  // latest subquery succeeded
+            query.getState() == QueryState.QUERY_RUNNING &&     // current state is not in KILL_WAIT, FAILED, or ERROR.
+            hasNext(query)) {                                   // there remains at least one subquery.
+          executeNextBlock(query);
+        } else { // if a query is completed due to finished, kill, failure, or error
+          query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState()));
+        }
+      } catch (Throwable t) {
+        LOG.error(t);
+        query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR));
+      }
+    }
+  }
+
+  private static class DiagnosticsUpdateTransition implements SingleArcTransition<Query, QueryEvent> {
+    @Override
+    public void transition(Query query, QueryEvent event) {
+      query.addDiagnostic(((QueryDiagnosticsUpdateEvent) event).getDiagnosticUpdate());
+    }
+  }
+
+  private static class KillNewQueryTransition implements SingleArcTransition<Query, QueryEvent> {
+    @Override
+    public void transition(Query query, QueryEvent event) {
+      query.setFinishTime();
+      query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
+    }
+  }
+
+  private static class KillSubQueriesTransition implements SingleArcTransition<Query, QueryEvent> {
+    @Override
+    public void transition(Query query, QueryEvent event) {
+      synchronized (query.subqueries) {
+        for (SubQuery subquery : query.subqueries.values()) {
+          query.eventHandler.handle(new SubQueryEvent(subquery.getId(), SubQueryEventType.SQ_KILL));
+        }
+      }
+    }
+  }
+
+  private static class InternalErrorTransition implements SingleArcTransition<Query, QueryEvent> {
+
+    @Override
+    public void transition(Query query, QueryEvent event) {
+      query.setFinishTime();
+      query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
+    }
+  }
+
+  @Override
+  public void handle(QueryEvent event) {
+    LOG.info("Processing " + event.getQueryId() + " of type " + event.getType());
+    try {
+      writeLock.lock();
+      QueryState oldState = getState();
+      try {
+        getStateMachine().doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state", e);
+        eventHandler.handle(new QueryEvent(this.id, QueryEventType.INTERNAL_ERROR));
+      }
+
+      //notify the eventhandler of state change
+      if (oldState != getState()) {
+        LOG.info(id + " Query Transitioned from " + oldState + " to " + getState());
+      }
+    }
+
+    finally {
+      writeLock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
new file mode 100644
index 0000000..de323cd
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.logical.LogicalRootNode;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.TajoAsyncDispatcher;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.rm.WorkerResourceManager;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource;
+
+public class QueryInProgress extends CompositeService {
+  private static final Log LOG = LogFactory.getLog(QueryInProgress.class.getName());
+
+  private QueryId queryId;
+
+  private Session session;
+
+  private QueryContext queryContext;
+
+  private TajoAsyncDispatcher dispatcher;
+
+  private LogicalRootNode plan;
+
+  private AtomicBoolean querySubmitted = new AtomicBoolean(false);
+
+  private AtomicBoolean stopped = new AtomicBoolean(false);
+
+  private QueryInfo queryInfo;
+
+  private final TajoMaster.MasterContext masterContext;
+
+  private NettyClientBase queryMasterRpc;
+
+  private QueryMasterProtocolService queryMasterRpcClient;
+
+  private YarnProtos.ContainerIdProto qmContainerId;
+
+  public QueryInProgress(
+      TajoMaster.MasterContext masterContext,
+      Session session,
+      QueryContext queryContext,
+      QueryId queryId, String sql, LogicalRootNode plan) {
+    super(QueryInProgress.class.getName());
+    this.masterContext = masterContext;
+    this.session = session;
+    this.queryContext = queryContext;
+    this.queryId = queryId;
+    this.plan = plan;
+
+    queryInfo = new QueryInfo(queryId, sql);
+    queryInfo.setStartTime(System.currentTimeMillis());
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    dispatcher = new TajoAsyncDispatcher("QueryInProgress:" + queryId);
+    this.addService(dispatcher);
+
+    dispatcher.register(QueryJobEvent.Type.class, new QueryInProgressEventHandler());
+    super.init(conf);
+  }
+
+  public void kill() {
+    queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
+  }
+
+  @Override
+  public void stop() {
+    if(stopped.getAndSet(true)) {
+      return;
+    }
+
+    LOG.info("=========================================================");
+    LOG.info("Stop query:" + queryId);
+
+    masterContext.getResourceManager().stopQueryMaster(queryId);
+
+    long startTime = System.currentTimeMillis();
+    while(true) {
+      try {
+        if(masterContext.getResourceManager().isQueryMasterStopped(queryId)) {
+          LOG.info(queryId + " QueryMaster stopped");
+          break;
+        }
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+        break;
+      }
+
+      try {
+        synchronized (this){
+          wait(100);
+        }
+      } catch (InterruptedException e) {
+        break;
+      }
+      if(System.currentTimeMillis() - startTime > 60 * 1000) {
+        LOG.warn("Failed to stop QueryMaster:" + queryId);
+        break;
+      }
+    }
+
+    if(queryMasterRpc != null) {
+      RpcConnectionPool.getPool((TajoConf)getConfig()).closeConnection(queryMasterRpc);
+    }
+    super.stop();
+  }
+
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  public EventHandler getEventHandler() {
+    return dispatcher.getEventHandler();
+  }
+
+
+
+  public boolean startQueryMaster() {
+    try {
+      LOG.info("Initializing QueryInProgress for QueryID=" + queryId);
+      WorkerResourceManager resourceManager = masterContext.getResourceManager();
+      WorkerAllocatedResource resource = resourceManager.allocateQueryMaster(this);
+
+      // if no resource to allocate a query master
+      if(resource == null) {
+        LOG.info("No Available Resources for QueryMaster");
+        return false;
+      }
+
+      queryInfo.setQueryMaster(resource.getWorkerHost());
+      queryInfo.setQueryMasterPort(resource.getQueryMasterPort());
+      queryInfo.setQueryMasterclientPort(resource.getClientPort());
+
+      getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, queryInfo));
+
+      return true;
+    } catch (Exception e) {
+      catchException(e);
+      return false;
+    }
+  }
+
+  class QueryInProgressEventHandler implements EventHandler<QueryJobEvent> {
+    @Override
+    public void handle(QueryJobEvent queryJobEvent) {
+      if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) {
+        heartbeat(queryJobEvent.getQueryInfo());
+      } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_MASTER_START) {
+        QueryInProgress queryInProgress = masterContext.getQueryJobManager().getQueryInProgress(queryId);
+        queryInProgress.getEventHandler().handle(
+            new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, queryInProgress.getQueryInfo()));
+      } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_START) {
+        submmitQueryToMaster();
+      } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_FINISH) {
+        stop();
+      } else if (queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
+        kill();
+      }
+    }
+  }
+
+  public QueryMasterProtocolService getQueryMasterRpcClient() {
+    return queryMasterRpcClient;
+  }
+
+  private void connectQueryMaster() throws Exception {
+    InetSocketAddress addr = NetUtils.createSocketAddrForHost(
+        queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
+    LOG.info("Connect to QueryMaster:" + addr);
+    queryMasterRpc =
+        RpcConnectionPool.getPool((TajoConf) getConfig()).getConnection(addr, QueryMasterProtocol.class, true);
+    queryMasterRpcClient = queryMasterRpc.getStub();
+  }
+
+  private synchronized void submmitQueryToMaster() {
+    if(querySubmitted.get()) {
+      return;
+    }
+
+    try {
+      if(queryMasterRpcClient == null) {
+        connectQueryMaster();
+      }
+      if(queryMasterRpcClient == null) {
+        LOG.info("No QueryMaster conneciton info.");
+        //TODO wait
+        return;
+      }
+      LOG.info("Call executeQuery to :" +
+          queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort() + "," + queryId);
+      queryMasterRpcClient.executeQuery(
+          null,
+          TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder()
+              .setQueryId(queryId.getProto())
+              .setSession(session.getProto())
+              .setQueryContext(queryContext.getProto())
+              .setSql(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getSql()))
+              .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build())
+              .build(), NullCallback.get());
+      querySubmitted.set(true);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  public void catchException(Exception e) {
+    LOG.error(e.getMessage(), e);
+    queryInfo.setQueryState(TajoProtos.QueryState.QUERY_FAILED);
+    queryInfo.setLastMessage(StringUtils.stringifyException(e));
+  }
+
+  public QueryId getQueryId() {
+    return queryId;
+  }
+
+  public QueryInfo getQueryInfo() {
+    return this.queryInfo;
+  }
+
+  private void heartbeat(QueryInfo queryInfo) {
+    LOG.info("Received QueryMaster heartbeat:" + queryInfo);
+    this.queryInfo.setQueryState(queryInfo.getQueryState());
+    this.queryInfo.setProgress(queryInfo.getProgress());
+    this.queryInfo.setFinishTime(queryInfo.getFinishTime());
+
+    if(queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) {
+      this.queryInfo.setLastMessage(queryInfo.getLastMessage());
+      LOG.info(queryId + queryInfo.getLastMessage());
+    }
+    if(this.queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_FAILED) {
+      //TODO needed QueryMaster's detail status(failed before or after launching worker)
+      //queryMasterStopped.set(true);
+      LOG.warn(queryId + " failed, " + queryInfo.getLastMessage());
+    }
+
+    if(!querySubmitted.get()) {
+      getEventHandler().handle(
+          new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, this.queryInfo));
+    }
+
+    if(isFinishState(this.queryInfo.getQueryState())) {
+      getEventHandler().handle(
+          new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_FINISH, this.queryInfo));
+    }
+  }
+
+  private boolean isFinishState(TajoProtos.QueryState state) {
+    return state == TajoProtos.QueryState.QUERY_FAILED ||
+        state == TajoProtos.QueryState.QUERY_KILLED ||
+        state == TajoProtos.QueryState.QUERY_SUCCEEDED;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
new file mode 100644
index 0000000..b077b36
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+
+public class QueryInfo {
+  private QueryId queryId;
+  private String sql;
+  private TajoProtos.QueryState queryState;
+  private float progress;
+  private long startTime;
+  private long finishTime;
+  private String lastMessage;
+  private String hostNameOfQM;
+  private int queryMasterPort;
+  private int queryMasterClientPort;
+
+  public QueryInfo(QueryId queryId) {
+    this(queryId, null);
+  }
+
+  public QueryInfo(QueryId queryId, String sql) {
+    this.queryId = queryId;
+    this.sql = sql;
+    this.queryState = TajoProtos.QueryState.QUERY_MASTER_INIT;
+  }
+
+  public QueryId getQueryId() {
+    return queryId;
+  }
+
+  public String getSql() {
+    return sql;
+  }
+
+  public String getQueryMasterHost() {
+    return hostNameOfQM;
+  }
+
+  public void setQueryMaster(String hostName) {
+    this.hostNameOfQM = hostName;
+
+  }
+
+  public void setQueryMasterPort(int port) {
+    this.queryMasterPort = port;
+  }
+
+  public int getQueryMasterPort() {
+    return queryMasterPort;
+  }
+
+  public void setQueryMasterclientPort(int port) {
+    queryMasterClientPort = port;
+  }
+
+  public int getQueryMasterClientPort() {
+    return queryMasterClientPort;
+  }
+
+  public TajoProtos.QueryState getQueryState() {
+    return queryState;
+  }
+
+  public void setQueryState(TajoProtos.QueryState queryState) {
+    this.queryState = queryState;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  public void setFinishTime(long finishTime) {
+    this.finishTime = finishTime;
+  }
+
+  public String getLastMessage() {
+    return lastMessage;
+  }
+
+  public void setLastMessage(String lastMessage) {
+    this.lastMessage = lastMessage;
+  }
+
+  public float getProgress() {
+    return progress;
+  }
+
+  public void setProgress(float progress) {
+    this.progress = progress;
+  }
+
+  @Override
+  public String toString() {
+    return queryId.toString() + "state=" + queryState +",progress=" + progress + ", queryMaster="
+        + getQueryMasterHost();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
new file mode 100644
index 0000000..811de1b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class QueryJobEvent extends AbstractEvent<QueryJobEvent.Type> {
+  private QueryInfo queryInfo;
+
+  public QueryJobEvent(Type type, QueryInfo queryInfo) {
+    super(type);
+
+    this.queryInfo = queryInfo;
+  }
+
+  public QueryInfo getQueryInfo() {
+    return this.queryInfo;
+  }
+
+  public enum Type {
+    QUERY_JOB_START,
+    QUERY_JOB_HEARTBEAT,
+    QUERY_JOB_FINISH,
+    QUERY_MASTER_START,
+    QUERY_MASTER_STOP,
+    QUERY_JOB_KILL
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
new file mode 100644
index 0000000..ca45534
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.engine.planner.logical.LogicalRootNode;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.session.Session;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class QueryJobManager extends CompositeService {
+  private static final Log LOG = LogFactory.getLog(QueryJobManager.class.getName());
+
+  // TajoMaster Context
+  private final TajoMaster.MasterContext masterContext;
+
+  private AsyncDispatcher dispatcher;
+
+  private final Map<QueryId, QueryInProgress> runningQueries = new HashMap<QueryId, QueryInProgress>();
+
+  private final Map<QueryId, QueryInProgress> finishedQueries = new HashMap<QueryId, QueryInProgress>();
+
+  public QueryJobManager(final TajoMaster.MasterContext masterContext) {
+    super(QueryJobManager.class.getName());
+    this.masterContext = masterContext;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    try {
+      this.dispatcher = new AsyncDispatcher();
+      addService(this.dispatcher);
+
+      this.dispatcher.register(QueryJobEvent.Type.class, new QueryJobManagerEventHandler());
+    } catch (Exception e) {
+      catchException(null, e);
+    }
+
+    super.init(conf);
+  }
+
+  @Override
+  public void stop() {
+    synchronized(runningQueries) {
+      for(QueryInProgress eachQueryInProgress: runningQueries.values()) {
+        eachQueryInProgress.stop();
+      }
+    }
+    super.stop();
+  }
+
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  public EventHandler getEventHandler() {
+    return dispatcher.getEventHandler();
+  }
+
+  public Collection<QueryInProgress> getRunningQueries() {
+    return Collections.unmodifiableCollection(runningQueries.values());
+  }
+
+  public Collection<QueryInProgress> getFinishedQueries() {
+    return Collections.unmodifiableCollection(finishedQueries.values());
+  }
+
+  public QueryInfo createNewQueryJob(Session session, QueryContext queryContext, String sql, LogicalRootNode plan)
+      throws Exception {
+    QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
+    QueryInProgress queryInProgress = new QueryInProgress(masterContext, session, queryContext, queryId, sql, plan);
+
+    synchronized(runningQueries) {
+      runningQueries.put(queryId, queryInProgress);
+    }
+
+    addService(queryInProgress);
+    queryInProgress.init(getConfig());
+    queryInProgress.start();
+
+    if(!queryInProgress.startQueryMaster()) {
+      return null;
+    }
+
+    return queryInProgress.getQueryInfo();
+  }
+
+  class QueryJobManagerEventHandler implements EventHandler<QueryJobEvent> {
+    @Override
+    public void handle(QueryJobEvent event) {
+      QueryInProgress queryInProgress = null;
+      synchronized(runningQueries) {
+        queryInProgress = runningQueries.get(event.getQueryInfo().getQueryId());
+        if(queryInProgress == null) {
+          LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]");
+          return;
+        }
+      }
+      queryInProgress.getEventHandler().handle(event);
+    }
+  }
+
+  public QueryInProgress getQueryInProgress(QueryId queryId) {
+    synchronized(runningQueries) {
+      return runningQueries.get(queryId);
+    }
+  }
+
+  public QueryInProgress getFinishedQuery(QueryId queryId) {
+    synchronized(finishedQueries) {
+      return finishedQueries.get(queryId);
+    }
+  }
+
+  public void stopQuery(QueryId queryId) {
+    LOG.info("Stop QueryInProgress:" + queryId);
+    QueryInProgress queryInProgress = getQueryInProgress(queryId);
+    if(queryInProgress != null) {
+      queryInProgress.stop();
+      synchronized(runningQueries) {
+        runningQueries.remove(queryId);
+        finishedQueries.put(queryId, queryInProgress);
+      }
+    } else {
+      LOG.warn("No QueryInProgress while query stopping: " + queryId);
+    }
+  }
+
+  private void catchException(QueryId queryId, Exception e) {
+    LOG.error(e.getMessage(), e);
+    QueryInProgress queryInProgress = runningQueries.get(queryId);
+    queryInProgress.catchException(e);
+  }
+
+  public synchronized TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand queryHeartbeat(
+      TajoMasterProtocol.TajoHeartbeat queryHeartbeat) {
+    QueryInProgress queryInProgress = getQueryInProgress(new QueryId(queryHeartbeat.getQueryId()));
+    if(queryInProgress == null) {
+      return null;
+    }
+
+    QueryInfo queryInfo = makeQueryInfoFromHeartbeat(queryHeartbeat);
+    getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_HEARTBEAT, queryInfo));
+
+    return null;
+  }
+
+  private QueryInfo makeQueryInfoFromHeartbeat(TajoMasterProtocol.TajoHeartbeat queryHeartbeat) {
+    QueryInfo queryInfo = new QueryInfo(new QueryId(queryHeartbeat.getQueryId()));
+    if(queryHeartbeat.getTajoWorkerHost() != null) {
+      queryInfo.setQueryMaster(queryHeartbeat.getTajoWorkerHost());
+      queryInfo.setQueryMasterPort(queryHeartbeat.getTajoQueryMasterPort());
+      queryInfo.setQueryMasterclientPort(queryHeartbeat.getTajoWorkerClientPort());
+    }
+    queryInfo.setLastMessage(queryHeartbeat.getStatusMessage());
+    queryInfo.setQueryState(queryHeartbeat.getState());
+    queryInfo.setProgress(queryHeartbeat.getQueryProgress());
+
+    if (queryHeartbeat.hasQueryFinishTime()) {
+      queryInfo.setFinishTime(queryHeartbeat.getQueryFinishTime());
+    }
+
+    return queryInfo;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
new file mode 100644
index 0000000..523f5ba
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -0,0 +1,504 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.global.GlobalPlanner;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.TajoAsyncDispatcher;
+import org.apache.tajo.master.event.QueryStartEvent;
+import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.StorageManagerFactory;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.worker.TajoWorker;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeat;
+import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse;
+
+// TODO - when exception, send error status to QueryJobManager
+public class QueryMaster extends CompositeService implements EventHandler {
+  private static final Log LOG = LogFactory.getLog(QueryMaster.class.getName());
+
+  private int querySessionTimeout;
+
+  private Clock clock;
+
+  private TajoAsyncDispatcher dispatcher;
+
+  private GlobalPlanner globalPlanner;
+
+  private AbstractStorageManager storageManager;
+
+  private TajoConf systemConf;
+
+  private Map<QueryId, QueryMasterTask> queryMasterTasks = Maps.newConcurrentMap();
+
+  private Map<QueryId, QueryMasterTask> finishedQueryMasterTasks = Maps.newConcurrentMap();
+
+  private ClientSessionTimeoutCheckThread clientSessionTimeoutCheckThread;
+
+  private AtomicBoolean queryMasterStop = new AtomicBoolean(false);
+
+  private QueryMasterContext queryMasterContext;
+
+  private QueryHeartbeatThread queryHeartbeatThread;
+
+  private FinishedQueryMasterTaskCleanThread finishedQueryMasterTaskCleanThread;
+
+  private TajoWorker.WorkerContext workerContext;
+
+  private RpcConnectionPool connPool;
+
+  public QueryMaster(TajoWorker.WorkerContext workerContext) {
+    super(QueryMaster.class.getName());
+    this.workerContext = workerContext;
+  }
+
+  public void init(Configuration conf) {
+    LOG.info("QueryMaster init");
+    try {
+      this.systemConf = (TajoConf)conf;
+      this.connPool = RpcConnectionPool.getPool(systemConf);
+
+      querySessionTimeout = systemConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT);
+      queryMasterContext = new QueryMasterContext(systemConf);
+
+      clock = new SystemClock();
+
+      this.dispatcher = new TajoAsyncDispatcher("querymaster_" + System.currentTimeMillis());
+      addIfService(dispatcher);
+
+      this.storageManager = StorageManagerFactory.getStorageManager(systemConf);
+
+      globalPlanner = new GlobalPlanner(systemConf, workerContext);
+
+      dispatcher.register(QueryStartEvent.EventType.class, new QueryStartEventHandler());
+
+    } catch (Throwable t) {
+      LOG.error(t.getMessage(), t);
+      throw new RuntimeException(t);
+    }
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    LOG.info("QueryMaster start");
+
+    queryHeartbeatThread = new QueryHeartbeatThread();
+    queryHeartbeatThread.start();
+
+    clientSessionTimeoutCheckThread = new ClientSessionTimeoutCheckThread();
+    clientSessionTimeoutCheckThread.start();
+
+    finishedQueryMasterTaskCleanThread = new FinishedQueryMasterTaskCleanThread();
+    finishedQueryMasterTaskCleanThread.start();
+
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    if(queryMasterStop.getAndSet(true)){
+      return;
+    }
+
+    if(queryHeartbeatThread != null) {
+      queryHeartbeatThread.interrupt();
+    }
+
+    if(clientSessionTimeoutCheckThread != null) {
+      clientSessionTimeoutCheckThread.interrupt();
+    }
+
+    if(finishedQueryMasterTaskCleanThread != null) {
+      finishedQueryMasterTaskCleanThread.interrupt();
+    }
+    super.stop();
+
+    LOG.info("QueryMaster stop");
+    if(queryMasterContext.getWorkerContext().isYarnContainerMode()) {
+      queryMasterContext.getWorkerContext().stopWorker(true);
+    }
+  }
+
+  private void cleanup(QueryId queryId) {
+    LOG.info("cleanup query resources : " + queryId);
+    NettyClientBase rpc = null;
+    List<TajoMasterProtocol.WorkerResourceProto> workers = getAllWorker();
+
+    for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
+      try {
+        if (worker.getPeerRpcPort() == 0) continue;
+
+        rpc = connPool.getConnection(NetUtils.createSocketAddr(worker.getHost(), worker.getPeerRpcPort()),
+            TajoWorkerProtocol.class, true);
+        TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub();
+
+        tajoWorkerProtocolService.cleanup(null, queryId.getProto(), NullCallback.get());
+      } catch (Exception e) {
+        LOG.error(e.getMessage());
+      } finally {
+        connPool.releaseConnection(rpc);
+      }
+    }
+  }
+
+  public List<TajoMasterProtocol.WorkerResourceProto> getAllWorker() {
+
+    NettyClientBase rpc = null;
+    try {
+      rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
+          TajoMasterProtocol.class, true);
+      TajoMasterProtocol.TajoMasterProtocolService masterService = rpc.getStub();
+
+      CallFuture<TajoMasterProtocol.WorkerResourcesRequest> callBack =
+          new CallFuture<TajoMasterProtocol.WorkerResourcesRequest>();
+      masterService.getAllWorkerResource(callBack.getController(),
+          PrimitiveProtos.NullProto.getDefaultInstance(), callBack);
+
+      TajoMasterProtocol.WorkerResourcesRequest workerResourcesRequest = callBack.get(2, TimeUnit.SECONDS);
+      return workerResourcesRequest.getWorkerResourcesList();
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    } finally {
+      connPool.releaseConnection(rpc);
+    }
+    return new ArrayList<TajoMasterProtocol.WorkerResourceProto>();
+  }
+
+  public void reportQueryStatusToQueryMaster(QueryId queryId, TajoProtos.QueryState state) {
+    LOG.info("Send QueryMaster Ready to QueryJobManager:" + queryId);
+    NettyClientBase tmClient = null;
+    try {
+      tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
+          TajoMasterProtocol.class, true);
+      TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+
+      TajoHeartbeat.Builder queryHeartbeatBuilder = TajoHeartbeat.newBuilder()
+          .setTajoWorkerHost(workerContext.getQueryMasterManagerService().getBindAddr().getHostName())
+          .setTajoQueryMasterPort(workerContext.getQueryMasterManagerService().getBindAddr().getPort())
+          .setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort())
+          .setState(state)
+          .setQueryId(queryId.getProto());
+
+      CallFuture<TajoHeartbeatResponse> callBack =
+          new CallFuture<TajoHeartbeatResponse>();
+
+      masterClientService.heartbeat(callBack.getController(), queryHeartbeatBuilder.build(), callBack);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    } finally {
+      connPool.releaseConnection(tmClient);
+    }
+  }
+
+  @Override
+  public void handle(Event event) {
+    dispatcher.getEventHandler().handle(event);
+  }
+
+  public Query getQuery(QueryId queryId) {
+    return queryMasterTasks.get(queryId).getQuery();
+  }
+
+  public QueryMasterTask getQueryMasterTask(QueryId queryId) {
+    return queryMasterTasks.get(queryId);
+  }
+
+  public QueryMasterTask getQueryMasterTask(QueryId queryId, boolean includeFinished) {
+    QueryMasterTask queryMasterTask =  queryMasterTasks.get(queryId);
+    if(queryMasterTask != null) {
+      return queryMasterTask;
+    } else {
+      if(includeFinished) {
+        return finishedQueryMasterTasks.get(queryId);
+      } else {
+        return null;
+      }
+    }
+  }
+
+  public QueryMasterContext getContext() {
+    return this.queryMasterContext;
+  }
+
+  public Collection<QueryMasterTask> getQueryMasterTasks() {
+    return queryMasterTasks.values();
+  }
+
+  public Collection<QueryMasterTask> getFinishedQueryMasterTasks() {
+    return finishedQueryMasterTasks.values();
+  }
+
+  public class QueryMasterContext {
+    private TajoConf conf;
+
+    public QueryMasterContext(TajoConf conf) {
+      this.conf = conf;
+    }
+
+    public TajoConf getConf() {
+      return conf;
+    }
+
+    public TajoAsyncDispatcher getDispatcher() {
+      return dispatcher;
+    }
+
+    public Clock getClock() {
+      return clock;
+    }
+
+    public AbstractStorageManager getStorageManager() {
+      return storageManager;
+    }
+
+    public QueryMaster getQueryMaster() {
+      return QueryMaster.this;
+    }
+
+    public GlobalPlanner getGlobalPlanner() {
+      return globalPlanner;
+    }
+
+    public TajoWorker.WorkerContext getWorkerContext() {
+      return workerContext;
+    }
+
+    public EventHandler getEventHandler() {
+      return dispatcher.getEventHandler();
+    }
+
+    public void stopQuery(QueryId queryId) {
+      QueryMasterTask queryMasterTask;
+      queryMasterTask = queryMasterTasks.remove(queryId);
+      finishedQueryMasterTasks.put(queryId, queryMasterTask);
+
+      if(queryMasterTask != null) {
+        TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(queryMasterTask);
+        CallFuture<TajoHeartbeatResponse> future = new CallFuture<TajoHeartbeatResponse>();
+
+        NettyClientBase tmClient = null;
+        try {
+          tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
+              TajoMasterProtocol.class, true);
+          TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+          masterClientService.heartbeat(future.getController(), queryHeartbeat, future);
+        }  catch (Exception e) {
+          //this function will be closed in new thread.
+          //When tajo do stop cluster, tajo master maybe throw closed connection exception
+
+          LOG.error(e.getMessage(), e);
+        } finally {
+          connPool.releaseConnection(tmClient);
+        }
+
+        try {
+          queryMasterTask.stop();
+          if (!systemConf.get(CommonTestingUtil.TAJO_TEST, "FALSE").equalsIgnoreCase("TRUE")
+              && !workerContext.isYarnContainerMode()) {
+            cleanup(queryId);       // TODO We will support yarn mode
+          }
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+        }
+      } else {
+        LOG.warn("No query info:" + queryId);
+      }
+      if(workerContext.isYarnContainerMode()) {
+        stop();
+      }
+    }
+  }
+
+  private TajoHeartbeat buildTajoHeartBeat(QueryMasterTask queryMasterTask) {
+    TajoHeartbeat.Builder builder = TajoHeartbeat.newBuilder();
+
+    builder.setTajoWorkerHost(workerContext.getQueryMasterManagerService().getBindAddr().getHostName());
+    builder.setTajoQueryMasterPort(workerContext.getQueryMasterManagerService().getBindAddr().getPort());
+    builder.setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort());
+    builder.setState(queryMasterTask.getState());
+    builder.setQueryId(queryMasterTask.getQueryId().getProto());
+
+    if (queryMasterTask.getQuery() != null) {
+      builder.setQueryProgress(queryMasterTask.getQuery().getProgress());
+      builder.setQueryFinishTime(queryMasterTask.getQuery().getFinishTime());
+    }
+    return builder.build();
+  }
+
+  private class QueryStartEventHandler implements EventHandler<QueryStartEvent> {
+    @Override
+    public void handle(QueryStartEvent event) {
+      LOG.info("Start QueryStartEventHandler:" + event.getQueryId());
+      QueryMasterTask queryMasterTask = new QueryMasterTask(queryMasterContext,
+          event.getQueryId(), event.getSession(), event.getQueryContext(), event.getSql(), event.getLogicalPlanJson());
+
+      queryMasterTask.init(systemConf);
+      if (!queryMasterTask.isInitError()) {
+        queryMasterTask.start();
+      }
+
+      synchronized(queryMasterTasks) {
+        queryMasterTasks.put(event.getQueryId(), queryMasterTask);
+      }
+
+      if (queryMasterTask.isInitError()) {
+        queryMasterContext.stopQuery(queryMasterTask.getQueryId());
+        return;
+      }
+    }
+  }
+
+  class QueryHeartbeatThread extends Thread {
+    public QueryHeartbeatThread() {
+      super("QueryHeartbeatThread");
+    }
+
+    @Override
+    public void run() {
+      LOG.info("Start QueryMaster heartbeat thread");
+      while(!queryMasterStop.get()) {
+        List<QueryMasterTask> tempTasks = new ArrayList<QueryMasterTask>();
+        synchronized(queryMasterTasks) {
+          tempTasks.addAll(queryMasterTasks.values());
+        }
+        synchronized(queryMasterTasks) {
+          for(QueryMasterTask eachTask: tempTasks) {
+            NettyClientBase tmClient;
+            try {
+              tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
+                  TajoMasterProtocol.class, true);
+              TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+
+              CallFuture<TajoHeartbeatResponse> callBack =
+                  new CallFuture<TajoHeartbeatResponse>();
+
+              TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(eachTask);
+              masterClientService.heartbeat(callBack.getController(), queryHeartbeat, callBack);
+            } catch (Throwable t) {
+              t.printStackTrace();
+            }
+          }
+        }
+        synchronized(queryMasterStop) {
+          try {
+            queryMasterStop.wait(2000);
+          } catch (InterruptedException e) {
+            break;
+          }
+        }
+      }
+      LOG.info("QueryMaster heartbeat thread stopped");
+    }
+  }
+
+  class ClientSessionTimeoutCheckThread extends Thread {
+    public void run() {
+      LOG.info("ClientSessionTimeoutCheckThread started");
+      while(!queryMasterStop.get()) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          break;
+        }
+        List<QueryMasterTask> tempTasks = new ArrayList<QueryMasterTask>();
+        synchronized(queryMasterTasks) {
+          tempTasks.addAll(queryMasterTasks.values());
+        }
+
+        for(QueryMasterTask eachTask: tempTasks) {
+          if(!eachTask.isStopped()) {
+            try {
+              long lastHeartbeat = eachTask.getLastClientHeartbeat();
+              long time = System.currentTimeMillis() - lastHeartbeat;
+              if(lastHeartbeat > 0 && time > querySessionTimeout * 1000) {
+                LOG.warn("Query " + eachTask.getQueryId() + " stopped cause query sesstion timeout: " + time + " ms");
+                eachTask.expiredSessionTimeout();
+              }
+            } catch (Exception e) {
+              LOG.error(eachTask.getQueryId() + ":" + e.getMessage(), e);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  class FinishedQueryMasterTaskCleanThread extends Thread {
+    public void run() {
+      int expireIntervalTime = systemConf.getIntVar(TajoConf.ConfVars.WORKER_HISTORY_EXPIRE_PERIOD);
+      LOG.info("FinishedQueryMasterTaskCleanThread started: expire interval minutes = " + expireIntervalTime);
+      while(!queryMasterStop.get()) {
+        try {
+          Thread.sleep(60 * 1000 * 60);   // hourly
+        } catch (InterruptedException e) {
+          break;
+        }
+        try {
+          long expireTime = System.currentTimeMillis() - expireIntervalTime * 60 * 1000;
+          cleanExpiredFinishedQueryMasterTask(expireTime);
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+        }
+      }
+    }
+
+    private void cleanExpiredFinishedQueryMasterTask(long expireTime) {
+      synchronized(finishedQueryMasterTasks) {
+        List<QueryId> expiredQueryIds = new ArrayList<QueryId>();
+        for(Map.Entry<QueryId, QueryMasterTask> entry: finishedQueryMasterTasks.entrySet()) {
+          if(entry.getValue().getStartTime() < expireTime) {
+            expiredQueryIds.add(entry.getKey());
+          }
+        }
+
+        for(QueryId eachId: expiredQueryIds) {
+          finishedQueryMasterTasks.remove(eachId);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
new file mode 100644
index 0000000..bf59e9f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tajo.*;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.LazyTaskScheduler;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.rpc.AsyncRpcServer;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.worker.TajoWorker;
+
+import java.net.InetSocketAddress;
+
+public class QueryMasterManagerService extends CompositeService
+    implements QueryMasterProtocol.QueryMasterProtocolService.Interface {
+  private static final Log LOG = LogFactory.getLog(QueryMasterManagerService.class.getName());
+
+  private AsyncRpcServer rpcServer;
+  private InetSocketAddress bindAddr;
+  private String addr;
+  private int port;
+
+  private QueryMaster queryMaster;
+
+  private TajoWorker.WorkerContext workerContext;
+
+  public QueryMasterManagerService(TajoWorker.WorkerContext workerContext, int port) {
+    super(QueryMasterManagerService.class.getName());
+    this.workerContext = workerContext;
+    this.port = port;
+  }
+
+  public QueryMaster getQueryMaster() {
+    return queryMaster;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    Preconditions.checkArgument(conf instanceof TajoConf);
+    TajoConf tajoConf = (TajoConf) conf;
+    try {
+      // Setup RPC server
+      InetSocketAddress initIsa =
+          new InetSocketAddress("0.0.0.0", port);
+      if (initIsa.getAddress() == null) {
+        throw new IllegalArgumentException("Failed resolve of " + initIsa);
+      }
+
+      int workerNum = tajoConf.getIntVar(TajoConf.ConfVars.QUERY_MASTER_RPC_SERVER_WORKER_THREAD_NUM);
+      this.rpcServer = new AsyncRpcServer(QueryMasterProtocol.class, this, initIsa, workerNum);
+      this.rpcServer.start();
+
+      this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
+      this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
+
+      this.port = bindAddr.getPort();
+
+      queryMaster = new QueryMaster(workerContext);
+      addService(queryMaster);
+
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+    // Get the master address
+    LOG.info("QueryMasterManagerService is bind to " + addr);
+    ((TajoConf)conf).setVar(TajoConf.ConfVars.WORKER_QM_RPC_ADDRESS, addr);
+
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    if(rpcServer != null) {
+      rpcServer.shutdown();
+    }
+    LOG.info("QueryMasterManagerService stopped");
+    super.stop();
+  }
+
+  public InetSocketAddress getBindAddr() {
+    return bindAddr;
+  }
+
+  public String getHostAndPort() {
+    return bindAddr.getHostName() + ":" + bindAddr.getPort();
+  }
+
+  @Override
+  public void getTask(RpcController controller, TajoWorkerProtocol.GetTaskRequestProto request,
+                      RpcCallback<TajoWorkerProtocol.QueryUnitRequestProto> done) {
+    try {
+      ExecutionBlockId ebId = new ExecutionBlockId(request.getExecutionBlockId());
+      QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(ebId.getQueryId());
+
+      if(queryMasterTask == null || queryMasterTask.isStopped()) {
+        done.run(LazyTaskScheduler.stopTaskRunnerReq);
+      } else {
+        ContainerId cid =
+            queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId());
+        LOG.debug("getTask:" + cid + ", ebId:" + ebId);
+        queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(cid, ebId, done));
+      }
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void statusUpdate(RpcController controller, TajoWorkerProtocol.TaskStatusProto request,
+                           RpcCallback<PrimitiveProtos.BoolProto> done) {
+    try {
+      QueryId queryId = new QueryId(request.getId().getQueryUnitId().getExecutionBlockId().getQueryId());
+      QueryUnitAttemptId attemptId = new QueryUnitAttemptId(request.getId());
+      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
+      if (queryMasterTask == null) {
+        queryMasterTask = queryMaster.getQueryMasterTask(queryId, true);
+      }
+      SubQuery sq = queryMasterTask.getQuery().getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId());
+      QueryUnit task = sq.getQueryUnit(attemptId.getQueryUnitId());
+      QueryUnitAttempt attempt = task.getAttempt(attemptId.getId());
+
+      if(LOG.isDebugEnabled()){
+        LOG.debug(String.format("Task State: %s, Attempt State: %s", task.getState().name(), attempt.getState().name()));
+      }
+
+      if (request.getState() == TajoProtos.TaskAttemptState.TA_KILLED) {
+        LOG.warn(attemptId + " Killed");
+        attempt.handle(
+            new TaskAttemptEvent(new QueryUnitAttemptId(request.getId()), TaskAttemptEventType.TA_LOCAL_KILLED));
+      } else {
+        queryMasterTask.getEventHandler().handle(
+            new TaskAttemptStatusUpdateEvent(new QueryUnitAttemptId(request.getId()), request));
+      }
+      done.run(TajoWorker.TRUE_PROTO);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      done.run(TajoWorker.FALSE_PROTO);
+    }
+  }
+
+  @Override
+  public void ping(RpcController controller,
+                   TajoIdProtos.QueryUnitAttemptIdProto attemptId,
+                   RpcCallback<PrimitiveProtos.BoolProto> done) {
+    done.run(TajoWorker.TRUE_PROTO);
+  }
+
+  @Override
+  public void fatalError(RpcController controller, TajoWorkerProtocol.TaskFatalErrorReport report,
+                         RpcCallback<PrimitiveProtos.BoolProto> done) {
+    try {
+      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
+          new QueryId(report.getId().getQueryUnitId().getExecutionBlockId().getQueryId()));
+      queryMasterTask.getEventHandler().handle(new TaskFatalErrorEvent(report));
+      done.run(TajoWorker.TRUE_PROTO);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      done.run(TajoWorker.FALSE_PROTO);
+    }
+  }
+
+  @Override
+  public void done(RpcController controller, TajoWorkerProtocol.TaskCompletionReport report,
+                   RpcCallback<PrimitiveProtos.BoolProto> done) {
+    try {
+      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
+          new QueryId(report.getId().getQueryUnitId().getExecutionBlockId().getQueryId()));
+      queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report));
+      done.run(TajoWorker.TRUE_PROTO);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      done.run(TajoWorker.FALSE_PROTO);
+    }
+  }
+
+  @Override
+  public void killQuery(RpcController controller, TajoIdProtos.QueryIdProto request,
+                        RpcCallback<PrimitiveProtos.BoolProto> done) {
+    QueryId queryId = new QueryId(request);
+    QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
+    queryMasterTask.getQuery().handle(new QueryEvent(queryId, QueryEventType.KILL));
+  }
+
+  @Override
+  public void executeQuery(RpcController controller,
+                           TajoWorkerProtocol.QueryExecutionRequestProto request,
+                           RpcCallback<PrimitiveProtos.BoolProto> done) {
+    try {
+      workerContext.getWorkerSystemMetrics().counter("querymaster", "numQuery").inc();
+
+      QueryId queryId = new QueryId(request.getQueryId());
+      LOG.info("Receive executeQuery request:" + queryId);
+      queryMaster.handle(new QueryStartEvent(queryId,
+          new Session(request.getSession()),
+          new QueryContext(request.getQueryContext()), request.getSql().getValue(),
+          request.getLogicalPlanJson().getValue()));
+      done.run(TajoWorker.TRUE_PROTO);
+    } catch (Exception e) {
+      workerContext.getWorkerSystemMetrics().counter("querymaster", "errorQuery").inc();
+      LOG.error(e.getMessage(), e);
+      done.run(TajoWorker.FALSE_PROTO);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
new file mode 100644
index 0000000..56dd789
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.util.TajoIdUtils;
+
+import java.io.PrintWriter;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+
+@Deprecated
+public class QueryMasterRunner extends AbstractService {
+  private static final Log LOG = LogFactory.getLog(QueryMasterRunner.class);
+  private TajoConf systemConf;
+  private QueryMaster queryMaster;
+  private QueryId queryId;
+  private String queryMasterManagerAddress;
+
+  public QueryMasterRunner(QueryId queryId, String queryMasterManagerAddress) {
+    super(QueryMasterRunner.class.getName());
+    this.queryId = queryId;
+    this.queryMasterManagerAddress = queryMasterManagerAddress;
+  }
+
+  private class ShutdownHook implements Runnable {
+    @Override
+    public void run() {
+      LOG.info("============================================");
+      LOG.info("QueryMaster received SIGINT Signal");
+      LOG.info("============================================");
+      stop();
+    }
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    this.systemConf = (TajoConf)conf;
+    RackResolver.init(systemConf);
+    Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    //create QueryMaster
+    QueryMaster query = new QueryMaster(null);
+
+    query.init(systemConf);
+    query.start();
+  }
+
+  @Override
+  public void stop() {
+  }
+
+  public static void main(String[] args) throws Exception {
+    LOG.info("QueryMasterRunner started");
+
+    final TajoConf conf = new TajoConf();
+    conf.addResource(new Path(TajoConstants.SYSTEM_CONF_FILENAME));
+
+    UserGroupInformation.setConfiguration(conf);
+
+    final QueryId queryId = TajoIdUtils.parseQueryId(args[0]);
+    final String queryMasterManagerAddr = args[1];
+
+    LOG.info("Received QueryId:" + queryId);
+
+    QueryMasterRunner queryMasterRunner = new QueryMasterRunner(queryId, queryMasterManagerAddr);
+    queryMasterRunner.init(conf);
+    queryMasterRunner.start();
+
+    synchronized(queryId) {
+      queryId.wait();
+    }
+
+    System.exit(0);
+  }
+
+  public static void printThreadInfo(PrintWriter stream, String title) {
+    ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+    final int STACK_DEPTH = 60;
+    boolean contention = threadBean.isThreadContentionMonitoringEnabled();
+    long[] threadIds = threadBean.getAllThreadIds();
+    stream.println("Process Thread Dump: " + title);
+    stream.println(threadIds.length + " active threads");
+    for (long tid : threadIds) {
+      ThreadInfo info = threadBean.getThreadInfo(tid, STACK_DEPTH);
+      if (info == null) {
+        stream.println("  Inactive");
+        continue;
+      }
+      stream.println("Thread " + getTaskName(info.getThreadId(), info.getThreadName()) + ":");
+      Thread.State state = info.getThreadState();
+      stream.println("  State: " + state);
+      stream.println("  Blocked count: " + info.getBlockedCount());
+      stream.println("  Waited count: " + info.getWaitedCount());
+      if (contention) {
+        stream.println("  Blocked time: " + info.getBlockedTime());
+        stream.println("  Waited time: " + info.getWaitedTime());
+      }
+      if (state == Thread.State.WAITING) {
+        stream.println("  Waiting on " + info.getLockName());
+      } else if (state == Thread.State.BLOCKED) {
+        stream.println("  Blocked on " + info.getLockName());
+        stream.println("  Blocked by " + getTaskName(info.getLockOwnerId(), info.getLockOwnerName()));
+      }
+      stream.println("  Stack:");
+      for (StackTraceElement frame : info.getStackTrace()) {
+        stream.println("    " + frame.toString());
+      }
+    }
+    stream.flush();
+  }
+
+  private static String getTaskName(long id, String name) {
+    if (name == null) {
+      return Long.toString(id);
+    }
+    return id + " (" + name + ")";
+  }
+}