You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/07/02 16:16:03 UTC

[09/51] [partial] TAJO-22: The package prefix should be org.apache.tajo. (DaeMyung Kang via hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
deleted file mode 100644
index d353733..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
+++ /dev/null
@@ -1,766 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.state.*;
-import org.apache.hadoop.yarn.util.Records;
-import tajo.QueryIdFactory;
-import tajo.QueryUnitId;
-import tajo.SubQueryId;
-import tajo.catalog.CatalogService;
-import tajo.catalog.CatalogUtil;
-import tajo.catalog.TableDesc;
-import tajo.catalog.TableMeta;
-import tajo.catalog.statistics.ColumnStat;
-import tajo.catalog.statistics.StatisticsUtil;
-import tajo.catalog.statistics.TableStat;
-import tajo.conf.TajoConf;
-import tajo.engine.planner.PlannerUtil;
-import tajo.engine.planner.logical.ExprType;
-import tajo.engine.planner.logical.GroupbyNode;
-import tajo.engine.planner.logical.ScanNode;
-import tajo.engine.planner.logical.StoreTableNode;
-import tajo.master.QueryMaster.QueryContext;
-import tajo.master.TaskRunnerGroupEvent.EventType;
-import tajo.master.event.*;
-import tajo.storage.Fragment;
-import tajo.storage.StorageManager;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static tajo.conf.TajoConf.ConfVars;
-
-
-/**
- * SubQuery plays a role in controlling an ExecutionBlock and is a finite state machine.
- */
-public class SubQuery implements EventHandler<SubQueryEvent> {
-
-  private static final Log LOG = LogFactory.getLog(SubQuery.class);
-
-  private ExecutionBlock block;
-  private int priority;
-  private TableMeta meta;
-  private EventHandler eventHandler;
-  private final StorageManager sm;
-  private TaskSchedulerImpl taskScheduler;
-  private QueryContext context;
-
-  private long startTime;
-  private long finishTime;
-
-  volatile Map<QueryUnitId, QueryUnit> tasks = new ConcurrentHashMap<QueryUnitId, QueryUnit>();
-  volatile Map<ContainerId, Container> containers = new ConcurrentHashMap<ContainerId, Container>();
-
-
-  private static ContainerLaunchTransition CONTAINER_LAUNCH_TRANSITION = new ContainerLaunchTransition();
-  private StateMachine<SubQueryState, SubQueryEventType, SubQueryEvent>
-      stateMachine;
-
-  private StateMachineFactory<SubQuery, SubQueryState,
-      SubQueryEventType, SubQueryEvent> stateMachineFactory =
-      new StateMachineFactory <SubQuery, SubQueryState,
-          SubQueryEventType, SubQueryEvent> (SubQueryState.NEW)
-
-          .addTransition(SubQueryState.NEW,
-              EnumSet.of(SubQueryState.INIT, SubQueryState.FAILED, SubQueryState.SUCCEEDED),
-              SubQueryEventType.SQ_INIT, new InitAndRequestContainer())
-
-          .addTransition(SubQueryState.INIT, SubQueryState.CONTAINER_ALLOCATED,
-              SubQueryEventType.SQ_CONTAINER_ALLOCATED, CONTAINER_LAUNCH_TRANSITION)
-
-          .addTransition(SubQueryState.CONTAINER_ALLOCATED,
-              EnumSet.of(SubQueryState.RUNNING, SubQueryState.FAILED,
-                  SubQueryState.SUCCEEDED), SubQueryEventType.SQ_START, new StartTransition())
-          .addTransition(SubQueryState.CONTAINER_ALLOCATED, SubQueryState.CONTAINER_ALLOCATED,
-              SubQueryEventType.SQ_CONTAINER_ALLOCATED, CONTAINER_LAUNCH_TRANSITION)
-
-          .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
-              SubQueryEventType.SQ_CONTAINER_ALLOCATED, CONTAINER_LAUNCH_TRANSITION)
-          .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING, SubQueryEventType.SQ_START)
-          .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
-              SubQueryEventType.SQ_TASK_COMPLETED, new TaskCompletedTransition())
-          .addTransition(SubQueryState.RUNNING, SubQueryState.SUCCEEDED,
-              SubQueryEventType.SQ_SUBQUERY_COMPLETED, new SubQueryCompleteTransition())
-          .addTransition(SubQueryState.RUNNING, SubQueryState.FAILED,
-              SubQueryEventType.SQ_FAILED, new InternalErrorTransition())
-
-          .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
-              SubQueryEventType.SQ_START)
-          .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
-              SubQueryEventType.SQ_CONTAINER_ALLOCATED)
-
-          .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
-              SubQueryEventType.SQ_START)
-          .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
-              SubQueryEventType.SQ_CONTAINER_ALLOCATED)
-          .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
-                 SubQueryEventType.SQ_FAILED)
-          .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
-              SubQueryEventType.SQ_INTERNAL_ERROR);
-
-
-  private final Lock readLock;
-  private final Lock writeLock;
-
-  private int completedTaskCount = 0;
-
-  public SubQuery(QueryContext context, ExecutionBlock block, StorageManager sm) {
-    this.context = context;
-    this.block = block;
-    this.sm = sm;
-    this.eventHandler = context.getEventHandler();
-
-    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
-    this.readLock = readWriteLock.readLock();
-    this.writeLock = readWriteLock.writeLock();
-    stateMachine = stateMachineFactory.make(this);
-  }
-
-  public QueryContext getContext() {
-    return context;
-  }
-
-  public EventHandler getEventHandler() {
-    return eventHandler;
-  }
-
-  public TaskScheduler getTaskScheduler() {
-    return taskScheduler;
-  }
-
-  public void setStartTime() {
-    startTime = context.getClock().getTime();
-  }
-
-  @SuppressWarnings("UnusedDeclaration")
-  public long getStartTime() {
-    return this.startTime;
-  }
-
-  public void setFinishTime() {
-    finishTime = context.getClock().getTime();
-  }
-
-  @SuppressWarnings("UnusedDeclaration")
-  public long getFinishTime() {
-    return this.finishTime;
-  }
-
-  public float getProgress() {
-    readLock.lock();
-    try {
-      if (getState() == SubQueryState.NEW) {
-        return 0;
-      } else {
-        if (completedTaskCount == 0) {
-          return 0.0f;
-        } else {
-          return (float)completedTaskCount / (float)tasks.size();
-        }
-      }
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  public ExecutionBlock getBlock() {
-    return block;
-  }
-
-  public void addTask(QueryUnit task) {
-    tasks.put(task.getId(), task);
-  }
-
-  public void abortSubQuery(SubQueryState finalState) {
-    // TODO -
-    // - committer.abortSubQuery(...)
-    // - record SubQuery Finish Time
-    // - CleanUp Tasks
-    // - Record History
-
-    eventHandler.handle(new SubQueryCompletedEvent(getId(), finalState));
-  }
-
-  public StateMachine<SubQueryState, SubQueryEventType, SubQueryEvent> getStateMachine() {
-    return this.stateMachine;
-  }
-
-  public void setPriority(int priority) {
-    this.priority = priority;
-  }
-
-
-  public int getPriority() {
-    return this.priority;
-  }
-
-  public StorageManager getStorageManager() {
-    return sm;
-  }
-  
-  public SubQuery getChildQuery(ScanNode scanForChild) {
-    return context.getSubQuery(block.getChildBlock(scanForChild).getId());
-  }
-  
-  public SubQueryId getId() {
-    return block.getId();
-  }
-  
-  public QueryUnit[] getQueryUnits() {
-    return tasks.values().toArray(new QueryUnit[tasks.size()]);
-  }
-  
-  public QueryUnit getQueryUnit(QueryUnitId qid) {
-    return tasks.get(qid);
-  }
-
-  public void setTableMeta(TableMeta meta) {
-    this.meta = meta;
-  }
-
-  @SuppressWarnings("UnusedDeclaration")
-  public TableMeta getTableMeta() {
-    return meta;
-  }
-
-  public TableStat getTableStat() {
-    return this.meta.getStat();
-  }
-
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append(this.getId());
-    return sb.toString();
-  }
-  
-  @Override
-  public boolean equals(Object o) {
-    if (o instanceof SubQuery) {
-      SubQuery other = (SubQuery)o;
-      return getId().equals(other.getId());
-    }
-    return false;
-  }
-  
-  @Override
-  public int hashCode() {
-    return getId().hashCode();
-  }
-  
-  public int compareTo(SubQuery other) {
-    return getId().compareTo(other.getId());
-  }
-
-  public SubQueryState getState() {
-    readLock.lock();
-    try {
-      return stateMachine.getCurrentState();
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  private static TableStat computeStatFromUnionBlock(SubQuery unit) {
-    TableStat stat = new TableStat();
-    TableStat childStat;
-    long avgRows = 0, numBytes = 0, numRows = 0;
-    int numBlocks = 0, numPartitions = 0;
-    List<ColumnStat> columnStats = Lists.newArrayList();
-
-    Iterator<ExecutionBlock> it = unit.getBlock().getChildBlocks().iterator();
-    while (it.hasNext()) {
-      ExecutionBlock block = it.next();
-      SubQuery childSubQuery = unit.context.getSubQuery(block.getId());
-      childStat = childSubQuery.getTableStat();
-      avgRows += childStat.getAvgRows();
-      columnStats.addAll(childStat.getColumnStats());
-      numBlocks += childStat.getNumBlocks();
-      numBytes += childStat.getNumBytes();
-      numPartitions += childStat.getNumPartitions();
-      numRows += childStat.getNumRows();
-    }
-
-    stat.setColumnStats(columnStats);
-    stat.setNumBlocks(numBlocks);
-    stat.setNumBytes(numBytes);
-    stat.setNumPartitions(numPartitions);
-    stat.setNumRows(numRows);
-    stat.setAvgRows(avgRows);
-    return stat;
-  }
-
-  public TableMeta buildTableMeta() throws IOException {
-    finishTime = context.getClock().getTime();
-
-    TableStat stat;
-    if (block.hasUnion()) {
-      stat = computeStatFromUnionBlock(this);
-    } else {
-      stat = computeStatFromTasks();
-    }
-    TableMeta meta = writeStat(this, stat);
-    meta.setStat(stat);
-    setTableMeta(meta);
-    return meta;
-  }
-
-  private TableStat computeStatFromTasks() {
-    List<TableStat> stats = Lists.newArrayList();
-    for (QueryUnit unit : getQueryUnits()) {
-      stats.add(unit.getStats());
-    }
-    TableStat tableStat = StatisticsUtil.aggregateTableStat(stats);
-    return tableStat;
-  }
-
-  private TableMeta writeStat(SubQuery subQuery, TableStat stat)
-      throws IOException {
-    ExecutionBlock execBlock = subQuery.getBlock();
-    StoreTableNode storeTableNode = execBlock.getStoreTableNode();
-    TableMeta meta = toTableMeta(storeTableNode);
-    meta.setStat(stat);
-    sm.writeTableMeta(sm.getTablePath(execBlock.getOutputName()), meta);
-    return meta;
-  }
-
-  private static TableMeta toTableMeta(StoreTableNode store) {
-    if (store.hasOptions()) {
-      return CatalogUtil.newTableMeta(store.getOutSchema(),
-          store.getStorageType(), store.getOptions());
-    } else {
-      return CatalogUtil.newTableMeta(store.getOutSchema(),
-          store.getStorageType());
-    }
-  }
-
-  private void stopScheduler() {
-    // If there are launched TaskRunners, send the 'shouldDie' message to all r
-    // via received task requests.
-    if (taskScheduler != null) {
-      taskScheduler.stop();
-    }
-  }
-
-  private void releaseContainers() {
-    // If there are still live TaskRunners, try to kill the containers.
-    eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP ,getId(),
-        containers.values()));
-  }
-
-  private void finish() {
-    TableMeta meta = null;
-    try {
-      meta = buildTableMeta();
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-
-    setTableMeta(meta);
-    setFinishTime();
-    eventHandler.handle(new SubQuerySucceeEvent(getId(), meta));
-  }
-
-  @Override
-  public void handle(SubQueryEvent event) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing " + event.getSubQueryId() + " of type " + event.getType());
-    }
-
-    try {
-      writeLock.lock();
-      SubQueryState oldState = getState();
-      try {
-        getStateMachine().doTransition(event.getType(), event);
-      } catch (InvalidStateTransitonException e) {
-        LOG.error("Can't handle this event at current state", e);
-        eventHandler.handle(new SubQueryEvent(getId(),
-            SubQueryEventType.SQ_INTERNAL_ERROR));
-      }
-
-      // notify the eventhandler of state change
-      if (LOG.isDebugEnabled()) {
-        if (oldState != getState()) {
-          LOG.debug(getId() + " SubQuery Transitioned from " + oldState + " to "
-              + getState());
-        }
-      }
-    }
-
-    finally {
-      writeLock.unlock();
-    }
-  }
-
-  private static class InitAndRequestContainer implements MultipleArcTransition<SubQuery,
-      SubQueryEvent, SubQueryState> {
-
-    @Override
-    public SubQueryState transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
-      subQuery.setStartTime();
-      ExecutionBlock execBlock = subQuery.getBlock();
-      SubQueryState state;
-
-      try {
-        // Union operator does not require actual query processing. It is performed logically.
-        if (execBlock.hasUnion()) {
-          subQuery.finish();
-          state = SubQueryState.SUCCEEDED;
-        } else {
-          setRepartitionIfNecessary(subQuery);
-          createTasks(subQuery);
-
-          if (subQuery.tasks.size() == 0) { // if there is no tasks
-            subQuery.finish();
-            return SubQueryState.SUCCEEDED;
-          } else {
-            initTaskScheduler(subQuery);
-            allocateContainers(subQuery);
-            return SubQueryState.INIT;
-          }
-        }
-      } catch (Exception e) {
-        LOG.warn("SubQuery (" + subQuery.getId() + ") failed", e);
-        subQuery.eventHandler.handle(
-            new QueryDiagnosticsUpdateEvent(subQuery.getId().getQueryId(), e.getMessage()));
-        subQuery.eventHandler.handle(
-            new SubQueryCompletedEvent(subQuery.getId(), SubQueryState.FAILED));
-        return SubQueryState.FAILED;
-      }
-
-      return state;
-    }
-
-    private void initTaskScheduler(SubQuery subQuery) {
-      subQuery.taskScheduler = new TaskSchedulerImpl(subQuery.context);
-      subQuery.taskScheduler.init(subQuery.context.getConf());
-      subQuery.taskScheduler.start();
-    }
-
-    /**
-     * If a parent block requires a repartition operation, the method sets proper repartition
-     * methods and the number of partitions to a given subquery.
-     */
-    private static void setRepartitionIfNecessary(SubQuery subQuery) {
-      if (subQuery.getBlock().hasParentBlock()) {
-        int numTasks = calculatePartitionNum(subQuery);
-        Repartitioner.setPartitionNumberForTwoPhase(subQuery, numTasks);
-      }
-    }
-
-    /**
-     * Getting the desire number of partitions according to the volume of input data.
-     * This method is only used to determine the partition key number of hash join or aggregation.
-     *
-     * @param subQuery
-     * @return
-     */
-    public static int calculatePartitionNum(SubQuery subQuery) {
-      TajoConf conf = subQuery.context.getConf();
-      ExecutionBlock parent = subQuery.getBlock().getParentBlock();
-
-      GroupbyNode grpNode = null;
-      if (parent != null) {
-        grpNode = (GroupbyNode) PlannerUtil.findTopNode(
-            parent.getPlan(), ExprType.GROUP_BY);
-      }
-
-      // Is this subquery the first step of join?
-      if (parent != null && parent.getScanNodes().length == 2) {
-        Iterator<ExecutionBlock> child = parent.getChildBlocks().iterator();
-
-        // for inner
-        ExecutionBlock outer = child.next();
-        long outerVolume = getInputVolume(subQuery.context, outer);
-
-        // for inner
-        ExecutionBlock inner = child.next();
-        long innerVolume = getInputVolume(subQuery.context, inner);
-        LOG.info("Outer volume: " + Math.ceil((double)outerVolume / 1048576));
-        LOG.info("Inner volume: " + Math.ceil((double)innerVolume / 1048576));
-
-        long smaller = Math.min(outerVolume, innerVolume);
-
-        int mb = (int) Math.ceil((double)smaller / 1048576);
-        LOG.info("Smaller Table's volume is approximately " + mb + " MB");
-        // determine the number of task
-        int taskNum = (int) Math.ceil((double)mb /
-            conf.getIntVar(ConfVars.JOIN_PARTITION_VOLUME));
-        LOG.info("The determined number of join partitions is " + taskNum);
-        return taskNum;
-
-        // Is this subquery the first step of group-by?
-      } else if (grpNode != null) {
-
-        if (grpNode.getGroupingColumns().length == 0) {
-          return 1;
-        } else {
-          long volume = getInputVolume(subQuery.context, subQuery.block);
-
-          int mb = (int) Math.ceil((double)volume / 1048576);
-          LOG.info("Table's volume is approximately " + mb + " MB");
-          // determine the number of task
-          int taskNum = (int) Math.ceil((double)mb /
-              conf.getIntVar(ConfVars.AGGREGATION_PARTITION_VOLUME));
-          LOG.info("The determined number of aggregation partitions is " + taskNum);
-          return taskNum;
-        }
-      } else {
-        LOG.info("============>>>>> Unexpected Case! <<<<<================");
-        long volume = getInputVolume(subQuery.context, subQuery.block);
-
-        int mb = (int) Math.ceil((double)volume / 1048576);
-        LOG.info("Table's volume is approximately " + mb + " MB");
-        // determine the number of task per 128MB
-        int taskNum = (int) Math.ceil((double)mb / 128);
-        LOG.info("The determined number of partitions is " + taskNum);
-        return taskNum;
-      }
-    }
-
-    private static void createTasks(SubQuery subQuery) throws IOException {
-      ExecutionBlock execBlock = subQuery.getBlock();
-      QueryUnit [] tasks;
-      if (execBlock.isLeafBlock() && execBlock.getScanNodes().length == 1) { // Case 1: Just Scan
-        tasks = createLeafTasks(subQuery);
-
-      } else if (execBlock.getScanNodes().length > 1) { // Case 2: Join
-        tasks = Repartitioner.createJoinTasks(subQuery);
-
-      } else { // Case 3: Others (Sort or Aggregation)
-        int numTasks = getNonLeafTaskNum(subQuery);
-        SubQueryId childId = subQuery.getBlock().getChildBlocks().iterator().next().getId();
-        SubQuery child = subQuery.context.getSubQuery(childId);
-        tasks = Repartitioner.createNonLeafTask(subQuery, child, numTasks);
-      }
-
-      LOG.info("Create " + tasks.length + " Tasks");
-
-      for (QueryUnit task : tasks) {
-        subQuery.addTask(task);
-      }
-    }
-
-    /**
-     * Getting the desire number of tasks according to the volume of input data
-     *
-     * @param subQuery
-     * @return
-     */
-    public static int getNonLeafTaskNum(SubQuery subQuery) {
-      // Getting intermediate data size
-      long volume = getInputVolume(subQuery.context, subQuery.getBlock());
-
-      int mb = (int) Math.ceil((double)volume / 1048576);
-      LOG.info("Table's volume is approximately " + mb + " MB");
-      // determine the number of task per 64MB
-      int maxTaskNum = (int) Math.ceil((double)mb / 64);
-      LOG.info("The determined number of non-leaf tasks is " + maxTaskNum);
-      return maxTaskNum;
-    }
-
-    public static long getInputVolume(QueryContext context, ExecutionBlock execBlock) {
-      CatalogService catalog = context.getCatalog();
-      if (execBlock.isLeafBlock()) {
-        ScanNode outerScan = execBlock.getScanNodes()[0];
-        TableStat stat = catalog.getTableDesc(outerScan.getTableId()).getMeta().getStat();
-        return stat.getNumBytes();
-      } else {
-        long aggregatedVolume = 0;
-        for (ExecutionBlock childBlock : execBlock.getChildBlocks()) {
-          SubQuery subquery = context.getSubQuery(childBlock.getId());
-          aggregatedVolume += subquery.getTableStat().getNumBytes();
-        }
-
-        return aggregatedVolume;
-      }
-    }
-
-    public static void allocateContainers(SubQuery subQuery) {
-      ExecutionBlock execBlock = subQuery.getBlock();
-      QueryUnit [] tasks = subQuery.getQueryUnits();
-
-      int numClusterNodes = subQuery.getContext().getNumClusterNode();
-      int numRequest = Math.min(tasks.length, numClusterNodes * 4);
-
-      final Resource resource = Records.newRecord(Resource.class);
-      if (tasks.length <= numClusterNodes) {
-        resource.setMemory(subQuery.context.getMaxContainerCapability());
-      } else {
-        resource.setMemory(2000);
-      }
-
-      Priority priority = Records.newRecord(Priority.class);
-      priority.setPriority(subQuery.getPriority());
-      ContainerAllocationEvent event =
-          new ContainerAllocationEvent(ContainerAllocatorEventType.CONTAINER_REQ,
-              subQuery.getId(), priority, resource, numRequest,
-              execBlock.isLeafBlock(), 0.0f);
-      subQuery.eventHandler.handle(event);
-    }
-
-    private static QueryUnit [] createLeafTasks(SubQuery subQuery) throws IOException {
-      ExecutionBlock execBlock = subQuery.getBlock();
-      ScanNode[] scans = execBlock.getScanNodes();
-      Preconditions.checkArgument(scans.length == 1, "Must be Scan Query");
-      TableMeta meta;
-      Path inputPath;
-
-      ScanNode scan = scans[0];
-      TableDesc desc = subQuery.context.getCatalog().getTableDesc(scan.getTableId());
-      inputPath = desc.getPath();
-      meta = desc.getMeta();
-
-      // TODO - should be change the inner directory
-      Path oldPath = new Path(inputPath, "data");
-      FileSystem fs = inputPath.getFileSystem(subQuery.context.getConf());
-      if (fs.exists(oldPath)) {
-        inputPath = oldPath;
-      }
-      List<Fragment> fragments = subQuery.getStorageManager().getSplits(scan.getTableId(), meta, inputPath);
-
-      QueryUnit queryUnit;
-      List<QueryUnit> queryUnits = new ArrayList<QueryUnit>();
-
-      int i = 0;
-      for (Fragment fragment : fragments) {
-        queryUnit = newQueryUnit(subQuery, i++, fragment);
-        queryUnits.add(queryUnit);
-      }
-
-      return queryUnits.toArray(new QueryUnit[queryUnits.size()]);
-    }
-
-    private static QueryUnit newQueryUnit(SubQuery subQuery, int taskId, Fragment fragment) {
-      ExecutionBlock execBlock = subQuery.getBlock();
-      QueryUnit unit = new QueryUnit(
-          QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), execBlock.isLeafBlock(),
-          subQuery.eventHandler);
-      unit.setLogicalPlan(execBlock.getPlan());
-      unit.setFragment2(fragment);
-      return unit;
-    }
-  }
-
-  int i = 0;
-  private static class ContainerLaunchTransition
-      implements SingleArcTransition<SubQuery, SubQueryEvent> {
-
-    @Override
-    public void transition(SubQuery subQuery, SubQueryEvent event) {
-      SubQueryContainerAllocationEvent allocationEvent =
-          (SubQueryContainerAllocationEvent) event;
-      for (Container container : allocationEvent.getAllocatedContainer()) {
-        ContainerId cId = container.getId();
-        if (subQuery.containers.containsKey(cId)) {
-          LOG.info(">>>>>>>>>>>> Duplicate Container! <<<<<<<<<<<");
-        }
-        subQuery.containers.put(cId, container);
-        // TODO - This is debugging message. Should be removed
-        subQuery.i++;
-      }
-      LOG.info("SubQuery (" + subQuery.getId() + ") has " + subQuery.i + " containers!");
-      subQuery.eventHandler.handle(
-          new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_LAUNCH,
-              subQuery.getId(), allocationEvent.getAllocatedContainer()));
-
-      subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(),
-          SubQueryEventType.SQ_START));
-    }
-  }
-
-  private static class StartTransition implements
-      MultipleArcTransition<SubQuery, SubQueryEvent, SubQueryState> {
-
-    @Override
-    public SubQueryState transition(SubQuery subQuery,
-                           SubQueryEvent subQueryEvent) {
-      // schedule tasks
-      try {
-        for (QueryUnitId taskId : subQuery.tasks.keySet()) {
-          subQuery.eventHandler.handle(new TaskEvent(taskId, TaskEventType.T_SCHEDULE));
-        }
-
-        return  SubQueryState.RUNNING;
-      } catch (Exception e) {
-        LOG.warn("SubQuery (" + subQuery.getId() + ") failed", e);
-        return SubQueryState.FAILED;
-      }
-    }
-  }
-
-  private static class TaskCompletedTransition
-      implements SingleArcTransition<SubQuery, SubQueryEvent> {
-
-    @Override
-    public void transition(SubQuery subQuery,
-                                     SubQueryEvent event) {
-      subQuery.completedTaskCount++;
-      SubQueryTaskEvent taskEvent = (SubQueryTaskEvent) event;
-      QueryUnitAttempt task = subQuery.getQueryUnit(taskEvent.getTaskId()).getSuccessfulAttempt();
-
-      LOG.info(subQuery.getId() + " SubQuery Succeeded " + subQuery.completedTaskCount + "/"
-          + subQuery.tasks.size() + " on " + task.getHost());
-      if (subQuery.completedTaskCount == subQuery.tasks.size()) {
-        subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(),
-            SubQueryEventType.SQ_SUBQUERY_COMPLETED));
-      }
-    }
-  }
-
-  private static class SubQueryCompleteTransition
-      implements SingleArcTransition<SubQuery, SubQueryEvent> {
-
-    @Override
-    public void transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
-      // TODO - Commit subQuery & do cleanup
-      // TODO - records succeeded, failed, killed completed task
-      // TODO - records metrics
-      subQuery.stopScheduler();
-      subQuery.releaseContainers();
-      subQuery.finish();
-    }
-  }
-
-  private static class InternalErrorTransition
-      implements SingleArcTransition<SubQuery, SubQueryEvent> {
-
-    @Override
-    public void transition(SubQuery subQuery,
-                           SubQueryEvent subQueryEvent) {
-
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQueryState.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQueryState.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQueryState.java
deleted file mode 100644
index 673f1df..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQueryState.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-public enum SubQueryState {
-  NEW,
-  CONTAINER_ALLOCATED,
-  INIT,
-  RUNNING,
-  SUCCEEDED,
-  FAILED
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TajoMaster.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TajoMaster.java
deleted file mode 100644
index 82b1445..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TajoMaster.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import com.google.common.collect.Maps;
-import com.google.protobuf.ServiceException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.ShutdownHookManager;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.Clock;
-import org.apache.hadoop.yarn.SystemClock;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.service.CompositeService;
-import org.apache.hadoop.yarn.service.Service;
-import org.apache.hadoop.yarn.util.RackResolver;
-import tajo.QueryId;
-import tajo.QueryIdFactory;
-import tajo.TajoConstants;
-import tajo.catalog.*;
-import tajo.catalog.proto.CatalogProtos.FunctionType;
-import tajo.common.TajoDataTypes.Type;
-import tajo.conf.TajoConf;
-import tajo.conf.TajoConf.ConfVars;
-import tajo.engine.MasterWorkerProtos.TaskStatusProto;
-import tajo.engine.function.Country;
-import tajo.engine.function.InCountry;
-import tajo.engine.function.builtin.*;
-import tajo.master.event.QueryEvent;
-import tajo.master.event.QueryEventType;
-import tajo.storage.StorageManager;
-import tajo.webapp.StaticHttpServer;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-public class TajoMaster extends CompositeService {
-
-  /** Class Logger */
-  private static final Log LOG = LogFactory.getLog(TajoMaster.class);
-
-  public static final int SHUTDOWN_HOOK_PRIORITY = 30;
-
-  private MasterContext context;
-  private TajoConf conf;
-  private FileSystem defaultFS;
-  private Clock clock;
-
-  private Path basePath;
-  private Path wareHousePath;
-
-  private CatalogServer catalogServer;
-  private CatalogService catalog;
-  private StorageManager storeManager;
-  private GlobalEngine globalEngine;
-  private AsyncDispatcher dispatcher;
-  private ClientService clientService;
-  private YarnRPC yarnRPC;
-
-  //Web Server
-  private StaticHttpServer webServer;
-
-  public TajoMaster() throws Exception {
-    super(TajoMaster.class.getName());
-  }
-
-  @Override
-  public void init(Configuration _conf) {
-    this.conf = (TajoConf) _conf;
-
-    context = new MasterContext(conf);
-    clock = new SystemClock();
-
-
-    try {
-      webServer = StaticHttpServer.getInstance(this ,"admin", null, 8080 ,
-          true, null, context.getConf(), null);
-      webServer.start();
-
-      QueryIdFactory.reset();
-
-      // Get the tajo base dir
-      this.basePath = new Path(conf.getVar(ConfVars.ROOT_DIR));
-      LOG.info("Tajo Root dir is set " + basePath);
-      // Get default DFS uri from the base dir
-      this.defaultFS = basePath.getFileSystem(conf);
-      conf.set("fs.defaultFS", defaultFS.getUri().toString());
-      LOG.info("FileSystem (" + this.defaultFS.getUri() + ") is initialized.");
-
-      if (!defaultFS.exists(basePath)) {
-        defaultFS.mkdirs(basePath);
-        LOG.info("Tajo Base dir (" + basePath + ") is created.");
-      }
-
-      this.storeManager = new StorageManager(conf);
-
-      // Get the tajo data warehouse dir
-      this.wareHousePath = new Path(basePath, TajoConstants.WAREHOUSE_DIR);
-      LOG.info("Tajo Warehouse dir is set to " + wareHousePath);
-      if (!defaultFS.exists(wareHousePath)) {
-        defaultFS.mkdirs(wareHousePath);
-        LOG.info("Warehouse dir (" + wareHousePath + ") is created");
-      }
-
-      yarnRPC = YarnRPC.create(conf);
-
-      this.dispatcher = new AsyncDispatcher();
-      addIfService(dispatcher);
-
-      // The below is some mode-dependent codes
-      // If tajo is local mode
-      final boolean mode = conf.getBoolVar(ConfVars.CLUSTER_DISTRIBUTED);
-      if (!mode) {
-        LOG.info("Enabled Pseudo Distributed Mode");
-      } else { // if tajo is distributed mode
-        LOG.info("Enabled Distributed Mode");
-      }
-      // This is temporal solution of the above problem.
-      catalogServer = new CatalogServer(initBuiltinFunctions());
-      addIfService(catalogServer);
-      catalog = new LocalCatalog(catalogServer);
-
-      globalEngine = new GlobalEngine(context, storeManager);
-      addIfService(globalEngine);
-
-      dispatcher.register(QueryEventType.class, new QueryEventDispatcher());
-
-      clientService = new ClientService(context);
-      addIfService(clientService);
-
-      RackResolver.init(conf);
-    } catch (Exception e) {
-       e.printStackTrace();
-    }
-
-    super.init(conf);
-  }
-
-  @SuppressWarnings("unchecked")
-  public static List<FunctionDesc> initBuiltinFunctions() throws ServiceException {
-    List<FunctionDesc> sqlFuncs = new ArrayList<FunctionDesc>();
-
-    // Sum
-    sqlFuncs.add(new FunctionDesc("sum", SumInt.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4),
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
-    sqlFuncs.add(new FunctionDesc("sum", SumLong.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT8),
-        CatalogUtil.newDataTypesWithoutLen(Type.INT8)));
-    sqlFuncs.add(new FunctionDesc("sum", SumFloat.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4),
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4)));
-    sqlFuncs.add(new FunctionDesc("sum", SumDouble.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8),
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8)));
-
-    // Max
-    sqlFuncs.add(new FunctionDesc("max", MaxInt.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4),
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
-    sqlFuncs.add(new FunctionDesc("max", MaxLong.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT8),
-        CatalogUtil.newDataTypesWithoutLen(Type.INT8)));
-    sqlFuncs.add(new FunctionDesc("max", MaxFloat.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4),
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4)));
-    sqlFuncs.add(new FunctionDesc("max", MaxDouble.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8),
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8)));
-
-    // Min
-    sqlFuncs.add(new FunctionDesc("min", MinInt.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4),
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
-    sqlFuncs.add(new FunctionDesc("min", MinLong.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT8),
-        CatalogUtil.newDataTypesWithoutLen(Type.INT8)));
-    sqlFuncs.add(new FunctionDesc("min", MinFloat.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4),
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4 )));
-    sqlFuncs.add(new FunctionDesc("min", MinDouble.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8),
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8)));
-    sqlFuncs.add(new FunctionDesc("min", MinString.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.TEXT),
-        CatalogUtil.newDataTypesWithoutLen(Type.TEXT)));
-
-    // AVG
-    sqlFuncs.add(new FunctionDesc("avg", AvgInt.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4),
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
-    sqlFuncs.add(new FunctionDesc("avg", AvgLong.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8),
-        CatalogUtil.newDataTypesWithoutLen(Type.INT8)));
-    sqlFuncs.add(new FunctionDesc("avg", AvgFloat.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4),
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4)));
-    sqlFuncs.add(new FunctionDesc("avg", AvgDouble.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8),
-        CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8)));
-
-    // Count
-    sqlFuncs.add(new FunctionDesc("count", CountValue.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT8),
-        CatalogUtil.newDataTypesWithoutLen(Type.ANY)));
-    sqlFuncs.add(new FunctionDesc("count", CountRows.class, FunctionType.AGGREGATION,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT8),
-        CatalogUtil.newDataTypesWithoutLen()));
-
-    // GeoIP
-    sqlFuncs.add(new FunctionDesc("in_country", InCountry.class, FunctionType.GENERAL,
-        CatalogUtil.newDataTypesWithoutLen(Type.BOOLEAN),
-        CatalogUtil.newDataTypesWithoutLen(Type.TEXT, Type.TEXT)));
-    sqlFuncs.add(new FunctionDesc("country", Country.class, FunctionType.GENERAL,
-        CatalogUtil.newDataTypesWithoutLen(Type.TEXT),
-        CatalogUtil.newDataTypesWithoutLen(Type.TEXT)));
-
-    // Date
-    sqlFuncs.add(new FunctionDesc("date", Date.class, FunctionType.GENERAL,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT8),
-        CatalogUtil.newDataTypesWithoutLen(Type.TEXT)));
-
-    // Today
-    sqlFuncs.add(new FunctionDesc("today", Date.class, FunctionType.GENERAL,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT8),
-        CatalogUtil.newDataTypesWithoutLen()));
-
-    sqlFuncs.add(
-        new FunctionDesc("random", RandomInt.class, FunctionType.GENERAL,
-            CatalogUtil.newDataTypesWithoutLen(Type.INT4),
-            CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
-
-    return sqlFuncs;
-  }
-
-  public MasterContext getContext() {
-    return this.context;
-  }
-
-  protected void addIfService(Object object) {
-    if (object instanceof Service) {
-      addService((Service) object);
-    }
-  }
-
-  @Override
-  public void start() {
-    LOG.info("TajoMaster startup");
-    super.start();
-  }
-
-  @Override
-  public void stop() {
-    try {
-      webServer.stop();
-    } catch (Exception e) {
-      LOG.error(e);
-    }
-
-    super.stop();
-    LOG.info("TajoMaster main thread exiting");
-  }
-
-  public EventHandler getEventHandler() {
-    return dispatcher.getEventHandler();
-  }
-
-  public String getMasterServerName() {
-    return null;
-  }
-
-  public boolean isMasterRunning() {
-    return getServiceState() == STATE.STARTED;
-  }
-
-  public CatalogService getCatalog() {
-    return this.catalog;
-  }
-
-  public StorageManager getStorageManager() {
-    return this.storeManager;
-  }
-
-  // TODO - to be improved
-  public Collection<TaskStatusProto> getProgressQueries() {
-    return null;
-  }
-
-  private class QueryEventDispatcher implements EventHandler<QueryEvent> {
-    @Override
-    public void handle(QueryEvent queryEvent) {
-      LOG.info("QueryEvent: " + queryEvent.getQueryId());
-      LOG.info("Found: " + context.getQuery(queryEvent.getQueryId()).getContext().getQueryId());
-      context.getQuery(queryEvent.getQueryId()).handle(queryEvent);
-    }
-  }
-
-  public static void main(String[] args) throws Exception {
-    StringUtils.startupShutdownMessage(TajoMaster.class, args, LOG);
-
-    try {
-      TajoMaster master = new TajoMaster();
-      ShutdownHookManager.get().addShutdownHook(
-          new CompositeServiceShutdownHook(master),
-          SHUTDOWN_HOOK_PRIORITY);
-      TajoConf conf = new TajoConf(new YarnConfiguration());
-      master.init(conf);
-      master.start();
-    } catch (Throwable t) {
-      LOG.fatal("Error starting JobHistoryServer", t);
-      System.exit(-1);
-    }
-  }
-
-  public class MasterContext {
-    private final Map<QueryId, QueryMaster> queries = Maps.newConcurrentMap();
-    private final TajoConf conf;
-
-    public MasterContext(TajoConf conf) {
-      this.conf = conf;
-    }
-
-    public TajoConf getConf() {
-      return conf;
-    }
-
-    public Clock getClock() {
-      return clock;
-    }
-
-    public QueryMaster getQuery(QueryId queryId) {
-      return queries.get(queryId);
-    }
-
-    public Map<QueryId, QueryMaster> getAllQueries() {
-      return queries;
-    }
-
-    public AsyncDispatcher getDispatcher() {
-      return dispatcher;
-    }
-
-    public EventHandler getEventHandler() {
-      return dispatcher.getEventHandler();
-    }
-
-    public CatalogService getCatalog() {
-      return catalog;
-    }
-
-    public GlobalEngine getGlobalEngine() {
-      return globalEngine;
-    }
-
-    public StorageManager getStorageManager() {
-      return storeManager;
-    }
-
-    public YarnRPC getYarnRPC() {
-      return yarnRPC;
-    }
-
-    public ClientService getClientService() {
-      return clientService;
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerGroupEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerGroupEvent.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerGroupEvent.java
deleted file mode 100644
index d1cdbc2..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerGroupEvent.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.event.AbstractEvent;
-import tajo.SubQueryId;
-import tajo.master.TaskRunnerGroupEvent.EventType;
-
-import java.util.Collection;
-
-public class TaskRunnerGroupEvent extends AbstractEvent<EventType> {
-  public enum EventType {
-    CONTAINER_REMOTE_LAUNCH,
-    CONTAINER_REMOTE_CLEANUP
-  }
-
-  protected final SubQueryId subQueryId;
-  protected final Collection<Container> containers;
-  public TaskRunnerGroupEvent(EventType eventType,
-                              SubQueryId subQueryId,
-                              Collection<Container> containers) {
-    super(eventType);
-    this.subQueryId = subQueryId;
-    this.containers = containers;
-  }
-
-  public Collection<Container> getContainers() {
-    return containers;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncher.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncher.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncher.java
deleted file mode 100644
index 6b73d00..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncher.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import org.apache.hadoop.yarn.event.EventHandler;
-
-public interface TaskRunnerLauncher extends EventHandler<TaskRunnerGroupEvent> {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncherImpl.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncherImpl.java
deleted file mode 100644
index 2279d1b..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncherImpl.java
+++ /dev/null
@@ -1,540 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.api.ContainerManager;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.ProtoUtils;
-import org.apache.hadoop.yarn.util.Records;
-import tajo.QueryConf;
-import tajo.SubQueryId;
-import tajo.conf.TajoConf;
-import tajo.master.QueryMaster.QueryContext;
-import tajo.master.TaskRunnerGroupEvent.EventType;
-import tajo.master.event.QueryEvent;
-import tajo.master.event.QueryEventType;
-import tajo.pullserver.PullServerAuxService;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunnerLauncher {
-
-  /** Class Logger */
-  private static final Log LOG = LogFactory.getLog(TaskRunnerLauncherImpl.class);
-  private final YarnRPC yarnRPC;
-  private final static RecordFactory recordFactory =
-      RecordFactoryProvider.getRecordFactory(null);
-  private QueryContext context;
-  private final String taskListenerHost;
-  private final int taskListenerPort;
-
-  // For ContainerLauncherSpec
-  private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
-  private static String initialClasspath = null;
-  private static final Object classpathLock = new Object();
-  private Object commonContainerSpecLock = new Object();
-  private ContainerLaunchContext commonContainerSpec = null;
-
-  final public static FsPermission QUERYCONF_FILE_PERMISSION =
-      FsPermission.createImmutable((short) 0644); // rw-r--r--
-
-  /** for launching TaskRunners in parallel */
-  private final ExecutorService executorService;
-
-  public TaskRunnerLauncherImpl(QueryContext context) {
-    super(TaskRunnerLauncherImpl.class.getName());
-    this.context = context;
-    taskListenerHost = context.getTaskListener().getHostName();
-    taskListenerPort = context.getTaskListener().getPort();
-    yarnRPC = context.getYarnRPC();
-    executorService = Executors.newFixedThreadPool(
-        context.getConf().getIntVar(TajoConf.ConfVars.AM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
-  }
-
-  public void start() {
-    super.start();
-  }
-
-  public void stop() {
-    executorService.shutdown();
-    super.stop();
-  }
-
-  @Override
-  public void handle(TaskRunnerGroupEvent event) {
-    if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) {
-     launchTaskRunners(event.subQueryId, event.getContainers());
-    } else if (event.getType() == EventType.CONTAINER_REMOTE_CLEANUP) {
-      killTaskRunners(event.getContainers());
-    }
-  }
-
-  private void launchTaskRunners(SubQueryId subQueryId, Collection<Container> containers) {
-    for (Container container : containers) {
-      final ContainerProxy proxy = new ContainerProxy(container, subQueryId);
-      executorService.submit(new LaunchRunner(proxy));
-    }
-  }
-
-  private class LaunchRunner implements Runnable {
-    private final ContainerProxy proxy;
-    public LaunchRunner(ContainerProxy proxy) {
-      this.proxy = proxy;
-    }
-    @Override
-    public void run() {
-      proxy.launch();
-    }
-  }
-
-  private void killTaskRunners(Collection<Container> containers) {
-    for (Container container : containers) {
-      final ContainerProxy proxy = context.getContainer(container.getId());
-      executorService.submit(new KillRunner(proxy));
-    }
-  }
-
-  private class KillRunner implements Runnable {
-    private final ContainerProxy proxy;
-    public KillRunner(ContainerProxy proxy) {
-      this.proxy = proxy;
-    }
-
-    @Override
-    public void run() {
-      proxy.kill();
-    }
-  }
-
-
-  /**
-   * Lock this on initialClasspath so that there is only one fork in the AM for
-   * getting the initial class-path. TODO: We already construct
-   * a parent CLC and use it for all the containers, so this should go away
-   * once the mr-generated-classpath stuff is gone.
-   */
-  private static String getInitialClasspath(Configuration conf) {
-    synchronized (classpathLock) {
-      if (initialClasspathFlag.get()) {
-        return initialClasspath;
-      }
-      Map<String, String> env = new HashMap<String, String>();
-
-      initialClasspath = env.get(Environment.CLASSPATH.name());
-      initialClasspathFlag.set(true);
-      return initialClasspath;
-    }
-  }
-
-  private ContainerLaunchContext createCommonContainerLaunchContext() {
-    TajoConf conf = (TajoConf) getConfig();
-
-    ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
-    try {
-      ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-
-    ////////////////////////////////////////////////////////////////////////////
-    // Set the env variables to be setup
-    ////////////////////////////////////////////////////////////////////////////
-    LOG.info("Set the environment for the application master");
-
-    Map<String, String> environment = new HashMap<String, String>();
-    //String initialClassPath = getInitialClasspath(conf);
-    environment.put(Environment.SHELL.name(), "/bin/bash");
-    environment.put(Environment.JAVA_HOME.name(), System.getenv(Environment.JAVA_HOME.name()));
-
-    // TODO - to be improved with tajo.sh shell script
-    Properties prop = System.getProperties();
-    if (prop.getProperty("tajo.test", "FALSE").equalsIgnoreCase("TRUE")) {
-      environment.put(Environment.CLASSPATH.name(), prop.getProperty(
-          "java.class.path", null));
-    } else {
-      // Add AppMaster.jar location to classpath
-      // At some point we should not be required to add
-      // the hadoop specific classpaths to the env.
-      // It should be provided out of the box.
-      // For now setting all required classpaths including
-      // the classpath to "." for the application jar
-      StringBuilder classPathEnv = new StringBuilder("./");
-      //for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) {
-      for (String c : YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH) {
-        classPathEnv.append(':');
-        classPathEnv.append(c.trim());
-      }
-
-      classPathEnv.append(":" + System.getenv("TAJO_BASE_CLASSPATH"));
-      classPathEnv.append(":./log4j.properties:./*");
-      environment.put("HADOOP_HOME", System.getenv("HADOOP_HOME"));
-      environment.put(
-          Environment.HADOOP_COMMON_HOME.name(),
-          System.getenv("HADOOP_HOME"));
-      environment.put(
-          Environment.HADOOP_HDFS_HOME.name(),
-          System.getenv("HADOOP_HOME"));
-      environment.put(
-          Environment.HADOOP_YARN_HOME.name(),
-          System.getenv("HADOOP_HOME"));
-      environment.put("TAJO_BASE_CLASSPATH", System.getenv("TAJO_BASE_CLASSPATH"));
-      environment.put(Environment.CLASSPATH.name(), classPathEnv.toString());
-    }
-
-    ctx.setEnvironment(environment);
-
-    ////////////////////////////////////////////////////////////////////////////
-    // Set the local resources
-    ////////////////////////////////////////////////////////////////////////////
-    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
-    FileSystem fs = null;
-
-
-    LOG.info("defaultFS: " + conf.get("fs.default.name"));
-    LOG.info("defaultFS: " + conf.get("fs.defaultFS"));
-    try {
-      fs = FileSystem.get(conf);
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-
-    FileContext fsCtx = null;
-    try {
-      fsCtx = FileContext.getFileContext(getConfig());
-    } catch (UnsupportedFileSystemException e) {
-      e.printStackTrace();
-    }
-
-    LOG.info("Writing a QueryConf to HDFS and add to local environment");
-    Path queryConfPath = new Path(fs.getHomeDirectory(), QueryConf.FILENAME);
-    try {
-      writeConf(conf, queryConfPath);
-
-      LocalResource queryConfSrc = createApplicationResource(fsCtx,
-          queryConfPath, LocalResourceType.FILE);
-      localResources.put(QueryConf.FILENAME,  queryConfSrc);
-
-      ctx.setLocalResources(localResources);
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-
-    // Add shuffle token
-    Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
-    try {
-      //LOG.info("Putting shuffle token in serviceData");
-      serviceData.put(PullServerAuxService.PULLSERVER_SERVICEID,
-          PullServerAuxService.serializeMetaData(0));
-    } catch (IOException ioe) {
-      LOG.error(ioe);
-    }
-    ctx.setServiceData(serviceData);
-
-    return ctx;
-  }
-
-  protected ContainerManager getCMProxy(ContainerId containerID,
-                                        final String containerManagerBindAddr,
-                                        ContainerToken containerToken)
-      throws IOException {
-    String [] hosts = containerManagerBindAddr.split(":");
-    final InetSocketAddress cmAddr =
-        new InetSocketAddress(hosts[0], Integer.parseInt(hosts[1]));
-    UserGroupInformation user = UserGroupInformation.getCurrentUser();
-
-    if (UserGroupInformation.isSecurityEnabled()) {
-      Token<ContainerTokenIdentifier> token =
-          ProtoUtils.convertFromProtoFormat(containerToken, cmAddr);
-      // the user in createRemoteUser in this context has to be ContainerID
-      user = UserGroupInformation.createRemoteUser(containerID.toString());
-      user.addToken(token);
-    }
-
-    ContainerManager proxy = user
-        .doAs(new PrivilegedAction<ContainerManager>() {
-          @Override
-          public ContainerManager run() {
-            return (ContainerManager) yarnRPC.getProxy(ContainerManager.class,
-                cmAddr, getConfig());
-          }
-        });
-    return proxy;
-  }
-
-  private LocalResource createApplicationResource(FileContext fs,
-                                                  Path p, LocalResourceType type)
-      throws IOException {
-    LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
-    FileStatus rsrcStat = fs.getFileStatus(p);
-    rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs
-        .getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
-    rsrc.setSize(rsrcStat.getLen());
-    rsrc.setTimestamp(rsrcStat.getModificationTime());
-    rsrc.setType(type);
-    rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
-    return rsrc;
-  }
-
-  private void writeConf(Configuration conf, Path queryConfFile)
-      throws IOException {
-    // Write job file to Tajo's fs
-    FileSystem fs = queryConfFile.getFileSystem(conf);
-    FSDataOutputStream out =
-        FileSystem.create(fs, queryConfFile,
-            new FsPermission(QUERYCONF_FILE_PERMISSION));
-    try {
-      conf.writeXml(out);
-    } finally {
-      out.close();
-    }
-  }
-
-  private static enum ContainerState {
-    PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH
-  }
-
-  public class ContainerProxy {
-    private ContainerState state;
-    // store enough information to be able to cleanup the container
-    private Container container;
-    private ContainerId containerID;
-    final private String containerMgrAddress;
-    private ContainerToken containerToken;
-    private String hostName;
-    private int port = -1;
-    private final SubQueryId subQueryId;
-
-    public ContainerProxy(Container container, SubQueryId subQueryId) {
-      this.state = ContainerState.PREP;
-      this.container = container;
-      this.containerID = container.getId();
-      NodeId nodeId = container.getNodeId();
-      this.containerMgrAddress = nodeId.getHost() + ":" + nodeId.getPort();;
-      this.containerToken = container.getContainerToken();
-      this.subQueryId = subQueryId;
-    }
-
-    public synchronized boolean isCompletelyDone() {
-      return state == ContainerState.DONE || state == ContainerState.FAILED;
-    }
-
-    @SuppressWarnings("unchecked")
-    public synchronized void launch() {
-      LOG.info("Launching Container with Id: " + containerID);
-      if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
-        state = ContainerState.DONE;
-        LOG.error("Container (" + containerID + " was killed before it was launched");
-        return;
-      }
-
-      ContainerManager proxy = null;
-      try {
-
-        proxy = getCMProxy(containerID, containerMgrAddress,
-            containerToken);
-
-        // Construct the actual Container
-        ContainerLaunchContext containerLaunchContext = createContainerLaunchContext();
-
-        // Now launch the actual container
-        StartContainerRequest startRequest = Records
-            .newRecord(StartContainerRequest.class);
-        startRequest.setContainerLaunchContext(containerLaunchContext);
-        StartContainerResponse response = proxy.startContainer(startRequest);
-
-        ByteBuffer portInfo = response
-            .getServiceResponse(PullServerAuxService.PULLSERVER_SERVICEID);
-
-        if(portInfo != null) {
-          port = PullServerAuxService.deserializeMetaData(portInfo);
-        }
-
-        LOG.info("PullServer port returned by ContainerManager for "
-            + containerID + " : " + port);
-
-        if(port < 0) {
-          this.state = ContainerState.FAILED;
-          throw new IllegalStateException("Invalid shuffle port number "
-              + port + " returned for " + containerID);
-        }
-
-        // after launching, send launched event to task attempt to move
-        // it from ASSIGNED to RUNNING state
-//      context.getEventHandler().handle(new AMContainerEventLaunched(containerID, port));
-
-        // this is workaround code
-        context.getEventHandler().handle(new QueryEvent(context.getQueryId(), QueryEventType.INIT_COMPLETED));
-
-        this.state = ContainerState.RUNNING;
-        this.hostName = containerMgrAddress.split(":")[0];
-        context.addContainer(containerID, this);
-      } catch (Throwable t) {
-        String message = "Container launch failed for " + containerID + " : "
-            + StringUtils.stringifyException(t);
-        this.state = ContainerState.FAILED;
-        LOG.error(message);
-      } finally {
-        if (proxy != null) {
-          yarnRPC.stopProxy(proxy, getConfig());
-        }
-      }
-    }
-
-    public synchronized void kill() {
-
-      if(isCompletelyDone()) {
-        return;
-      }
-      if(this.state == ContainerState.PREP) {
-        this.state = ContainerState.KILLED_BEFORE_LAUNCH;
-      } else {
-        LOG.info("KILLING " + containerID);
-
-        ContainerManager proxy = null;
-        try {
-          proxy = getCMProxy(this.containerID, this.containerMgrAddress,
-              this.containerToken);
-
-          // kill the remote container if already launched
-          StopContainerRequest stopRequest = Records
-              .newRecord(StopContainerRequest.class);
-          stopRequest.setContainerId(this.containerID);
-          proxy.stopContainer(stopRequest);
-          // If stopContainer returns without an error, assuming the stop made
-          // it over to the NodeManager.
-//          context.getEventHandler().handle(
-//              new AMContainerEvent(containerID, AMContainerEventType.C_NM_STOP_SENT));
-          context.removeContainer(containerID);
-        } catch (Throwable t) {
-
-          // ignore the cleanup failure
-          String message = "cleanup failed for container "
-              + this.containerID + " : "
-              + StringUtils.stringifyException(t);
-//          context.getEventHandler().handle(
-//              new AMContainerEventStopFailed(containerID, message));
-          LOG.warn(message);
-          this.state = ContainerState.DONE;
-          return;
-        } finally {
-          if (proxy != null) {
-            yarnRPC.stopProxy(proxy, getConfig());
-          }
-        }
-        this.state = ContainerState.DONE;
-      }
-    }
-
-    public ContainerLaunchContext createContainerLaunchContext() {
-      synchronized (commonContainerSpecLock) {
-        if (commonContainerSpec == null) {
-          commonContainerSpec = createCommonContainerLaunchContext();
-        }
-      }
-
-      // Setup environment by cloning from common env.
-      Map<String, String> env = commonContainerSpec.getEnvironment();
-      Map<String, String> myEnv = new HashMap<String, String>(env.size());
-      myEnv.putAll(env);
-
-      // Duplicate the ByteBuffers for access by multiple containers.
-      Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
-      for (Map.Entry<String, ByteBuffer> entry : commonContainerSpec
-          .getServiceData().entrySet()) {
-        myServiceData.put(entry.getKey(), entry.getValue().duplicate());
-      }
-
-      ////////////////////////////////////////////////////////////////////////////
-      // Set the local resources
-      ////////////////////////////////////////////////////////////////////////////
-      // Set the necessary command to execute the application master
-      Vector<CharSequence> vargs = new Vector<CharSequence>(30);
-
-      // Set java executable command
-      //LOG.info("Setting up app master command");
-      vargs.add("${JAVA_HOME}" + "/bin/java");
-      // Set Xmx based on am memory size
-      vargs.add("-Xmx2000m");
-      // Set Remote Debugging
-      //if (!context.getQuery().getSubQuery(event.getSubQueryId()).isLeafQuery()) {
-      //vargs.add("-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
-      //}
-      // Set class name
-      vargs.add("tajo.worker.TaskRunner");
-      vargs.add(taskListenerHost); // tasklistener hostname
-      vargs.add(String.valueOf(taskListenerPort)); // tasklistener hostname
-      vargs.add(subQueryId.toString()); // subqueryId
-      vargs.add(containerMgrAddress); // nodeId
-      vargs.add(containerID.toString()); // containerId
-
-      vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
-      vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
-
-      // Get final commmand
-      StringBuilder command = new StringBuilder();
-      for (CharSequence str : vargs) {
-        command.append(str).append(" ");
-      }
-
-      LOG.info("Completed setting up TaskRunner command " + command.toString());
-      List<String> commands = new ArrayList<String>();
-      commands.add(command.toString());
-
-      return BuilderUtils.newContainerLaunchContext(containerID, commonContainerSpec.getUser(),
-          container.getResource(), commonContainerSpec.getLocalResources(), myEnv, commands,
-          myServiceData, null, new HashMap<ApplicationAccessType, String>());
-    }
-
-    public String getHostName() {
-      return this.hostName;
-    }
-
-    public int getPullServerPort() {
-      return this.port;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerListener.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerListener.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerListener.java
deleted file mode 100644
index 661b458..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerListener.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
-import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
-import org.apache.hadoop.yarn.service.AbstractService;
-import tajo.QueryUnitAttemptId;
-import tajo.TajoIdProtos.QueryUnitAttemptIdProto;
-import tajo.conf.TajoConf;
-import tajo.engine.MasterWorkerProtos.QueryUnitRequestProto;
-import tajo.engine.MasterWorkerProtos.TaskCompletionReport;
-import tajo.engine.MasterWorkerProtos.TaskFatalErrorReport;
-import tajo.engine.MasterWorkerProtos.TaskStatusProto;
-import tajo.ipc.MasterWorkerProtocol;
-import tajo.ipc.MasterWorkerProtocol.MasterWorkerProtocolService;
-import tajo.master.QueryMaster.QueryContext;
-import tajo.master.event.TaskAttemptStatusUpdateEvent;
-import tajo.master.event.TaskCompletionEvent;
-import tajo.master.event.TaskFatalErrorEvent;
-import tajo.master.event.TaskRequestEvent;
-import tajo.rpc.ProtoAsyncRpcServer;
-import tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-
-public class TaskRunnerListener extends AbstractService
-    implements MasterWorkerProtocolService.Interface {
-  
-  private final static Log LOG = LogFactory.getLog(
-      tajo.master.cluster.WorkerListener.class);
-  private QueryContext context;
-  private ProtoAsyncRpcServer rpcServer;
-  private InetSocketAddress bindAddr;
-  private String addr;
-  
-  public TaskRunnerListener(final QueryContext context) throws Exception {
-    super(tajo.master.cluster.WorkerListener.class.getName());
-    this.context = context;
-
-
-    InetSocketAddress initIsa =
-        new InetSocketAddress(InetAddress.getLocalHost(), 0);
-    if (initIsa.getAddress() == null) {
-      throw new IllegalArgumentException("Failed resolve of " + initIsa);
-    }
-    try {
-      this.rpcServer = new ProtoAsyncRpcServer(MasterWorkerProtocol.class,
-          this, initIsa);
-    } catch (Exception e) {
-      LOG.error(e);
-    }
-    this.rpcServer.start();
-    this.bindAddr = rpcServer.getBindAddress();
-    this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
-  }
-
-  @Override
-  public void init(Configuration conf) {
-    // Setup RPC server
-    try {
-      InetSocketAddress initIsa =
-          new InetSocketAddress(InetAddress.getLocalHost(), 0);
-      if (initIsa.getAddress() == null) {
-        throw new IllegalArgumentException("Failed resolve of " + initIsa);
-      }
-
-      this.rpcServer = new ProtoAsyncRpcServer(MasterWorkerProtocol.class,
-          this, initIsa);
-
-      this.rpcServer.start();
-      this.bindAddr = rpcServer.getBindAddress();
-      this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
-
-    } catch (Exception e) {
-      LOG.error(e);
-    }
-
-    // Get the master address
-    LOG.info(tajo.master.cluster.WorkerListener.class.getSimpleName() + " is bind to " + addr);
-    context.getConf().setVar(TajoConf.ConfVars.TASKRUNNER_LISTENER_ADDRESS, addr);
-
-    super.init(conf);
-  }
-
-  @Override
-  public void start() {
-
-
-    super.start();
-  }
-
-  @Override
-  public void stop() {
-    rpcServer.shutdown();
-    super.stop();
-  }
-  
-  public InetSocketAddress getBindAddress() {
-    return this.bindAddr;
-  }
-  
-  public String getAddress() {
-    return this.addr;
-  }
-
-  static BoolProto TRUE_PROTO = BoolProto.newBuilder().setValue(true).build();
-
-  @Override
-  public void getTask(RpcController controller, ContainerIdProto request,
-                      RpcCallback<QueryUnitRequestProto> done) {
-    context.getEventHandler().handle(new TaskRequestEvent(
-        new ContainerIdPBImpl(request), done));
-  }
-
-  @Override
-  public void statusUpdate(RpcController controller, TaskStatusProto request,
-                           RpcCallback<BoolProto> done) {
-    QueryUnitAttemptId attemptId = new QueryUnitAttemptId(request.getId());
-    context.getEventHandler().handle(new TaskAttemptStatusUpdateEvent(attemptId,
-        request));
-    done.run(TRUE_PROTO);
-  }
-
-  @Override
-  public void ping(RpcController controller,
-                   QueryUnitAttemptIdProto attemptIdProto,
-                   RpcCallback<BoolProto> done) {
-    // TODO - to be completed
-    QueryUnitAttemptId attemptId = new QueryUnitAttemptId(attemptIdProto);
-//    context.getQuery(attemptId.getQueryId()).getSubQuery(attemptId.getSubQueryId()).
-//        getQueryUnit(attemptId.getQueryUnitId()).getAttempt(attemptId).
-//        resetExpireTime();
-    done.run(TRUE_PROTO);
-  }
-
-  @Override
-  public void fatalError(RpcController controller, TaskFatalErrorReport report,
-                         RpcCallback<BoolProto> done) {
-    context.getEventHandler().handle(new TaskFatalErrorEvent(report));
-    done.run(TRUE_PROTO);
-  }
-
-  @Override
-  public void done(RpcController controller, TaskCompletionReport report,
-                       RpcCallback<BoolProto> done) {
-    context.getEventHandler().handle(new TaskCompletionEvent(report));
-    done.run(TRUE_PROTO);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskScheduler.java
deleted file mode 100644
index e4218f0..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskScheduler.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import org.apache.hadoop.yarn.event.EventHandler;
-import tajo.master.event.TaskSchedulerEvent;
-
-public interface TaskScheduler extends EventHandler<TaskSchedulerEvent> {
-
-}