You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/01/08 17:17:31 UTC

[14/16] tajo git commit: TAJO-1288: Refactoring org.apache.tajo.master package.

TAJO-1288: Refactoring org.apache.tajo.master package.

Closes #338


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1c29c1cb
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1c29c1cb
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1c29c1cb

Branch: refs/heads/index_support
Commit: 1c29c1cb4bd0e2d75954575717cb5cf05875fe51
Parents: a1e0328
Author: Hyunsik Choi <hy...@apache.org>
Authored: Fri Jan 9 00:31:54 2015 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Fri Jan 9 00:31:54 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |    2 +
 .../apache/tajo/engine/query/QueryContext.java  |    2 +-
 .../main/java/org/apache/tajo/ha/HAService.java |   56 +
 .../org/apache/tajo/ha/HAServiceHDFSImpl.java   |  316 +++++
 .../java/org/apache/tajo/ha/TajoMasterInfo.java |   89 ++
 .../tajo/master/AbstractTaskScheduler.java      |   56 -
 .../org/apache/tajo/master/ContainerProxy.java  |    2 +-
 .../tajo/master/DefaultTaskScheduler.java       |  928 ------------
 .../apache/tajo/master/FetchScheduleEvent.java  |   40 -
 .../org/apache/tajo/master/FragmentPair.java    |   73 -
 .../org/apache/tajo/master/GlobalEngine.java    |    2 +-
 .../NonForwardQueryResultFileScanner.java       |  164 ---
 .../master/NonForwardQueryResultScanner.java    |   46 -
 .../NonForwardQueryResultSystemScanner.java     |  616 --------
 .../java/org/apache/tajo/master/QueryInfo.java  |  235 +++
 .../org/apache/tajo/master/QueryJobManager.java |  311 ++++
 .../apache/tajo/master/ScheduledFetches.java    |   49 -
 .../apache/tajo/master/TajoContainerProxy.java  |    2 +-
 .../java/org/apache/tajo/master/TajoMaster.java |   11 +-
 .../tajo/master/TajoMasterClientService.java    |   13 +-
 .../apache/tajo/master/TajoMasterService.java   |    2 -
 .../tajo/master/TaskSchedulerContext.java       |   65 -
 .../tajo/master/TaskSchedulerFactory.java       |   69 -
 .../tajo/master/event/QueryCompletedEvent.java  |    2 +-
 .../tajo/master/event/QueryStartEvent.java      |    2 +-
 .../tajo/master/event/StageCompletedEvent.java  |    2 +-
 .../event/TaskAttemptToSchedulerEvent.java      |    2 +-
 .../apache/tajo/master/exec/DDLExecutor.java    |    1 -
 .../exec/NonForwardQueryResultFileScanner.java  |  164 +++
 .../exec/NonForwardQueryResultScanner.java      |   46 +
 .../NonForwardQueryResultSystemScanner.java     |  616 ++++++++
 .../apache/tajo/master/exec/QueryExecutor.java  |    9 +-
 .../org/apache/tajo/master/ha/HAService.java    |   56 -
 .../tajo/master/ha/HAServiceHDFSImpl.java       |  318 -----
 .../apache/tajo/master/ha/TajoMasterInfo.java   |   89 --
 .../master/metrics/CatalogMetricsGaugeSet.java  |   56 -
 .../metrics/WorkerResourceMetricsGaugeSet.java  |   74 -
 .../apache/tajo/master/querymaster/Query.java   |  738 ----------
 .../master/querymaster/QueryInProgress.java     |  300 ----
 .../tajo/master/querymaster/QueryInfo.java      |  235 ---
 .../tajo/master/querymaster/QueryJobEvent.java  |   45 -
 .../master/querymaster/QueryJobManager.java     |  310 ----
 .../tajo/master/querymaster/QueryMaster.java    |  631 --------
 .../querymaster/QueryMasterManagerService.java  |  263 ----
 .../master/querymaster/QueryMasterRunner.java   |  149 --
 .../master/querymaster/QueryMasterTask.java     |  638 ---------
 .../tajo/master/querymaster/Repartitioner.java  | 1251 ----------------
 .../apache/tajo/master/querymaster/Stage.java   | 1342 ------------------
 .../tajo/master/querymaster/StageState.java     |   30 -
 .../apache/tajo/master/querymaster/Task.java    |  907 ------------
 .../tajo/master/querymaster/TaskAttempt.java    |  443 ------
 .../master/rm/TajoWorkerResourceManager.java    |    3 +-
 .../tajo/master/rm/WorkerResourceManager.java   |    2 +-
 .../master/scheduler/QuerySchedulingInfo.java   |   55 +
 .../apache/tajo/master/scheduler/Scheduler.java |   41 +
 .../master/scheduler/SchedulingAlgorithms.java  |   47 +
 .../master/scheduler/SimpleFifoScheduler.java   |  147 ++
 .../master/session/InvalidSessionException.java |   25 -
 .../session/NoSuchSessionVariableException.java |   25 -
 .../org/apache/tajo/master/session/Session.java |  196 ---
 .../tajo/master/session/SessionConstants.java   |   23 -
 .../tajo/master/session/SessionEvent.java       |   34 -
 .../tajo/master/session/SessionEventType.java   |   24 -
 .../session/SessionLivelinessMonitor.java       |   53 -
 .../tajo/master/session/SessionManager.java     |  144 --
 .../tajo/metrics/CatalogMetricsGaugeSet.java    |   56 +
 .../metrics/WorkerResourceMetricsGaugeSet.java  |   74 +
 .../tajo/querymaster/AbstractTaskScheduler.java |   56 +
 .../tajo/querymaster/DefaultTaskScheduler.java  |  926 ++++++++++++
 .../tajo/querymaster/FetchScheduleEvent.java    |   40 +
 .../java/org/apache/tajo/querymaster/Query.java |  738 ++++++++++
 .../tajo/querymaster/QueryInProgress.java       |  301 ++++
 .../apache/tajo/querymaster/QueryJobEvent.java  |   46 +
 .../apache/tajo/querymaster/QueryMaster.java    |  631 ++++++++
 .../querymaster/QueryMasterManagerService.java  |  262 ++++
 .../tajo/querymaster/QueryMasterTask.java       |  638 +++++++++
 .../apache/tajo/querymaster/Repartitioner.java  | 1250 ++++++++++++++++
 .../java/org/apache/tajo/querymaster/Stage.java | 1342 ++++++++++++++++++
 .../org/apache/tajo/querymaster/StageState.java |   30 +
 .../java/org/apache/tajo/querymaster/Task.java  |  897 ++++++++++++
 .../apache/tajo/querymaster/TaskAttempt.java    |  443 ++++++
 .../tajo/querymaster/TaskSchedulerContext.java  |   65 +
 .../tajo/querymaster/TaskSchedulerFactory.java  |   68 +
 .../tajo/scheduler/QuerySchedulingInfo.java     |   55 -
 .../org/apache/tajo/scheduler/Scheduler.java    |   41 -
 .../tajo/scheduler/SchedulingAlgorithms.java    |   47 -
 .../tajo/scheduler/SimpleFifoScheduler.java     |  147 --
 .../tajo/session/InvalidSessionException.java   |   25 +
 .../session/NoSuchSessionVariableException.java |   25 +
 .../java/org/apache/tajo/session/Session.java   |  196 +++
 .../apache/tajo/session/SessionConstants.java   |   23 +
 .../org/apache/tajo/session/SessionEvent.java   |   34 +
 .../apache/tajo/session/SessionEventType.java   |   24 +
 .../tajo/session/SessionLivelinessMonitor.java  |   53 +
 .../org/apache/tajo/session/SessionManager.java |  144 ++
 .../main/java/org/apache/tajo/util/JSPUtil.java |   10 +-
 .../apache/tajo/util/history/HistoryReader.java |    2 +-
 .../apache/tajo/util/history/HistoryWriter.java |    2 +-
 .../java/org/apache/tajo/worker/FetchImpl.java  |    4 +-
 .../tajo/worker/TajoResourceAllocator.java      |    6 +-
 .../java/org/apache/tajo/worker/TajoWorker.java |    6 +-
 .../tajo/worker/TajoWorkerClientService.java    |    2 +-
 tajo-core/src/main/resources/tajo-default.xml   |    2 +-
 .../resources/webapps/admin/catalogview.jsp     |    2 +-
 .../main/resources/webapps/admin/cluster.jsp    |    4 +-
 .../src/main/resources/webapps/admin/index.jsp  |    6 +-
 .../src/main/resources/webapps/admin/query.jsp  |    4 +-
 .../resources/webapps/admin/query_executor.jsp  |    2 +-
 .../src/main/resources/webapps/worker/index.jsp |    4 +-
 .../resources/webapps/worker/querydetail.jsp    |    4 +-
 .../main/resources/webapps/worker/queryplan.jsp |    6 +-
 .../resources/webapps/worker/querytasks.jsp     |    2 +-
 .../src/main/resources/webapps/worker/task.jsp  |    8 +-
 .../apache/tajo/LocalTajoTestingUtility.java    |    2 +-
 .../org/apache/tajo/TajoTestingCluster.java     |    8 +-
 .../tajo/engine/planner/TestLogicalPlanner.java |    2 +-
 .../planner/physical/TestPhysicalPlanner.java   |    2 +-
 .../tajo/engine/query/TestGroupByQuery.java     |    8 +-
 .../tajo/engine/query/TestJoinBroadcast.java    |    2 +-
 .../tajo/engine/query/TestTablePartitions.java  |    2 +-
 .../apache/tajo/ha/TestHAServiceHDFSImpl.java   |  153 ++
 .../TestNonForwardQueryResultSystemScanner.java |    4 +-
 .../apache/tajo/master/TestRepartitioner.java   |    8 +-
 .../tajo/master/ha/TestHAServiceHDFSImpl.java   |  158 ---
 .../querymaster/TestIntermediateEntry.java      |   53 -
 .../tajo/master/querymaster/TestKillQuery.java  |  125 --
 .../master/querymaster/TestQueryProgress.java   |   75 -
 .../querymaster/TestTaskStatusUpdate.java       |  194 ---
 .../master/scheduler/TestFifoScheduler.java     |  116 ++
 .../tajo/querymaster/TestIntermediateEntry.java |   53 +
 .../apache/tajo/querymaster/TestKillQuery.java  |  125 ++
 .../tajo/querymaster/TestQueryProgress.java     |   75 +
 .../tajo/querymaster/TestTaskStatusUpdate.java  |  194 +++
 .../tajo/scheduler/TestFifoScheduler.java       |  116 --
 .../java/org/apache/tajo/util/TestJSPUtil.java  |    2 +-
 .../util/history/TestHistoryWriterReader.java   |    2 +-
 .../org/apache/tajo/worker/TestHistory.java     |    2 +-
 tajo-dist/pom.xml                               |    8 +-
 138 files changed, 11317 insertions(+), 11612 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 7783db8..96b63ea 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,6 +27,8 @@ Release 0.9.1 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1288: Refactoring org.apache.tajo.master package. (hyunsik)
+
     TAJO-1279: Cleanup TajoAsyncDispatcher and interrupt stop events. 
     (jinho)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
index 493ca6e..7b3c00d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
@@ -25,7 +25,7 @@ import org.apache.tajo.QueryVars;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.master.session.Session;
+import org.apache.tajo.session.Session;
 import org.apache.tajo.plan.logical.NodeType;
 
 import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetProto;

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java b/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java
new file mode 100644
index 0000000..1329223
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ha;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * The HAService elects the active TajoMaster on startup or when the
+ * current active master changes (e.g. due to failure), and monitors the health of TajoMasters.
+ *
+ */
+public interface HAService {
+
+  /**
+   * Registers this master's name in the shared storage.
+   */
+  public void register() throws IOException;
+
+
+  /**
+   * Removes this master's name from the shared storage.
+   *
+   */
+  public void delete() throws IOException;
+
+  /**
+   * Returns whether this master is currently the active master.
+   * @return True if current master is an active master.
+   */
+  public boolean isActiveStatus();
+
+  /**
+   * Lists the masters registered in the shared storage.
+   * @return all registered masters
+   * @throws IOException if the shared storage cannot be read
+   */
+  public List<TajoMasterInfo> getMasters() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java b/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java
new file mode 100644
index 0000000..e18a9b2
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ha;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.TajoMaster.MasterContext;
+import org.apache.tajo.util.TUtil;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+/**
+ * HAService implementation that keeps master registration files on the Tajo root FileSystem (e.g. HDFS).
+ * Each master writes a file named host_port under the HA active or backup directory; see createMasterFile().
+ */
+public class HAServiceHDFSImpl implements HAService {
+  private static Log LOG = LogFactory.getLog(HAServiceHDFSImpl.class);
+
+  private MasterContext context;
+  private TajoConf conf;
+
+  private FileSystem fs;
+
+  private String masterName;
+  private Path rootPath;
+  private Path haPath;
+  private Path activePath;
+  private Path backupPath;
+  // NOTE(review): written by the PingChecker thread (via delete()/register()) but read unsynchronized; consider volatile.
+  private boolean isActiveStatus = false;
+
+  // Background PingChecker thread that periodically checks whether the current active master is alive.
+  private Thread checkerThread;
+  private volatile boolean stopped = false;
+
+  private int monitorInterval;
+
+  private String currentActiveMaster;
+
+  public HAServiceHDFSImpl(MasterContext context) throws IOException {
+    this.context = context;
+    this.conf = context.getConf();
+    initSystemDirectory();
+
+    InetSocketAddress socketAddress = context.getTajoMasterService().getBindAddress();
+    this.masterName = socketAddress.getAddress().getHostAddress() + ":" + socketAddress.getPort();
+
+    monitorInterval = conf.getIntVar(TajoConf.ConfVars.TAJO_MASTER_HA_MONITOR_INTERVAL);
+  }
+
+  private void initSystemDirectory() throws IOException {
+    // Get Tajo root dir
+    this.rootPath = TajoConf.getTajoRootDir(conf);
+
+    // Check Tajo root dir
+    this.fs = rootPath.getFileSystem(conf);
+
+    // Check and create Tajo system HA dir
+    haPath = TajoConf.getSystemHADir(conf);
+    if (!fs.exists(haPath)) {
+      fs.mkdirs(haPath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION));
+      LOG.info("System HA dir '" + haPath + "' is created");
+    }
+
+    activePath = new Path(haPath, TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+    if (!fs.exists(activePath)) {
+      fs.mkdirs(activePath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION));
+      LOG.info("System HA Active dir '" + activePath + "' is created");
+    }
+
+    backupPath = new Path(haPath, TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
+    if (!fs.exists(backupPath)) {
+      fs.mkdirs(backupPath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION));
+      LOG.info("System HA Backup dir '" + backupPath + "' is created");
+    }
+  }
+
+  private void startPingChecker() {
+    if (checkerThread == null) {
+      checkerThread = new Thread(new PingChecker());
+      checkerThread.setName("Ping Checker");
+      checkerThread.start();
+    }
+  }
+  // NOTE(review): in Phase 1, createMasterFile() starts PingChecker before currentActiveMaster is assigned.
+  @Override
+  public void register() throws IOException {
+    FileStatus[] files = fs.listStatus(activePath);
+
+    // Phase 1: If there is no active master yet, this master tries to become the active master.
+    if (files.length == 0) {
+      createMasterFile(true);
+      currentActiveMaster = masterName;
+      LOG.info(String.format("This is added to active master (%s)", masterName));
+    } else {
+      // Phase 2: If an active master is registered, check whether it is still alive.
+      Path activePath = files[0].getPath();
+      currentActiveMaster = activePath.getName().replaceAll("_", ":");
+
+      // Phase 3: If the current active master is dead, this master replaces it as the active master.
+      if (!HAServiceUtil.isMasterAlive(currentActiveMaster, conf)) {
+        fs.delete(activePath, true);
+        createMasterFile(true);
+        currentActiveMaster = masterName;
+        LOG.info(String.format("This is added to active master (%s)", masterName));
+      } else {
+        // Phase 4: If the current active master is alive, this master registers as a backup master.
+        createMasterFile(false);
+        LOG.info(String.format("This is added to backup masters (%s)", masterName));
+      }
+    }
+  }
+
+  private void createMasterFile(boolean isActive) throws IOException {
+    String fileName = masterName.replaceAll(":", "_");
+    Path path = null;
+
+    if (isActive) {
+      path = new Path(activePath, fileName);
+    } else {
+      path = new Path(backupPath, fileName);
+    }
+
+    StringBuilder sb = new StringBuilder();
+    InetSocketAddress address = getHostAddress(HAConstants.MASTER_CLIENT_RPC_ADDRESS);
+    sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
+
+    address = getHostAddress(HAConstants.RESOURCE_TRACKER_RPC_ADDRESS);
+    sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
+
+    address = getHostAddress(HAConstants.CATALOG_ADDRESS);
+    sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
+
+    address = getHostAddress(HAConstants.MASTER_INFO_ADDRESS);
+    sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort());
+    // NOTE(review): fs.create() is outside the try, so its exceptions skip the catch below; out is not closed if writing fails.
+    FSDataOutputStream out = fs.create(path);
+
+    try {
+      out.writeUTF(sb.toString());
+      out.hflush();
+      out.close();
+    } catch (FileAlreadyExistsException e) {
+      createMasterFile(false);
+    }
+
+    if (isActive) {
+      isActiveStatus = true;
+    } else {
+      isActiveStatus = false;
+    }
+
+    startPingChecker();
+  }
+
+  // Returns this master's host combined with the configured port of the given HAConstants address type.
+  private InetSocketAddress getHostAddress(int type) {
+    InetSocketAddress address = null;
+    // NOTE(review): MASTER_INFO_ADDRESS has no break (falls into default); an unknown type leaves address null -> NPE below.
+    switch (type) {
+      case HAConstants.MASTER_UMBILICAL_RPC_ADDRESS:
+        address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
+          .TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
+        break;
+      case HAConstants.MASTER_CLIENT_RPC_ADDRESS:
+        address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
+          .TAJO_MASTER_CLIENT_RPC_ADDRESS);
+        break;
+      case HAConstants.RESOURCE_TRACKER_RPC_ADDRESS:
+        address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
+          .RESOURCE_TRACKER_RPC_ADDRESS);
+        break;
+      case HAConstants.CATALOG_ADDRESS:
+        address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
+          .CATALOG_ADDRESS);
+        break;
+      case HAConstants.MASTER_INFO_ADDRESS:
+        address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
+        .TAJO_MASTER_INFO_ADDRESS);
+      default:
+        break;
+    }
+
+    return NetUtils.createSocketAddr(masterName.split(":")[0] + ":" + address.getPort());
+  }
+
+  @Override
+  public void delete() throws IOException {
+    String fileName = masterName.replaceAll(":", "_");
+
+    Path activeFile = new Path(activePath, fileName);
+    if (fs.exists(activeFile)) {
+      fs.delete(activeFile, true);
+    }
+
+    Path backupFile = new Path(backupPath, fileName);
+    if (fs.exists(backupFile)) {
+      fs.delete(backupFile, true);
+    }
+    if (isActiveStatus) {
+      isActiveStatus = false;
+    }
+    stopped = true;
+  }
+
+  @Override
+  public boolean isActiveStatus() {
+    return isActiveStatus;
+  }
+
+  @Override
+  public List<TajoMasterInfo> getMasters() throws IOException {
+    List<TajoMasterInfo> list = TUtil.newList();
+    Path path = null;
+    // NOTE(review): an active master is only listed when exactly one file exists in the active dir.
+    FileStatus[] files = fs.listStatus(activePath);
+    if (files.length == 1) {
+      path = files[0].getPath();
+      list.add(createTajoMasterInfo(path, true));
+    }
+
+    files = fs.listStatus(backupPath);
+    for (FileStatus status : files) {
+      path = status.getPath();
+      list.add(createTajoMasterInfo(path, false));
+    }
+
+    return list;
+  }
+
+  private TajoMasterInfo createTajoMasterInfo(Path path, boolean isActive) throws IOException {
+    String masterAddress = path.getName().replaceAll("_", ":");
+    boolean isAlive = HAServiceUtil.isMasterAlive(masterAddress, conf);
+    // NOTE(review): the stream is not closed if readUTF() throws; a try/finally would avoid the leak.
+    FSDataInputStream stream = fs.open(path);
+    String data = stream.readUTF();
+
+    stream.close();
+    // File content is client_resourceTracker_catalog_info addresses, as written by createMasterFile().
+    String[] addresses = data.split("_");
+    TajoMasterInfo info = new TajoMasterInfo();
+
+    info.setTajoMasterAddress(NetUtils.createSocketAddr(masterAddress));
+    info.setTajoClientAddress(NetUtils.createSocketAddr(addresses[0]));
+    info.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(addresses[1]));
+    info.setCatalogAddress(NetUtils.createSocketAddr(addresses[2]));
+    info.setWebServerAddress(NetUtils.createSocketAddr(addresses[3]));
+
+    info.setAvailable(isAlive);
+    info.setActive(isActive);
+
+    return info;
+  }
+  /** Periodically checks the active master and re-registers this master when the active one is dead. */
+  private class PingChecker implements Runnable {
+    @Override
+    public void run() {
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        synchronized (HAServiceHDFSImpl.this) {
+          try {
+            if (!currentActiveMaster.equals(masterName)) {
+              boolean isAlive = HAServiceUtil.isMasterAlive(currentActiveMaster, conf);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("currentActiveMaster:" + currentActiveMaster + ", thisMasterName:" + masterName
+                  + ", isAlive:" + isAlive);
+              }
+
+              // Active master is dead: re-register so this master can take over. NOTE(review): delete()
+              // sets stopped = true, so this loop exits afterwards even if register() made this a backup.
+              if (!isAlive) {
+                FileStatus[] files = fs.listStatus(activePath);
+                if (files.length == 0 || (files.length ==  1
+                  && currentActiveMaster.equals(files[0].getPath().getName().replaceAll("_", ":")))) {
+                  delete();
+                  register();
+                }
+              }
+            }
+          } catch (Exception e) {
+            e.printStackTrace();
+          }
+        }
+        try {
+          Thread.sleep(monitorInterval);
+        } catch (InterruptedException e) {
+          LOG.info("PingChecker interrupted. - masterName:" + masterName);
+          break;
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java b/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java
new file mode 100644
index 0000000..c6fdd40
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ha;
+
+import java.net.InetSocketAddress;
+/** Describes one TajoMaster known to the HA service: its network addresses and status flags. */
+public class TajoMasterInfo {
+  // Status flags set by the HA service (availability and active/backup role).
+  private boolean available;
+  private boolean isActive;
+  // Network endpoints of the master.
+  private InetSocketAddress tajoMasterAddress;
+  private InetSocketAddress tajoClientAddress;
+  private InetSocketAddress workerResourceTrackerAddr;
+  private InetSocketAddress catalogAddress;
+  private InetSocketAddress webServerAddress;
+
+  public InetSocketAddress getTajoMasterAddress() {
+    return tajoMasterAddress;
+  }
+
+  public void setTajoMasterAddress(InetSocketAddress tajoMasterAddress) {
+    this.tajoMasterAddress = tajoMasterAddress;
+  }
+
+  public InetSocketAddress getTajoClientAddress() {
+    return tajoClientAddress;
+  }
+
+  public void setTajoClientAddress(InetSocketAddress tajoClientAddress) {
+    this.tajoClientAddress = tajoClientAddress;
+  }
+
+  public InetSocketAddress getWorkerResourceTrackerAddr() {
+    return workerResourceTrackerAddr;
+  }
+
+  public void setWorkerResourceTrackerAddr(InetSocketAddress workerResourceTrackerAddr) {
+    this.workerResourceTrackerAddr = workerResourceTrackerAddr;
+  }
+
+  public InetSocketAddress getCatalogAddress() {
+    return catalogAddress;
+  }
+
+  public void setCatalogAddress(InetSocketAddress catalogAddress) {
+    this.catalogAddress = catalogAddress;
+  }
+
+  public InetSocketAddress getWebServerAddress() {
+    return webServerAddress;
+  }
+
+  public void setWebServerAddress(InetSocketAddress webServerAddress) {
+    this.webServerAddress = webServerAddress;
+  }
+
+  public boolean isAvailable() {
+    return available;
+  }
+
+  public void setAvailable(boolean available) {
+    this.available = available;
+  }
+
+  public boolean isActive() {
+    return isActive;
+  }
+
+  public void setActive(boolean active) {
+    isActive = active;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
deleted file mode 100644
index 320a5aa..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tajo.master.event.TaskRequestEvent;
-import org.apache.tajo.master.event.TaskSchedulerEvent;
-
-
-public abstract class AbstractTaskScheduler extends AbstractService implements EventHandler<TaskSchedulerEvent> {
-
-  protected int hostLocalAssigned;
-  protected int rackLocalAssigned;
-  protected int totalAssigned;
-
-  /**
-   * Construct the service.
-   *
-   * @param name service name
-   */
-  public AbstractTaskScheduler(String name) {
-    super(name);
-  }
-
-  public int getHostLocalAssigned() {
-    return hostLocalAssigned;
-  }
-
-  public int getRackLocalAssigned() {
-    return rackLocalAssigned;
-  }
-
-  public int getTotalAssigned() {
-    return totalAssigned;
-  }
-
-  public abstract void handleTaskRequestEvent(TaskRequestEvent event);
-  public abstract int remainingScheduledObjectNum();
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java
index 462de91..562790d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.querymaster.QueryMasterTask;
 import org.apache.tajo.master.container.TajoContainer;
 import org.apache.tajo.master.container.TajoContainerId;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
deleted file mode 100644
index d47c93a..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ /dev/null
@@ -1,928 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.engine.planner.global.ExecutionBlock;
-import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.query.TaskRequest;
-import org.apache.tajo.engine.query.TaskRequestImpl;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.master.container.TajoContainerId;
-import org.apache.tajo.master.event.*;
-import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
-import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
-import org.apache.tajo.master.querymaster.Stage;
-import org.apache.tajo.master.querymaster.Task;
-import org.apache.tajo.master.querymaster.TaskAttempt;
-import org.apache.tajo.plan.serder.LogicalNodeSerializer;
-import org.apache.tajo.plan.serder.PlanProto;
-import org.apache.tajo.storage.DataLocation;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.worker.FetchImpl;
-
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-
-public class DefaultTaskScheduler extends AbstractTaskScheduler {
-  private static final Log LOG = LogFactory.getLog(DefaultTaskScheduler.class);
-
-  /** Scheduler context; consulted for isLeafQuery() and for the master context. */
-  private final TaskSchedulerContext context;
-  /** The stage whose tasks are scheduled by this instance. */
-  private Stage stage;
-
-  /** Background thread that periodically runs schedule(). */
-  private Thread schedulingThread;
-  /** Set by stop(); ends the scheduling loop and makes new requests receive a stop request. */
-  private AtomicBoolean stopEventHandling = new AtomicBoolean(false);
-
-  private ScheduledRequests scheduledRequests;
-  private TaskRequests taskRequests;
-
-  /** Sequence number handed to Stage.newEmptyTask() for each task created here. */
-  private int nextTaskId = 0;
-  /** Number of tasks created by this scheduler that have not been assigned yet. */
-  private int scheduledObjectNum = 0;
-
-  /**
-   * Creates a scheduler for a single stage.
-   *
-   * @param context scheduler context of the stage
-   * @param stage the stage whose tasks will be scheduled
-   */
-  public DefaultTaskScheduler(TaskSchedulerContext context, Stage stage) {
-    super(DefaultTaskScheduler.class.getName());
-    this.stage = stage;
-    this.context = context;
-  }
-
-  /**
-   * Creates the request/attempt bookkeeping structures, then lets the base service initialize.
-   */
-  @Override
-  public void init(Configuration conf) {
-    this.taskRequests = new TaskRequests();
-    this.scheduledRequests = new ScheduledRequests();
-    super.init(conf);
-  }
-
-  /**
-   * Starts the scheduling thread. The thread calls schedule() at least every 100 ms (sooner when
-   * notified), and exits when stop() sets the stop flag, when it is interrupted, or after logging
-   * any unexpected Throwable.
-   */
-  @Override
-  public void start() {
-    LOG.info("Start TaskScheduler");
-
-    schedulingThread = new Thread() {
-      @Override
-      public void run() {
-        boolean keepRunning = true;
-        while (keepRunning && !stopEventHandling.get() && !Thread.currentThread().isInterrupted()) {
-          try {
-            // Bounded wait: handleTaskRequestEvent() and stop() may wake us up earlier.
-            synchronized (schedulingThread) {
-              schedulingThread.wait(100);
-            }
-            schedule();
-          } catch (InterruptedException e) {
-            keepRunning = false;
-          } catch (Throwable t) {
-            LOG.fatal(t.getMessage(), t);
-            keepRunning = false;
-          }
-        }
-        LOG.info("TaskScheduler schedulingThread stopped");
-      }
-    };
-
-    schedulingThread.start();
-    super.start();
-  }
-
-  private static final TaskAttemptId NULL_ATTEMPT_ID;
-  /**
-   * Canned reply with shouldDie=true, handed to request callbacks whose task runner should stop
-   * (e.g. when the scheduler is stopped or the container is gone).
-   */
-  public static final TajoWorkerProtocol.TaskRequestProto stopTaskRunnerReq;
-  static {
-    ExecutionBlockId nullBlockId = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
-    NULL_ATTEMPT_ID = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullBlockId, 0), 0);
-
-    stopTaskRunnerReq = TajoWorkerProtocol.TaskRequestProto.newBuilder()
-        .setId(NULL_ATTEMPT_ID.getProto())
-        .setShouldDie(true)
-        .setOutputTable("")
-        .setPlan(PlanProto.LogicalNodeTree.newBuilder())
-        .setClusteredOutput(false)
-        .build();
-  }
-
-  /**
-   * Stops the scheduler. Only the first call does any work: it wakes the scheduling thread so it
-   * sees the stop flag, answers every still-queued task request with {@code stopTaskRunnerReq},
-   * and then stops the base service.
-   */
-  @Override
-  public void stop() {
-    if (stopEventHandling.getAndSet(true)) {
-      return;
-    }
-
-    if (schedulingThread != null) {
-      synchronized (schedulingThread) {
-        schedulingThread.notifyAll();
-      }
-    }
-
-    // Return all of request callbacks instantly. The scheduling thread may be inside schedule()
-    // and draining this same queue right now; draining here as well (instead of merely iterating)
-    // gives each request exactly one owner, so no callback is answered twice.
-    if (taskRequests != null) {
-      List<TaskRequestEvent> pending = new ArrayList<TaskRequestEvent>();
-      taskRequests.taskRequestQueue.drainTo(pending);
-      for (TaskRequestEvent req : pending) {
-        req.getCallback().run(stopTaskRunnerReq);
-      }
-    }
-
-    LOG.info("Task Scheduler stopped");
-    super.stop();
-  }
-
-  private Fragment[] fragmentsForNonLeafTask;
-  private Fragment[] broadcastFragmentsForNonLeafTask;
-
-  LinkedList<TaskRequestEvent> taskRequestEvents = new LinkedList<TaskRequestEvent>();
-  /**
-   * One scheduling round: pulls at most as many pending task requests as there are waiting leaf
-   * attempts and assigns them; then does the same for non-leaf attempts. The reusable buffer
-   * {@code taskRequestEvents} is cleared after each pass that hands it to the assigner.
-   */
-  public void schedule() {
-    if (taskRequests.size() > 0 && scheduledRequests.leafTaskNum() > 0) {
-      LOG.debug("Try to schedule tasks with taskRequestEvents: " +
-          taskRequests.size() + ", LeafTask Schedule Request: " +
-          scheduledRequests.leafTaskNum());
-      taskRequests.getTaskRequests(taskRequestEvents, scheduledRequests.leafTaskNum());
-      LOG.debug("Get " + taskRequestEvents.size() + " taskRequestEvents ");
-      if (!taskRequestEvents.isEmpty()) {
-        scheduledRequests.assignToLeafTasks(taskRequestEvents);
-        taskRequestEvents.clear();
-      }
-    }
-
-    if (taskRequests.size() > 0 && scheduledRequests.nonLeafTaskNum() > 0) {
-      LOG.debug("Try to schedule tasks with taskRequestEvents: " +
-          taskRequests.size() + ", NonLeafTask Schedule Request: " +
-          scheduledRequests.nonLeafTaskNum());
-      taskRequests.getTaskRequests(taskRequestEvents, scheduledRequests.nonLeafTaskNum());
-      scheduledRequests.assignToNonLeafTasks(taskRequestEvents);
-      taskRequestEvents.clear();
-    }
-  }
-
-  /**
-   * Dispatches scheduler events.
-   * <ul>
-   *   <li>T_SCHEDULE with a FragmentScheduleEvent: for a leaf query a task is created for the
-   *   fragment(s) right away; otherwise the fragments are remembered for the tasks that later
-   *   FetchScheduleEvents create.</li>
-   *   <li>T_SCHEDULE with a FetchScheduleEvent: creates a task from the fetches plus the
-   *   remembered fragments.</li>
-   *   <li>T_SCHEDULE with a TaskAttemptToSchedulerEvent: registers the attempt for assignment.</li>
-   *   <li>T_SCHEDULE_CANCEL: withdraws a not-yet-assigned attempt and notifies it.</li>
-   * </ul>
-   */
-  @Override
-  public void handle(TaskSchedulerEvent event) {
-    if (event.getType() == EventType.T_SCHEDULE) {
-      if (event instanceof FragmentScheduleEvent) {
-        FragmentScheduleEvent castEvent = (FragmentScheduleEvent) event;
-        if (context.isLeafQuery()) {
-          TaskAttemptScheduleContext taskContext = new TaskAttemptScheduleContext();
-          Task task = Stage.newEmptyTask(context, taskContext, stage, nextTaskId++);
-          task.addFragment(castEvent.getLeftFragment(), true);
-          scheduledObjectNum++;
-          if (castEvent.hasRightFragments()) {
-            task.addFragments(castEvent.getRightFragments());
-          }
-          stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
-        } else {
-          // Slot 0: left fragment; slot 1: first right fragment. Any further right fragments are
-          // kept separately as broadcast fragments.
-          fragmentsForNonLeafTask = new FileFragment[2];
-          fragmentsForNonLeafTask[0] = castEvent.getLeftFragment();
-          if (castEvent.hasRightFragments()) {
-            FileFragment[] rightFragments = castEvent.getRightFragments().toArray(new FileFragment[]{});
-            fragmentsForNonLeafTask[1] = rightFragments[0];
-            if (rightFragments.length > 1) {
-              broadcastFragmentsForNonLeafTask = new FileFragment[rightFragments.length - 1];
-              System.arraycopy(rightFragments, 1, broadcastFragmentsForNonLeafTask, 0, broadcastFragmentsForNonLeafTask.length);
-            } else {
-              broadcastFragmentsForNonLeafTask = null;
-            }
-          }
-        }
-      } else if (event instanceof FetchScheduleEvent) {
-        FetchScheduleEvent castEvent = (FetchScheduleEvent) event;
-        Map<String, List<FetchImpl>> fetches = castEvent.getFetches();
-        TaskAttemptScheduleContext taskScheduleContext = new TaskAttemptScheduleContext();
-        Task task = Stage.newEmptyTask(context, taskScheduleContext, stage, nextTaskId++);
-        scheduledObjectNum++;
-        for (Entry<String, List<FetchImpl>> eachFetch : fetches.entrySet()) {
-          task.addFetches(eachFetch.getKey(), eachFetch.getValue());
-          // NOTE(review): the same fragments are re-added once per fetch entry; this relies on
-          // Task.addFragment tolerating duplicates - confirm before changing.
-          task.addFragment(fragmentsForNonLeafTask[0], true);
-          if (fragmentsForNonLeafTask[1] != null) {
-            task.addFragment(fragmentsForNonLeafTask[1], true);
-          }
-        }
-        if (broadcastFragmentsForNonLeafTask != null && broadcastFragmentsForNonLeafTask.length > 0) {
-          task.addFragments(Arrays.asList(broadcastFragmentsForNonLeafTask));
-        }
-        stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
-      } else if (event instanceof TaskAttemptToSchedulerEvent) {
-        TaskAttemptToSchedulerEvent castEvent = (TaskAttemptToSchedulerEvent) event;
-        if (context.isLeafQuery()) {
-          scheduledRequests.addLeafTask(castEvent);
-        } else {
-          scheduledRequests.addNonLeafTask(castEvent);
-        }
-      }
-    } else if (event.getType() == EventType.T_SCHEDULE_CANCEL) {
-      // When a stage is killed, unassigned task attempts are canceled from the scheduler.
-      // This event is triggered by TaskAttempt.
-      TaskAttemptToSchedulerEvent castedEvent = (TaskAttemptToSchedulerEvent) event;
-      TaskAttemptId canceledId = castedEvent.getTaskAttempt().getId();
-      // Attempts are registered in leafTasks or nonLeafTasks depending on the query type (see the
-      // T_SCHEDULE branch above); withdraw from both so that a canceled non-leaf attempt can no
-      // longer be handed to a task runner. Removing an absent id is a no-op.
-      scheduledRequests.leafTasks.remove(canceledId);
-      scheduledRequests.nonLeafTasks.remove(canceledId);
-      LOG.info(canceledId + " is canceled from " + this.getClass().getSimpleName());
-      castedEvent.getTaskAttempt().handle(
-          new TaskAttemptEvent(canceledId, TaskAttemptEventType.TA_SCHEDULE_CANCELED));
-    }
-  }
-
-  /**
-   * Queues a task request and wakes the scheduling thread early when work is pending and either
-   * few tasks remain compared with the number of known hosts, or there are at least as many
-   * queued requests as known hosts.
-   */
-  @Override
-  public void handleTaskRequestEvent(TaskRequestEvent event) {
-    taskRequests.handle(event);
-
-    int hostCount = scheduledRequests.leafTaskHostMapping.size();
-    int remaining = remainingScheduledObjectNum();
-    if (remaining <= 0) {
-      return;
-    }
-
-    if (remaining <= hostCount || hostCount <= taskRequests.size()) {
-      synchronized (schedulingThread) {
-        schedulingThread.notifyAll();
-      }
-    }
-  }
-
-  /** @return the number of created tasks that have not been assigned yet */
-  @Override
-  public int remainingScheduledObjectNum() {
-    return this.scheduledObjectNum;
-  }
-
-  /**
-   * FIFO of task requests waiting for an assignment, backed by an unbounded
-   * {@link LinkedBlockingQueue}. Once the scheduler is stopping, new requests are answered
-   * immediately with {@code stopTaskRunnerReq} instead of being queued.
-   */
-  private class TaskRequests implements EventHandler<TaskRequestEvent> {
-    private final LinkedBlockingQueue<TaskRequestEvent> taskRequestQueue =
-        new LinkedBlockingQueue<TaskRequestEvent>();
-
-    @Override
-    public void handle(TaskRequestEvent event) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("TaskRequest: " + event.getContainerId() + "," + event.getExecutionBlockId());
-      }
-
-      if (stopEventHandling.get()) {
-        event.getCallback().run(stopTaskRunnerReq);
-        return;
-      }
-
-      final int queued = taskRequestQueue.size();
-      if (queued > 0 && queued % 1000 == 0) {
-        LOG.info("Size of event-queue in DefaultTaskScheduler is " + queued);
-      }
-      final int spare = taskRequestQueue.remainingCapacity();
-      if (spare < 1000) {
-        LOG.warn("Very low remaining capacity in the event-queue "
-            + "of DefaultTaskScheduler: " + spare);
-      }
-
-      taskRequestQueue.add(event);
-    }
-
-    /** Moves up to {@code num} queued requests into {@code taskRequests}. */
-    public void getTaskRequests(final Collection<TaskRequestEvent> taskRequests, int num) {
-      taskRequestQueue.drainTo(taskRequests, num);
-    }
-
-    /** @return the number of queued requests */
-    public int size() {
-      return this.taskRequestQueue.size();
-    }
-  }
-
-  /**
-   * One worker can have multiple running task runners. <code>HostVolumeMapping</code>
-   * describes various information for one worker, including:
-   * <ul>
-   *  <li>host name</li>
-   *  <li>rack name</li>
-   *  <li>unassigned tasks for each disk volume</li>
-   *  <li>last assigned volume id for each task runner - it can be used for assigning tasks in a
-   *  round-robin manner</li>
-   *  <li>the number of running tasks (concurrency) for each volume</li>
-   * </ul>
-   *
-   * Here, we identify a task runner by {@link TajoContainerId}, and we use volume ids to identify
-   * all disks in this node. A volume id only distinguishes disks from one another; it does not tell
-   * which physical disk it refers to. See the section below for more about volume ids.
-   *
-   * <h3>Volume id</h3>
-   * A volume id is an integer. Each volume id identifies one disk volume.
-   *
-   * This volume id can be obtained from
-   * {@code org.apache.hadoop.fs.BlockStorageLocation#getVolumeIds()}.
-   * HDFS may be unable to give a volume id, e.g. when the config
-   * 'dfs.client.file-block-locations.enabled' is disabled. In this case, the volume id will be -1
-   * (or possibly another negative integer).
-   * In addition, {@link #REMOTE} (-2) is used internally as a pseudo volume that counts the remote
-   * tasks being run by task runners on this host.
-   *
-   * <h3>See Also</h3>
-   * <ul>
-   *   <li>HDFS-3672 (https://issues.apache.org/jira/browse/HDFS-3672).</li>
-   * </ul>
-   */
-  public class HostVolumeMapping {
-    private final String host;
-    private final String rack;
-    /** A key is disk volume, and a value is a list of tasks to be scheduled. */
-    private Map<Integer, LinkedHashSet<TaskAttempt>> unassignedTaskForEachVolume =
-        Collections.synchronizedMap(new HashMap<Integer, LinkedHashSet<TaskAttempt>>());
-    /** A value is last assigned volume id for each task runner */
-    private HashMap<TajoContainerId, Integer> lastAssignedVolumeId = new HashMap<TajoContainerId,
-      Integer>();
-    /**
-     * A key is disk volume id, and a value is the load of this volume.
-     * This load is measured by counting how many tasks are running on it.
-     *
-     * Being a TreeMap, the volumes are kept in ascending order of volume id, so the pseudo ids
-     * {@link #REMOTE} (-2) and -1 (no given volume id), when present, come first.
-     */
-    private SortedMap<Integer, Integer> diskVolumeLoads = new TreeMap<Integer, Integer>();
-    /** The total number of remaining unassigned tasks in this host */
-    private AtomicInteger remainTasksNum = new AtomicInteger(0);
-    /** Pseudo volume id used to account for remote (non-local) assignments. */
-    public static final int REMOTE = -2;
-
-
-    public HostVolumeMapping(String host, String rack){
-      this.host = host;
-      this.rack = rack;
-    }
-
-    /** Adds an unassigned task attempt whose data resides on the given volume of this host. */
-    public synchronized void addTaskAttempt(int volumeId, TaskAttempt taskAttempt){
-      synchronized (unassignedTaskForEachVolume){
-        LinkedHashSet<TaskAttempt> list = unassignedTaskForEachVolume.get(volumeId);
-        if (list == null) {
-          list = new LinkedHashSet<TaskAttempt>();
-          unassignedTaskForEachVolume.put(volumeId, list);
-        }
-        list.add(taskAttempt);
-      }
-
-      remainTasksNum.incrementAndGet();
-
-      if(!diskVolumeLoads.containsKey(volumeId)) diskVolumeLoads.put(volumeId, 0);
-    }
-
-    /**
-     *  Returns (and removes) an unassigned local task for the given task runner, or null.
-     *
-     *  Priorities
-     *  1. a task list in a volume of host
-     *  2. unknown block or Non-splittable task in host
-     *  3. remote tasks. unassignedTaskForEachVolume only contains local tasks, so it will be null
-     */
-    public synchronized TaskAttemptId getLocalTask(TajoContainerId containerId) {
-      int volumeId;
-      TaskAttemptId taskAttemptId = null;
-
-      if (!lastAssignedVolumeId.containsKey(containerId)) {
-        // First request from this task runner: start on the least loaded volume.
-        volumeId = getLowestVolumeId();
-        increaseConcurrency(containerId, volumeId);
-      } else {
-        volumeId = lastAssignedVolumeId.get(containerId);
-      }
-
-      if (unassignedTaskForEachVolume.size() >  0) {
-        int retry = unassignedTaskForEachVolume.size();
-        do {
-          //clean and get a remaining local task
-          taskAttemptId = getAndRemove(volumeId);
-          if(!unassignedTaskForEachVolume.containsKey(volumeId)) {
-            // This volume has no pending tasks left: release the runner's slot on it.
-            decreaseConcurrency(containerId);
-            if (volumeId > REMOTE) {
-              diskVolumeLoads.remove(volumeId);
-            }
-          }
-
-          if (taskAttemptId == null) {
-            //reassign next volume
-            volumeId = getLowestVolumeId();
-            increaseConcurrency(containerId, volumeId);
-            retry--;
-          } else {
-            break;
-          }
-        } while (retry > 0);
-      } else {
-        // No local task left at all.
-        this.remainTasksNum.set(0);
-      }
-      return taskAttemptId;
-    }
-
-    /**
-     * Returns (and removes) an unassigned task of this host if this host is in the given rack,
-     * trying the least loaded volumes first; returns null otherwise.
-     */
-    public synchronized TaskAttemptId getTaskAttemptIdByRack(String rack) {
-      TaskAttemptId taskAttemptId = null;
-
-      if (unassignedTaskForEachVolume.size() > 0 && this.rack.equals(rack)) {
-        int retry = unassignedTaskForEachVolume.size();
-        do {
-          //clean and get a remaining task
-          int volumeId = getLowestVolumeId();
-          taskAttemptId = getAndRemove(volumeId);
-          if (taskAttemptId == null) {
-            if (volumeId > REMOTE) {
-              diskVolumeLoads.remove(volumeId);
-            }
-            retry--;
-          } else {
-            break;
-          }
-        } while (retry > 0);
-      }
-      return taskAttemptId;
-    }
-
-    /**
-     * Pops the oldest pending task of a volume, also withdrawing it from the mappings of the other
-     * hosts that hold a replica of its data. Drops the volume entry once it is empty.
-     */
-    private synchronized TaskAttemptId getAndRemove(int volumeId){
-      TaskAttemptId taskAttemptId = null;
-      if(!unassignedTaskForEachVolume.containsKey(volumeId)) return taskAttemptId;
-
-      LinkedHashSet<TaskAttempt> list = unassignedTaskForEachVolume.get(volumeId);
-      if(list != null && list.size() > 0){
-        TaskAttempt taskAttempt;
-        synchronized (unassignedTaskForEachVolume) {
-          Iterator<TaskAttempt> iterator = list.iterator();
-          taskAttempt = iterator.next();
-          iterator.remove();
-        }
-
-        this.remainTasksNum.getAndDecrement();
-        taskAttemptId = taskAttempt.getId();
-        for (DataLocation location : taskAttempt.getTask().getDataLocations()) {
-          if (!this.getHost().equals(location.getHost())) {
-            HostVolumeMapping volumeMapping = scheduledRequests.leafTaskHostMapping.get(location.getHost());
-            if (volumeMapping != null) {
-              volumeMapping.removeTaskAttempt(location.getVolumeId(), taskAttempt);
-            }
-          }
-        }
-      }
-
-      if(list == null || list.isEmpty()) {
-        unassignedTaskForEachVolume.remove(volumeId);
-      }
-      return taskAttemptId;
-    }
-
-    /**
-     * Withdraws a task attempt from a volume of this host (used when another host's mapping handed
-     * it out). The remaining-task counter is only decremented when the attempt was actually still
-     * pending here; otherwise the counter would drift below the real number of pending tasks and
-     * cut short the loops that are bounded by getRemainingLocalTaskSize().
-     */
-    private synchronized void removeTaskAttempt(int volumeId, TaskAttempt taskAttempt){
-      if(!unassignedTaskForEachVolume.containsKey(volumeId)) return;
-
-      // Same lock as addTaskAttempt()/getAndRemove() use when touching the per-volume sets.
-      synchronized (unassignedTaskForEachVolume) {
-        LinkedHashSet<TaskAttempt> tasks = unassignedTaskForEachVolume.get(volumeId);
-
-        if (tasks != null && tasks.size() > 0) {
-          if (tasks.remove(taskAttempt)) {
-            remainTasksNum.getAndDecrement();
-          }
-        } else {
-          unassignedTaskForEachVolume.remove(volumeId);
-        }
-      }
-    }
-
-    /**
-     * Increase the count of running tasks and disk loads for a certain task runner.
-     *
-     * @param containerId The task runner identifier
-     * @param volumeId Volume identifier
-     * @return the volume load (i.e., how many running tasks use this volume)
-     */
-    private synchronized int increaseConcurrency(TajoContainerId containerId, int volumeId) {
-
-      int concurrency = 1;
-      if (diskVolumeLoads.containsKey(volumeId)) {
-        concurrency = diskVolumeLoads.get(volumeId) + 1;
-      }
-
-      if (volumeId > -1) {
-        LOG.info("Assigned host : " + host + ", Volume : " + volumeId + ", Concurrency : " + concurrency);
-      } else if (volumeId == -1) {
-        // this case is disabled namenode block meta or compressed text file or amazon s3
-        LOG.info("Assigned host : " + host + ", Unknown Volume : " + volumeId + ", Concurrency : " + concurrency);
-      } else if (volumeId == REMOTE) {
-        // all local blocks of this host have been processed; the runner will get remote tasks
-        LOG.info("Assigned host : " + host + ", Remaining local tasks : " + getRemainingLocalTaskSize()
-            + ", Remote Concurrency : " + concurrency);
-      }
-      diskVolumeLoads.put(volumeId, concurrency);
-      lastAssignedVolumeId.put(containerId, volumeId);
-      return concurrency;
-    }
-
-    /**
-     * Decrease the count of running tasks of a certain task runner and forget its last volume.
-     */
-    private synchronized void decreaseConcurrency(TajoContainerId containerId){
-      Integer volumeId = lastAssignedVolumeId.get(containerId);
-      if(volumeId != null && diskVolumeLoads.containsKey(volumeId)){
-        Integer concurrency = diskVolumeLoads.get(volumeId);
-        if(concurrency > 0){
-          diskVolumeLoads.put(volumeId, concurrency - 1);
-        } else {
-          if (volumeId > REMOTE) {
-            diskVolumeLoads.remove(volumeId);
-          }
-        }
-      }
-      lastAssignedVolumeId.remove(containerId);
-    }
-
-    /**
-     *  Returns the volume id with the lowest load (ties go to the highest id), or REMOTE if no
-     *  volume is tracked.
-     *
-     *  volume of a host : 0 ~ n
-     *  compressed task, amazon s3, unKnown volume : -1
-     *  remote task : -2
-     */
-    public int getLowestVolumeId(){
-      Map.Entry<Integer, Integer> volumeEntry = null;
-
-      for (Map.Entry<Integer, Integer> entry : diskVolumeLoads.entrySet()) {
-        if(volumeEntry == null) volumeEntry = entry;
-
-        if (volumeEntry.getValue() >= entry.getValue()) {
-          volumeEntry = entry;
-        }
-      }
-
-      if(volumeEntry != null){
-        return volumeEntry.getKey();
-      } else {
-        return REMOTE;
-      }
-    }
-
-    public boolean isAssigned(TajoContainerId containerId){
-      return lastAssignedVolumeId.containsKey(containerId);
-    }
-
-    /** @return true if the runner's last assigned volume is REMOTE (or lower) */
-    public boolean isRemote(TajoContainerId containerId){
-      Integer volumeId = lastAssignedVolumeId.get(containerId);
-      if(volumeId == null || volumeId > REMOTE){
-        return false;
-      } else {
-        return true;
-      }
-    }
-
-    public int getRemoteConcurrency(){
-      return getVolumeConcurrency(REMOTE);
-    }
-
-    public int getVolumeConcurrency(int volumeId){
-      Integer size = diskVolumeLoads.get(volumeId);
-      if(size == null) return 0;
-      else return size;
-    }
-
-    public int getRemainingLocalTaskSize(){
-      return remainTasksNum.get();
-    }
-
-    public String getHost() {
-
-      return host;
-    }
-
-    public String getRack() {
-      return rack;
-    }
-  }
-
-  private class ScheduledRequests {
-    // Two sets, leafTasks and nonLeafTasks, keep all tasks to be scheduled. Even though a task T is
-    // included in leafTaskHostMapping or leafTasksRackMapping, T will not be sent to a task runner
-    // unless it is also included in leafTasks or nonLeafTasks.
-    private final Set<TaskAttemptId> leafTasks = Collections.synchronizedSet(new HashSet<TaskAttemptId>());
-    private final Set<TaskAttemptId> nonLeafTasks = Collections.synchronizedSet(new HashSet<TaskAttemptId>());
-    private Map<String, HostVolumeMapping> leafTaskHostMapping = Maps.newConcurrentMap();
-    private final Map<String, HashSet<TaskAttemptId>> leafTasksRackMapping = Maps.newConcurrentMap();
-
-    /**
-     * Registers a leaf attempt: indexes it under every host (and that host's rack) holding a
-     * replica of its data, then marks it as pending in leafTasks.
-     */
-    private synchronized void addLeafTask(TaskAttemptToSchedulerEvent event) {
-      TaskAttempt attempt = event.getTaskAttempt();
-      TaskAttemptId attemptId = attempt.getId();
-
-      for (DataLocation location : attempt.getTask().getDataLocations()) {
-        String host = location.getHost();
-
-        HostVolumeMapping mapping = leafTaskHostMapping.get(host);
-        if (mapping == null) {
-          mapping = new HostVolumeMapping(host, RackResolver.resolve(host).getNetworkLocation());
-          leafTaskHostMapping.put(host, mapping);
-        }
-        mapping.addTaskAttempt(location.getVolumeId(), attempt);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Added attempt req to host " + host);
-        }
-
-        String rack = mapping.getRack();
-        HashSet<TaskAttemptId> rackTasks = leafTasksRackMapping.get(rack);
-        if (rackTasks == null) {
-          rackTasks = new HashSet<TaskAttemptId>();
-          leafTasksRackMapping.put(rack, rackTasks);
-        }
-        rackTasks.add(attemptId);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Added attempt req to rack " + rack);
-        }
-      }
-
-      leafTasks.add(attemptId);
-    }
-
-    /** Registers a non-leaf attempt; only its id is recorded. */
-    private void addNonLeafTask(TaskAttemptToSchedulerEvent event) {
-      TaskAttemptId attemptId = event.getTaskAttempt().getId();
-      nonLeafTasks.add(attemptId);
-    }
-
-    /** @return the number of leaf attempts still pending */
-    public int leafTaskNum() {
-      return this.leafTasks.size();
-    }
-
-    /** @return the number of non-leaf attempts still pending */
-    public int nonLeafTaskNum() {
-      return this.nonLeafTasks.size();
-    }
-
-    public Set<TaskAttemptId> assignedRequest = new HashSet<TaskAttemptId>();
-
-    /**
-     * Tries to find a pending leaf task whose data lives on {@code host}.
-     *
-     * @param host host name of the requesting task runner
-     * @param containerId the requesting task runner
-     * @return the assigned attempt id (already removed from leafTasks), or null if none was found
-     */
-    private TaskAttemptId allocateLocalTask(String host, TajoContainerId containerId){
-      HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
-
-      if (hostVolumeMapping != null) { //tajo host is located in hadoop datanode
-        // Snapshot the bound: each non-null getLocalTask() consumes one entry and lowers the live
-        // count, so comparing i against the live value would give up after about half the entries.
-        int candidates = hostVolumeMapping.getRemainingLocalTaskSize();
-        for (int i = 0; i < candidates; i++) {
-          TaskAttemptId attemptId = hostVolumeMapping.getLocalTask(containerId);
-
-          if(attemptId == null) break;
-          // The host mapping may still list attempts that already left leafTasks (e.g. assigned
-          // through another path or canceled); only ids still pending are valid. remove() both
-          // checks and claims the id atomically on the synchronized set.
-          if (leafTasks.remove(attemptId)) {
-            hostLocalAssigned++;
-            totalAssigned++;
-            return attemptId;
-          }
-        }
-      }
-      return null;
-    }
-
-    /**
-     * Tries to find a pending leaf task in the same rack as {@code host}: first from the host
-     * mappings with the most remaining tasks, then from the rack index.
-     *
-     * @param host host name of the requesting task runner
-     * @return the assigned attempt id (already removed from leafTasks), or null if none was found
-     */
-    private TaskAttemptId allocateRackTask(String host) {
-
-      List<HostVolumeMapping> remainingTasks = Lists.newArrayList(leafTaskHostMapping.values());
-      String rack = RackResolver.resolve(host).getNetworkLocation();
-      TaskAttemptId attemptId = null;
-
-      if (remainingTasks.size() > 0) {
-        synchronized (scheduledRequests) {
-          //find largest remaining task of other host in rack
-          Collections.sort(remainingTasks, new Comparator<HostVolumeMapping>() {
-            @Override
-            public int compare(HostVolumeMapping v1, HostVolumeMapping v2) {
-              // descending remaining tasks; each counter is read once per comparison so a single
-              // comparison sees one consistent pair of values
-              int remaining1 = v1.remainTasksNum.get();
-              int remaining2 = v2.remainTasksNum.get();
-              if (remaining2 > remaining1) {
-                return 1;
-              } else if (remaining2 == remaining1) {
-                return 0;
-              } else {
-                return -1;
-              }
-            }
-          });
-        }
-
-        for (HostVolumeMapping tasks : remainingTasks) {
-          // Snapshot the bound: each non-null getTaskAttemptIdByRack() consumes one entry and lowers
-          // the live count.
-          int candidates = tasks.getRemainingLocalTaskSize();
-          for (int i = 0; i < candidates; i++) {
-            TaskAttemptId tId = tasks.getTaskAttemptIdByRack(rack);
-
-            if (tId == null) break;
-
-            if (leafTasks.remove(tId)) {
-              attemptId = tId;
-              break;
-            }
-          }
-          if(attemptId != null) break;
-        }
-      }
-
-      //find task in rack
-      if (attemptId == null) {
-        HashSet<TaskAttemptId> list = leafTasksRackMapping.get(rack);
-        if (list != null) {
-          synchronized (list) {
-            Iterator<TaskAttemptId> iterator = list.iterator();
-            while (iterator.hasNext()) {
-              TaskAttemptId tId = iterator.next();
-              iterator.remove();
-              if (leafTasks.remove(tId)) {
-                attemptId = tId;
-                break;
-              }
-            }
-          }
-        }
-      }
-
-      if (attemptId != null) {
-        rackLocalAssigned++;
-        totalAssigned++;
-
-        LOG.info(String.format("Assigned Local/Rack/Total: (%d/%d/%d), Locality: %.2f%%, Rack host: %s",
-            hostLocalAssigned, rackLocalAssigned, totalAssigned,
-            ((double) hostLocalAssigned / (double) totalAssigned) * 100, host));
-
-      }
-      return attemptId;
-    }
-
-    public void assignToLeafTasks(LinkedList<TaskRequestEvent> taskRequests) {
-      Collections.shuffle(taskRequests);
-      LinkedList<TaskRequestEvent> remoteTaskRequests = new LinkedList<TaskRequestEvent>();
-
-      TaskRequestEvent taskRequest;
-      while (leafTasks.size() > 0 && (!taskRequests.isEmpty() || !remoteTaskRequests.isEmpty())) {
-        taskRequest = taskRequests.pollFirst();
-        if(taskRequest == null) { // if there are only remote task requests
-          taskRequest = remoteTaskRequests.pollFirst();
-        }
-
-        // checking if this container is still alive.
-        // If not, ignore the task request and stop the task runner
-        ContainerProxy container = context.getMasterContext().getResourceAllocator()
-            .getContainer(taskRequest.getContainerId());
-        if(container == null) {
-          taskRequest.getCallback().run(stopTaskRunnerReq);
-          continue;
-        }
-
-        // getting the hostname of requested node
-        WorkerConnectionInfo connectionInfo =
-            context.getMasterContext().getResourceAllocator().getWorkerConnectionInfo(taskRequest.getWorkerId());
-        String host = connectionInfo.getHost();
-
-        // if there are no worker matched to the hostname a task request
-        if(!leafTaskHostMapping.containsKey(host)){
-          String normalizedHost = NetUtils.normalizeHost(host);
-
-          if(!leafTaskHostMapping.containsKey(normalizedHost) && !taskRequests.isEmpty()){
-            // this case means one of either cases:
-            // * there are no blocks which reside in this node.
-            // * all blocks which reside in this node are consumed, and this task runner requests a remote task.
-            // In this case, we transfer the task request to the remote task request list, and skip the followings.
-            remoteTaskRequests.add(taskRequest);
-            continue;
-          }
-        }
-
-        TajoContainerId containerId = taskRequest.getContainerId();
-        LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
-            "containerId=" + containerId);
-
-        //////////////////////////////////////////////////////////////////////
-        // disk or host-local allocation
-        //////////////////////////////////////////////////////////////////////
-        TaskAttemptId attemptId = allocateLocalTask(host, containerId);
-
-        if (attemptId == null) { // if a local task cannot be found
-          HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
-
-          if(hostVolumeMapping != null) {
-            if(!hostVolumeMapping.isRemote(containerId)){
-              // assign to remote volume
-              hostVolumeMapping.decreaseConcurrency(containerId);
-              hostVolumeMapping.increaseConcurrency(containerId, HostVolumeMapping.REMOTE);
-            }
-            // this part is remote concurrency management of a tail tasks
-            int tailLimit = Math.max(remainingScheduledObjectNum() / (leafTaskHostMapping.size() * 2), 1);
-
-            if(hostVolumeMapping.getRemoteConcurrency() > tailLimit){
-              //release container
-              hostVolumeMapping.decreaseConcurrency(containerId);
-              taskRequest.getCallback().run(stopTaskRunnerReq);
-              continue;
-            }
-          }
-
-          //////////////////////////////////////////////////////////////////////
-          // rack-local allocation
-          //////////////////////////////////////////////////////////////////////
-          attemptId = allocateRackTask(host);
-
-          //////////////////////////////////////////////////////////////////////
-          // random node allocation
-          //////////////////////////////////////////////////////////////////////
-          if (attemptId == null && leafTaskNum() > 0) {
-            synchronized (leafTasks){
-              attemptId = leafTasks.iterator().next();
-              leafTasks.remove(attemptId);
-              rackLocalAssigned++;
-              totalAssigned++;
-              LOG.info(String.format("Assigned Local/Remote/Total: (%d/%d/%d), Locality: %.2f%%,",
-                  hostLocalAssigned, rackLocalAssigned, totalAssigned,
-                  ((double) hostLocalAssigned / (double) totalAssigned) * 100));
-            }
-          }
-        }
-
-        if (attemptId != null) {
-          Task task = stage.getTask(attemptId.getTaskId());
-          TaskRequest taskAssign = new TaskRequestImpl(
-              attemptId,
-              new ArrayList<FragmentProto>(task.getAllFragments()),
-              "",
-              false,
-              LogicalNodeSerializer.serialize(task.getLogicalPlan()),
-              context.getMasterContext().getQueryContext(),
-              stage.getDataChannel(), stage.getBlock().getEnforcer());
-          if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) {
-            taskAssign.setInterQuery();
-          }
-
-          context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
-              taskRequest.getContainerId(), connectionInfo));
-          assignedRequest.add(attemptId);
-
-          scheduledObjectNum--;
-          taskRequest.getCallback().run(taskAssign.getProto());
-        } else {
-          throw new RuntimeException("Illegal State!!!!!!!!!!!!!!!!!!!!!");
-        }
-      }
-    }
-
-    private boolean checkIfInterQuery(MasterPlan masterPlan, ExecutionBlock block) {
-      if (masterPlan.isRoot(block)) {
-        return false;
-      }
-
-      ExecutionBlock parent = masterPlan.getParent(block);
-      if (masterPlan.isRoot(parent) && parent.hasUnion()) {
-        return false;
-      }
-
-      return true;
-    }
-
-    public void assignToNonLeafTasks(LinkedList<TaskRequestEvent> taskRequests) {
-      Collections.shuffle(taskRequests);
-
-      TaskRequestEvent taskRequest;
-      while (!taskRequests.isEmpty()) {
-        taskRequest = taskRequests.pollFirst();
-        LOG.debug("assignToNonLeafTasks: " + taskRequest.getExecutionBlockId());
-
-        TaskAttemptId attemptId;
-        // random allocation
-        if (nonLeafTasks.size() > 0) {
-          synchronized (nonLeafTasks){
-            attemptId = nonLeafTasks.iterator().next();
-            nonLeafTasks.remove(attemptId);
-          }
-          LOG.debug("Assigned based on * match");
-
-          Task task;
-          task = stage.getTask(attemptId.getTaskId());
-          TaskRequest taskAssign = new TaskRequestImpl(
-              attemptId,
-              Lists.newArrayList(task.getAllFragments()),
-              "",
-              false,
-              LogicalNodeSerializer.serialize(task.getLogicalPlan()),
-              context.getMasterContext().getQueryContext(),
-              stage.getDataChannel(),
-              stage.getBlock().getEnforcer());
-          if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) {
-            taskAssign.setInterQuery();
-          }
-          for(Map.Entry<String, Set<FetchImpl>> entry: task.getFetchMap().entrySet()) {
-            Collection<FetchImpl> fetches = entry.getValue();
-            if (fetches != null) {
-              for (FetchImpl fetch : fetches) {
-                taskAssign.addFetch(entry.getKey(), fetch);
-              }
-            }
-          }
-
-          WorkerConnectionInfo connectionInfo = context.getMasterContext().getResourceAllocator().
-              getWorkerConnectionInfo(taskRequest.getWorkerId());
-          context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
-              taskRequest.getContainerId(), connectionInfo));
-          taskRequest.getCallback().run(taskAssign.getProto());
-          totalAssigned++;
-          scheduledObjectNum--;
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java
deleted file mode 100644
index 21e376c..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.master.event.TaskSchedulerEvent;
-import org.apache.tajo.worker.FetchImpl;
-
-import java.util.List;
-import java.util.Map;
-
-public class FetchScheduleEvent extends TaskSchedulerEvent {
-  private final Map<String, List<FetchImpl>> fetches;
-
-  public FetchScheduleEvent(final EventType eventType, final ExecutionBlockId blockId,
-                            final Map<String, List<FetchImpl>> fetches) {
-    super(eventType, blockId);
-    this.fetches = fetches;
-  }
-
-  public Map<String, List<FetchImpl>> getFetches() {
-    return fetches;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java b/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java
deleted file mode 100644
index 827386b..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.common.base.Objects;
-import org.apache.tajo.storage.fragment.Fragment;
-
-/**
- * FragmentPair consists of two fragments, a left fragment and a right fragment.
- * According to queries, it can have the different values.
- * For join queries, it is assumed to have both fragments.
- * Also, the left fragment is assumed to be a fragment of the larger table.
- * For other queries, it is assumed to have only a left fragment.
- */
-public class FragmentPair {
-  private Fragment leftFragment;
-  private Fragment rightFragment;
-
-  public FragmentPair(Fragment left) {
-    this.leftFragment = left;
-  }
-
-  public FragmentPair(Fragment left, Fragment right) {
-    this.leftFragment = left;
-    this.rightFragment = right;
-  }
-
-  public Fragment getLeftFragment() {
-    return leftFragment;
-  }
-
-  public Fragment getRightFragment() {
-    return rightFragment;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (o instanceof FragmentPair) {
-      FragmentPair other = (FragmentPair) o;
-      boolean eq = this.leftFragment.equals(other.leftFragment);
-      if (this.rightFragment != null && other.rightFragment != null) {
-        eq &= this.rightFragment.equals(other.rightFragment);
-      } else if (this.rightFragment == null && other.rightFragment == null) {
-        eq &= true;
-      } else {
-        return false;
-      }
-      return eq;
-    }
-    return false;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(leftFragment, rightFragment);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 51964f0..9d853a5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -39,7 +39,7 @@ import org.apache.tajo.master.TajoMaster.MasterContext;
 import org.apache.tajo.master.exec.DDLExecutor;
 import org.apache.tajo.master.exec.QueryExecutor;
 import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager;
-import org.apache.tajo.master.session.Session;
+import org.apache.tajo.session.Session;
 import org.apache.tajo.plan.*;
 import org.apache.tajo.plan.logical.InsertNode;
 import org.apache.tajo.plan.logical.LogicalRootNode;

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultFileScanner.java
deleted file mode 100644
index d6ea459..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultFileScanner.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.TaskId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.engine.planner.physical.SeqScanExec;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.storage.RowStoreUtil;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.fragment.FragmentConvertor;
-import org.apache.tajo.worker.TaskAttemptContext;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-public class NonForwardQueryResultFileScanner implements NonForwardQueryResultScanner {
-  private static final int MAX_FRAGMENT_NUM_PER_SCAN = 100;
-  
-  private QueryId queryId;
-  private String sessionId;
-  private SeqScanExec scanExec;
-  private TableDesc tableDesc;
-  private RowStoreEncoder rowEncoder;
-  private int maxRow;
-  private int currentNumRows;
-  private TaskAttemptContext taskContext;
-  private TajoConf tajoConf;
-  private ScanNode scanNode;
-  
-  private int currentFragmentIndex = 0;
-
-  public NonForwardQueryResultFileScanner(TajoConf tajoConf, String sessionId, QueryId queryId, ScanNode scanNode,
-      TableDesc tableDesc, int maxRow) throws IOException {
-    this.tajoConf = tajoConf;
-    this.sessionId = sessionId;
-    this.queryId = queryId;
-    this.scanNode = scanNode;
-    this.tableDesc = tableDesc;
-    this.maxRow = maxRow;
-    this.rowEncoder = RowStoreUtil.createEncoder(tableDesc.getLogicalSchema());
-  }
-
-  public void init() throws IOException {
-    initSeqScanExec();
-  }
-
-  private void initSeqScanExec() throws IOException {
-    List<Fragment> fragments = StorageManager.getStorageManager(tajoConf, tableDesc.getMeta().getStoreType())
-        .getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN);
-    
-    if (fragments != null && !fragments.isEmpty()) {
-      FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[] {}));
-      this.taskContext = new TaskAttemptContext(
-          new QueryContext(tajoConf), null, 
-          new TaskAttemptId(new TaskId(new ExecutionBlockId(queryId, 1), 0), 0), 
-          fragmentProtos, null);
-      try {
-        // scanNode must be clone cause SeqScanExec change target in the case of
-        // a partitioned table.
-        scanExec = new SeqScanExec(taskContext, (ScanNode) scanNode.clone(), fragmentProtos);
-      } catch (CloneNotSupportedException e) {
-        throw new IOException(e.getMessage(), e);
-      }
-      scanExec.init();
-      currentFragmentIndex += fragments.size();
-    }
-  }
-
-  public QueryId getQueryId() {
-    return queryId;
-  }
-
-  public String getSessionId() {
-    return sessionId;
-  }
-
-  public void setScanExec(SeqScanExec scanExec) {
-    this.scanExec = scanExec;
-  }
-
-  public TableDesc getTableDesc() {
-    return tableDesc;
-  }
-
-  public void close() throws Exception {
-    if (scanExec != null) {
-      scanExec.close();
-      scanExec = null;
-    }
-  }
-
-  public List<ByteString> getNextRows(int fetchRowNum) throws IOException {
-    List<ByteString> rows = new ArrayList<ByteString>();
-    if (scanExec == null) {
-      return rows;
-    }
-    int rowCount = 0;
-    while (true) {
-      Tuple tuple = scanExec.next();
-      if (tuple == null) {
-        scanExec.close();
-        scanExec = null;
-        initSeqScanExec();
-        if (scanExec != null) {
-          tuple = scanExec.next();
-        }
-        if (tuple == null) {
-          if (scanExec != null) {
-            scanExec.close();
-            scanExec = null;
-          }
-          break;
-        }
-      }
-      rows.add(ByteString.copyFrom((rowEncoder.toBytes(tuple))));
-      rowCount++;
-      currentNumRows++;
-      if (rowCount >= fetchRowNum) {
-        break;
-      }
-      if (currentNumRows >= maxRow) {
-        scanExec.close();
-        scanExec = null;
-        break;
-      }
-    }
-    return rows;
-  }
-
-  @Override
-  public Schema getLogicalSchema() {
-    return tableDesc.getLogicalSchema();
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
deleted file mode 100644
index 7e7d705..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.tajo.QueryId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-
-import com.google.protobuf.ByteString;
-
-public interface NonForwardQueryResultScanner {
-
-  public void close() throws Exception;
-
-  public Schema getLogicalSchema();
-
-  public List<ByteString> getNextRows(int fetchRowNum) throws IOException;
-
-  public QueryId getQueryId();
-  
-  public String getSessionId();
-  
-  public TableDesc getTableDesc();
-
-  public void init() throws IOException;
-
-}