Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 11:19:39 UTC

[17/51] [partial] TAJO-752: Escalate sub modules in tajo-core into the top-level modules. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java
new file mode 100644
index 0000000..0973aa7
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.util.FileUtil;
+
+import java.net.URI;
+import java.util.Collection;
+import java.util.Map;
+
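+/**
+ * A value holder that records the execution history of a single task attempt:
+ * timing, status, output and working paths, progress, input/output table
+ * statistics, and the history of each fetcher involved.
+ */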
+public class TaskHistory {
+  private long startTime;
+  private long finishTime;
+
+  private String status;
+  private String outputPath;
+  private String workingPath;
+  private float progress;
+
+  private TableStats inputStats;
+  private TableStats outputStats;
+
+  Map<URI, FetcherHistory> fetchers;
+
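+  /** Execution history of a single fetch: timing, status, source URI, fetched file length, and message count. */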
+  public static class FetcherHistory {
+    private long startTime;
+    private long finishTime;
+
+    private String status;
+    private String uri;
+    private long fileLen;
+    private int messageReceiveCount;
+
+    public long getStartTime() {
+      return startTime;
+    }
+
+    public void setStartTime(long startTime) {
+      this.startTime = startTime;
+    }
+
+    public long getFinishTime() {
+      return finishTime;
+    }
+
+    public void setFinishTime(long finishTime) {
+      this.finishTime = finishTime;
+    }
+
+    public String getStatus() {
+      return status;
+    }
+
+    public void setStatus(String status) {
+      this.status = status;
+    }
+
+    public String getUri() {
+      return uri;
+    }
+
+    public void setUri(String uri) {
+      this.uri = uri;
+    }
+
+    public long getFileLen() {
+      return fileLen;
+    }
+
+    public void setFileLen(long fileLen) {
+      this.fileLen = fileLen;
+    }
+
+    public int getMessageReceiveCount() {
+      return messageReceiveCount;
+    }
+
+    public void setMessageReceiveCount(int messageReceiveCount) {
+      this.messageReceiveCount = messageReceiveCount;
+    }
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  public void setFinishTime(long finishTime) {
+    this.finishTime = finishTime;
+  }
+
+  public String getStatus() {
+    return status;
+  }
+
+  public void setStatus(String status) {
+    this.status = status;
+  }
+
+  public String getOutputPath() {
+    return outputPath;
+  }
+
+  public void setOutputPath(String outputPath) {
+    this.outputPath = outputPath;
+  }
+
+  public String getWorkingPath() {
+    return workingPath;
+  }
+
+  public void setWorkingPath(String workingPath) {
+    this.workingPath = workingPath;
+  }
+
+  public Collection<FetcherHistory> getFetchers() {
+    return fetchers.values();
+  }
+
+  public void setFetchers(Map<URI, FetcherHistory> fetchers) {
+    this.fetchers = fetchers;
+  }
+
+  public float getProgress() {
+    return progress;
+  }
+
+  public void setProgress(float progress) {
+    this.progress = progress;
+  }
+
+  public boolean hasFetcher() {
+    return fetchers != null && !fetchers.isEmpty();
+  }
+
+  public TableStats getInputStats() {
+    return inputStats;
+  }
+
+  public void setInputStats(TableStats inputStats) {
+    this.inputStats = inputStats;
+  }
+
+  public TableStats getOutputStats() {
+    return outputStats;
+  }
+
+  public void setOutputStats(TableStats outputStats) {
+    this.outputStats = outputStats;
+  }
+
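+  /** Renders input statistics as a short human-readable summary (total bytes, read bytes, read rows). */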
+  public static String toInputStatsString(TableStats tableStats) {
+    if (tableStats == null) {
+      return "No input statistics";
+    }
+
+    String result = "";
+    result += "TotalBytes: " + FileUtil.humanReadableByteCount(tableStats.getNumBytes(), false) + " ("
+        + tableStats.getNumBytes() + " B)";
+    result += ", ReadBytes: " + FileUtil.humanReadableByteCount(tableStats.getReadBytes(), false) + " ("
+        + tableStats.getReadBytes() + " B)";
+    result += ", ReadRows: " + (tableStats.getNumRows() == 0 ? "-" : tableStats.getNumRows());
+
+    return result;
+  }
+
+  public static String toOutputStatsString(TableStats tableStats) {
+    if (tableStats == null) {
+      return "No output statistics";
+    }
+
+    return tableStats.toJson();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
new file mode 100644
index 0000000..9e904cd
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -0,0 +1,431 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.engine.planner.physical.SeqScanExec;
+import org.apache.tajo.engine.query.QueryUnitRequestImpl;
+import org.apache.tajo.engine.utils.TupleCache;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
+import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.util.TajoIdUtils;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.*;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+
+/**
+ * The driver class for Tajo QueryUnit processing.
+ */
+public class TaskRunner extends AbstractService {
+  /** class logger */
+  private static final Log LOG = LogFactory.getLog(TaskRunner.class);
+
+  private TajoConf systemConf;
+
+  private volatile boolean stopped = false;
+
+  private ExecutionBlockId executionBlockId;
+  private QueryId queryId;
+  private NodeId nodeId;
+  private ContainerId containerId;
+
+  // Cluster Management
+  //private TajoWorkerProtocol.TajoWorkerProtocolService.Interface master;
+
+  // for temporary or intermediate files
+  private FileSystem localFS;
+  // for input files
+  private FileSystem defaultFS;
+
+  private TajoQueryEngine queryEngine;
+
+  // for Fetcher
+  private final ExecutorService fetchLauncher;
+  // It keeps all of the query unit attempts while a TaskRunner is running.
+  private final Map<QueryUnitAttemptId, Task> tasks = new ConcurrentHashMap<QueryUnitAttemptId, Task>();
+
+  private final Map<QueryUnitAttemptId, TaskHistory> taskHistories =
+      new ConcurrentHashMap<QueryUnitAttemptId, TaskHistory>();
+
+  private LocalDirAllocator lDirAllocator;
+
+  // A thread that receives each assigned query unit and executes it
+  private Thread taskLauncher;
+
+  // Contains the object references related to this TaskRunner
+  private TaskRunnerContext taskRunnerContext;
+  // for the doAs block
+  private UserGroupInformation taskOwner;
+
+  // for the local temporary dir
+  private String baseDir;
+  private Path baseDirPath;
+
+  private TaskRunnerManager taskRunnerManager;
+
+  private long finishTime;
+
+  private RpcConnectionPool connPool;
+
+  private InetSocketAddress qmMasterAddr;
+
+  public TaskRunner(TaskRunnerManager taskRunnerManager, TajoConf conf, String[] args) {
+    super(TaskRunner.class.getName());
+
+    this.taskRunnerManager = taskRunnerManager;
+    this.connPool = RpcConnectionPool.getPool(conf);
+    this.fetchLauncher = Executors.newFixedThreadPool(
+        conf.getIntVar(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM));
+    try {
+      final ExecutionBlockId executionBlockId = TajoIdUtils.createExecutionBlockId(args[1]);
+
+      LOG.info("Tajo Root Dir: " + conf.getVar(ConfVars.ROOT_DIR));
+      LOG.info("Worker Local Dir: " + conf.getVar(ConfVars.WORKER_TEMPORAL_DIR));
+
+      UserGroupInformation.setConfiguration(conf);
+
+      // QueryBlockId from String
+      // NodeId has the form hostname:port.
+      NodeId nodeId = ConverterUtils.toNodeId(args[2]);
+      this.containerId = ConverterUtils.toContainerId(args[3]);
+
+      // QueryMaster's address
+      String host = args[4];
+      int port = Integer.parseInt(args[5]);
+      this.qmMasterAddr = NetUtils.createSocketAddrForHost(host, port);
+
+      LOG.info("QueryMaster Address:" + qmMasterAddr);
+      // TODO - 'load credential' should be implemented
+      // Getting taskOwner
+      UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(conf.getVar(ConfVars.USERNAME));
+      //taskOwner.addToken(token);
+
+      // initialize MasterWorkerProtocol as an actual task owner.
+//      this.client =
+//          taskOwner.doAs(new PrivilegedExceptionAction<AsyncRpcClient>() {
+//            @Override
+//            public AsyncRpcClient run() throws Exception {
+//              return new AsyncRpcClient(TajoWorkerProtocol.class, masterAddr);
+//            }
+//          });
+//      this.master = client.getStub();
+
+      this.executionBlockId = executionBlockId;
+      this.queryId = executionBlockId.getQueryId();
+      this.nodeId = nodeId;
+      this.taskOwner = taskOwner;
+
+      this.taskRunnerContext = new TaskRunnerContext();
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
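+  /** Returns this runner's identifier, composed of the execution block id and the container id. */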
+  public String getId() {
+    return executionBlockId + "," + containerId;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    this.systemConf = (TajoConf)conf;
+
+    try {
+      // initialize DFS and LocalFileSystems
+      defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(conf);
+      localFS = FileSystem.getLocal(conf);
+
+      // the base dir for this execution block's output
+      baseDir = queryId.toString() + "/output" + "/" + executionBlockId.getId();
+
+      // initialize LocalDirAllocator
+      lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+
+      baseDirPath = localFS.makeQualified(lDirAllocator.getLocalPathForWrite(baseDir, conf));
+      LOG.info("TaskRunner basedir is created (" + baseDir +")");
+
+      // Set up the QueryEngine according to the query plan.
+      // Here, we can set up a row-based or a columnar query engine.
+      this.queryEngine = new TajoQueryEngine(systemConf);
+    } catch (Throwable t) {
+      t.printStackTrace();
+      LOG.error(t);
+    }
+
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    super.start();
+    run();
+  }
+
+  @Override
+  public void stop() {
+    if(isStopped()) {
+      return;
+    }
+    finishTime = System.currentTimeMillis();
+    // If this flag becomes true, taskLauncher will terminate.
+    this.stopped = true;
+
+    // If TaskRunner is stopped, all running or pending tasks will be marked as failed.
+    for (Task task : tasks.values()) {
+      if (task.getStatus() == TaskAttemptState.TA_PENDING ||
+          task.getStatus() == TaskAttemptState.TA_RUNNING) {
+        task.setState(TaskAttemptState.TA_FAILED);
+      }
+    }
+
+    tasks.clear();
+    fetchLauncher.shutdown();
+    this.queryEngine = null;
+
+    TupleCache.getInstance().removeBroadcastCache(executionBlockId);
+
+    LOG.info("Stop TaskRunner: " + executionBlockId);
+    synchronized (this) {
+      notifyAll();
+    }
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  public class TaskRunnerContext {
+    public TajoConf getConf() {
+      return systemConf;
+    }
+
+    public String getNodeId() {
+      return nodeId.toString();
+    }
+
+    public FileSystem getLocalFS() {
+      return localFS;
+    }
+
+    public FileSystem getDefaultFS() {
+      return defaultFS;
+    }
+
+    public LocalDirAllocator getLocalDirAllocator() {
+      return lDirAllocator;
+    }
+
+    public TajoQueryEngine getTQueryEngine() {
+      return queryEngine;
+    }
+
+    public Map<QueryUnitAttemptId, Task> getTasks() {
+      return tasks;
+    }
+
+    public Task getTask(QueryUnitAttemptId taskId) {
+      return tasks.get(taskId);
+    }
+
+    public ExecutorService getFetchLauncher() {
+      return fetchLauncher;
+    }
+
+    public Path getBaseDir() {
+      return baseDirPath;
+    }
+
+    public ExecutionBlockId getExecutionBlockId() {
+      return executionBlockId;
+    }
+
+    public void addTaskHistory(QueryUnitAttemptId quAttemptId, TaskHistory taskHistory) {
+      taskHistories.put(quAttemptId, taskHistory);
+    }
+
+    public TaskHistory getTaskHistory(QueryUnitAttemptId quAttemptId) {
+      return taskHistories.get(quAttemptId);
+    }
+
+    public Map<QueryUnitAttemptId, TaskHistory> getTaskHistories() {
+      return taskHistories;
+    }
+  }
+
+  public TaskRunnerContext getContext() {
+    return taskRunnerContext;
+  }
+
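+  /**
+   * Reports a fatal error of the given task attempt to the query master,
+   * substituting a default message when none is provided.
+   */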
+  static void fatalError(QueryMasterProtocolService.Interface qmClientService,
+                         QueryUnitAttemptId taskAttemptId, String message) {
+    if (message == null) {
+       message = "No error message";
+    }
+    TaskFatalErrorReport.Builder builder = TaskFatalErrorReport.newBuilder()
+        .setId(taskAttemptId.getProto())
+        .setErrorMessage(message);
+
+    qmClientService.fatalError(null, builder.build(), NullCallback.get());
+  }
+
+  public void run() {
+    LOG.info("TaskRunner startup");
+
+    try {
+
+      taskLauncher = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          int receivedNum = 0;
+          CallFuture<QueryUnitRequestProto> callFuture = null;
+          QueryUnitRequestProto taskRequest = null;
+
+          while(!stopped) {
+            NettyClientBase qmClient = null;
+            QueryMasterProtocolService.Interface qmClientService = null;
+            try {
+              qmClient = connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true);
+              qmClientService = qmClient.getStub();
+
+              if (callFuture == null) {
+                callFuture = new CallFuture<QueryUnitRequestProto>();
+                LOG.info("Request GetTask: " + getId());
+                GetTaskRequestProto request = GetTaskRequestProto.newBuilder()
+                    .setExecutionBlockId(executionBlockId.getProto())
+                    .setContainerId(((ContainerIdPBImpl) containerId).getProto())
+                    .build();
+
+                qmClientService.getTask(null, request, callFuture);
+              }
+              try {
+                // wait up to 3 seconds for a task to be assigned
+                taskRequest = callFuture.get(3, TimeUnit.SECONDS);
+              } catch (InterruptedException e) {
+                if(stopped) {
+                  break;
+                }
+              } catch (TimeoutException te) {
+                if(stopped) {
+                  break;
+                }
+                // If no task has been assigned within the period,
+                // TaskRunner retries the request.
+                LOG.info("Retry assigning task:" + getId());
+                continue;
+              }
+
+              if (taskRequest != null) {
+                // QueryMaster can send a termination signal to TaskRunner.
+                // When TaskRunner receives it, it terminates immediately.
+                if (taskRequest.getShouldDie()) {
+                  LOG.info("Received ShouldDie flag:" + getId());
+                  stop();
+                  if(taskRunnerManager != null) {
+                    // notify TaskRunnerManager
+                    taskRunnerManager.stopTask(getId());
+                  }
+                } else {
+                  taskRunnerManager.getWorkerContext().getWorkerSystemMetrics().counter("query", "task").inc();
+                  LOG.info("Accumulated Received Task: " + (++receivedNum));
+
+                  QueryUnitAttemptId taskAttemptId = new QueryUnitAttemptId(taskRequest.getId());
+                  if (tasks.containsKey(taskAttemptId)) {
+                    LOG.error("Duplicate Task Attempt: " + taskAttemptId);
+                    fatalError(qmClientService, taskAttemptId, "Duplicate Task Attempt: " + taskAttemptId);
+                    continue;
+                  }
+
+                  LOG.info("Initializing: " + taskAttemptId);
+                  Task task;
+                  try {
+                    task = new Task(taskAttemptId, taskRunnerContext, qmClientService,
+                        new QueryUnitRequestImpl(taskRequest));
+                    tasks.put(taskAttemptId, task);
+
+                    task.init();
+                    if (task.hasFetchPhase()) {
+                      task.fetch(); // The fetch is performed in an asynchronous way.
+                    }
+                    // task.run() is a blocking call.
+                    task.run();
+                  } catch (Throwable t) {
+                    LOG.error(t.getMessage(), t);
+                    fatalError(qmClientService, taskAttemptId, t.getMessage());
+                  } finally {
+                    callFuture = null;
+                    taskRequest = null;
+                  }
+                }
+              }
+            } catch (Throwable t) {
+              t.printStackTrace();
+            } finally {
+              connPool.releaseConnection(qmClient);
+            }
+          }
+        }
+      });
+      taskLauncher.start();
+    } catch (Throwable t) {
+      LOG.fatal("Unhandled exception. Starting shutdown.", t);
+    } finally {
+      for (Task t : tasks.values()) {
+        if (t.getStatus() != TaskAttemptState.TA_SUCCEEDED) {
+          t.abort();
+        }
+      }
+    }
+  }
+
+  /**
+   * @return true if a stop has been requested.
+   */
+  public boolean isStopped() {
+    return this.stopped;
+  }
+
+  public ExecutionBlockId getExecutionBlockId() {
+    return this.executionBlockId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
new file mode 100644
index 0000000..da434e4
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.conf.TajoConf;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
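+/**
+ * Manages {@link TaskRunner} instances on a worker: it launches new runners,
+ * keeps track of running and finished ones, looks up tasks and task histories
+ * by attempt id, and periodically purges expired finished runners.
+ */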
+public class TaskRunnerManager extends CompositeService {
+  private static final Log LOG = LogFactory.getLog(TaskRunnerManager.class);
+
+  private final Map<String, TaskRunner> taskRunnerMap = new HashMap<String, TaskRunner>();
+  private final Map<String, TaskRunner> finishedTaskRunnerMap = new HashMap<String, TaskRunner>();
+  private TajoWorker.WorkerContext workerContext;
+  private TajoConf tajoConf;
+  private AtomicBoolean stop = new AtomicBoolean(false);
+  private FinishedTaskCleanThread finishedTaskCleanThread;
+
+  public TaskRunnerManager(TajoWorker.WorkerContext workerContext) {
+    super(TaskRunnerManager.class.getName());
+
+    this.workerContext = workerContext;
+  }
+
+  public TajoWorker.WorkerContext getWorkerContext() {
+    return workerContext;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    tajoConf = (TajoConf)conf;
+    super.init(tajoConf);
+  }
+
+  @Override
+  public void start() {
+    finishedTaskCleanThread = new FinishedTaskCleanThread();
+    finishedTaskCleanThread.start();
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    if(stop.get()) {
+      return;
+    }
+    stop.set(true);
+    synchronized(taskRunnerMap) {
+      for(TaskRunner eachTaskRunner: taskRunnerMap.values()) {
+        if(!eachTaskRunner.isStopped()) {
+          eachTaskRunner.stop();
+        }
+      }
+    }
+
+    if(finishedTaskCleanThread != null) {
+      finishedTaskCleanThread.interrupt();
+    }
+    super.stop();
+    if(workerContext.isYarnContainerMode()) {
+      workerContext.stopWorker(true);
+    }
+  }
+
+  public void stopTask(String id) {
+    LOG.info("Stop Task:" + id);
+    synchronized(taskRunnerMap) {
+      TaskRunner taskRunner = taskRunnerMap.remove(id);
+      if(taskRunner != null) {
+        finishedTaskRunnerMap.put(id, taskRunner);
+      }
+    }
+    if(workerContext.isYarnContainerMode()) {
+      stop();
+    }
+  }
+
+  public Collection<TaskRunner> getTaskRunners() {
+    synchronized(taskRunnerMap) {
+      return Collections.unmodifiableCollection(taskRunnerMap.values());
+    }
+  }
+
+  public Collection<TaskRunner> getFinishedTaskRunners() {
+    synchronized(finishedTaskRunnerMap) {
+      return Collections.unmodifiableCollection(finishedTaskRunnerMap.values());
+    }
+  }
+
+  public TaskRunner findTaskRunner(String taskRunnerId) {
+    synchronized(taskRunnerMap) {
+      if(taskRunnerMap.containsKey(taskRunnerId)) {
+        return taskRunnerMap.get(taskRunnerId);
+      }
+    }
+    synchronized(finishedTaskRunnerMap) {
+      return finishedTaskRunnerMap.get(taskRunnerId);
+    }
+  }
+
+  public Task findTaskByQueryUnitAttemptId(QueryUnitAttemptId quAttemptId) {
+    ExecutionBlockId ebid = quAttemptId.getQueryUnitId().getExecutionBlockId();
+    synchronized(taskRunnerMap) {
+      for (TaskRunner eachTaskRunner: taskRunnerMap.values()) {
+        if (eachTaskRunner.getExecutionBlockId().equals(ebid)) {
+          Task task = eachTaskRunner.getContext().getTask(quAttemptId);
+          if (task != null) {
+            return task;
+          }
+        }
+      }
+    }
+    synchronized(finishedTaskRunnerMap) {
+      for (TaskRunner eachTaskRunner: finishedTaskRunnerMap.values()) {
+        if (eachTaskRunner.getExecutionBlockId().equals(ebid)) {
+          Task task = eachTaskRunner.getContext().getTask(quAttemptId);
+          if (task != null) {
+            return task;
+          }
+        }
+      }
+    }
+    return null;
+  }
+
+  public TaskHistory findTaskHistoryByQueryUnitAttemptId(QueryUnitAttemptId quAttemptId) {
+    ExecutionBlockId ebid = quAttemptId.getQueryUnitId().getExecutionBlockId();
+    synchronized(taskRunnerMap) {
+      for (TaskRunner eachTaskRunner: taskRunnerMap.values()) {
+        if (eachTaskRunner.getExecutionBlockId().equals(ebid)) {
+          TaskHistory taskHistory = eachTaskRunner.getContext().getTaskHistory(quAttemptId);
+          if (taskHistory != null) {
+            return taskHistory;
+          }
+        }
+      }
+    }
+    synchronized(finishedTaskRunnerMap) {
+      for (TaskRunner eachTaskRunner: finishedTaskRunnerMap.values()) {
+        if (eachTaskRunner.getExecutionBlockId().equals(ebid)) {
+          TaskHistory taskHistory = eachTaskRunner.getContext().getTaskHistory(quAttemptId);
+          if (taskHistory != null) {
+            return taskHistory;
+          }
+        }
+      }
+    }
+
+    return null;
+  }
+
+  public int getNumTasks() {
+    synchronized(taskRunnerMap) {
+      return taskRunnerMap.size();
+    }
+  }
+
+  public void startTask(final String[] params) {
+    //TODO change to use event dispatcher
+    Thread t = new Thread() {
+      public void run() {
+        try {
+          TajoConf systemConf = new TajoConf(tajoConf);
+          TaskRunner taskRunner = new TaskRunner(TaskRunnerManager.this, systemConf, params);
+          LOG.info("Start TaskRunner:" + taskRunner.getId());
+          synchronized(taskRunnerMap) {
+            taskRunnerMap.put(taskRunner.getId(), taskRunner);
+          }
+          taskRunner.init(systemConf);
+          taskRunner.start();
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+          throw new RuntimeException(e.getMessage(), e);
+        }
+      }
+    };
+
+    t.start();
+  }
+
+  class FinishedTaskCleanThread extends Thread {
+    public void run() {
+      int expireIntervalTime = tajoConf.getIntVar(TajoConf.ConfVars.WORKER_HISTORY_EXPIRE_PERIOD);
+      LOG.info("FinishedQueryMasterTaskCleanThread started: expire interval minutes = " + expireIntervalTime);
+      while(!stop.get()) {
+        try {
+          Thread.sleep(60 * 1000 * 60);   // hourly check
+        } catch (InterruptedException e) {
+          break;
+        }
+        try {
+          long expireTime = System.currentTimeMillis() - expireIntervalTime * 60 * 1000;
+          cleanExpiredFinishedQueryMasterTask(expireTime);
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+        }
+      }
+    }
+
+    private void cleanExpiredFinishedQueryMasterTask(long expireTime) {
+      synchronized(finishedTaskRunnerMap) {
+        List<String> expiredIds = new ArrayList<String>();
+        for(Map.Entry<String, TaskRunner> entry: finishedTaskRunnerMap.entrySet()) {
+          if(entry.getValue().getStartTime() < expireTime) {
+            expiredIds.add(entry.getKey());
+          }
+        }
+
+        for(String eachId: expiredIds) {
+          finishedTaskRunnerMap.remove(eachId);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
new file mode 100644
index 0000000..007bcbf
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
+import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.storage.v2.DiskDeviceInfo;
+import org.apache.tajo.storage.v2.DiskMountInfo;
+import org.apache.tajo.storage.v2.DiskUtil;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.NodeHeartbeat;
+
+/**
+ * Periodically sends heartbeats to {@link org.apache.tajo.master.rm.TajoResourceTracker} via asynchronous RPC.
+ */
+public class WorkerHeartbeatService extends AbstractService {
+  /** class logger */
+  private final static Log LOG = LogFactory.getLog(WorkerHeartbeatService.class);
+
+  private final TajoWorker.WorkerContext context;
+  private TajoConf systemConf;
+  private RpcConnectionPool connectionPool;
+  private WorkerHeartbeatThread thread;
+  private static final float HDFS_DATANODE_STORAGE_SIZE;
+
+  static {
+    HDFS_DATANODE_STORAGE_SIZE = DiskUtil.getDataNodeStorageSize();
+  }
+
+  public WorkerHeartbeatService(TajoWorker.WorkerContext context) {
+    super(WorkerHeartbeatService.class.getSimpleName());
+    this.context = context;
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) {
+    Preconditions.checkArgument(conf instanceof TajoConf, "Configuration must be a TajoConf instance.");
+    this.systemConf = (TajoConf) conf;
+
+    connectionPool = RpcConnectionPool.getPool(systemConf);
+    thread = new WorkerHeartbeatThread();
+    thread.start();
+    super.init(conf);
+  }
+
+  @Override
+  public void serviceStop() {
+    thread.stopped.set(true);
+    synchronized (thread) {
+      thread.notifyAll();
+    }
+    super.stop();
+  }
+
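+  /**
+   * Collects worker resource information (CPU, memory, and disks) and reports it
+   * to the resource tracker in a periodic heartbeat loop.
+   */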
+  class WorkerHeartbeatThread extends Thread {
+    private volatile AtomicBoolean stopped = new AtomicBoolean(false);
+    TajoMasterProtocol.ServerStatusProto.System systemInfo;
+    List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos =
+        new ArrayList<TajoMasterProtocol.ServerStatusProto.Disk>();
+    float workerDiskSlots;
+    int workerMemoryMB;
+    List<DiskDeviceInfo> diskDeviceInfos;
+
+    public WorkerHeartbeatThread() {
+      int workerCpuCoreNum;
+
+      boolean dedicatedResource = systemConf.getBoolVar(TajoConf.ConfVars.WORKER_RESOURCE_DEDICATED);
+      int workerCpuCoreSlots = Runtime.getRuntime().availableProcessors();
+
+      try {
+        diskDeviceInfos = DiskUtil.getDiskDeviceInfos();
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+      }
+
+      if(dedicatedResource) {
+        float dedicatedMemoryRatio = systemConf.getFloatVar(TajoConf.ConfVars.WORKER_RESOURCE_DEDICATED_MEMORY_RATIO);
+        int totalMemory = getTotalMemoryMB();
+        workerMemoryMB = (int) ((float) (totalMemory) * dedicatedMemoryRatio);
+        workerCpuCoreNum = Runtime.getRuntime().availableProcessors();
+
+        if(diskDeviceInfos == null) {
+          workerDiskSlots = TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS.defaultIntVal;
+        } else {
+          workerDiskSlots = diskDeviceInfos.size();
+        }
+      } else {
+        workerMemoryMB = systemConf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB);
+        workerCpuCoreNum = systemConf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES);
+        workerDiskSlots = systemConf.getFloatVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS);
+
+        if (systemConf.getBoolVar(TajoConf.ConfVars.WORKER_RESOURCE_DFS_DIR_AWARE) && HDFS_DATANODE_STORAGE_SIZE > 0) {
+          workerDiskSlots = HDFS_DATANODE_STORAGE_SIZE;
+        }
+      }
+
+      systemInfo = TajoMasterProtocol.ServerStatusProto.System.newBuilder()
+          .setAvailableProcessors(workerCpuCoreNum)
+          .setFreeMemoryMB(0)
+          .setMaxMemoryMB(0)
+          .setTotalMemoryMB(getTotalMemoryMB())
+          .build();
+    }
+
+    public void run() {
+      LOG.info("Worker Resource Heartbeat Thread start.");
+      int sendDiskInfoCount = 0;
+      int pullServerPort = 0;
+      if(context.getPullService()!= null) {
+        long startTime = System.currentTimeMillis();
+        while(true) {
+          pullServerPort = context.getPullService().getPort();
+          if(pullServerPort > 0) {
+            break;
+          }
+          // wait until the pull server finishes initializing
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException e) {
+          }
+          if(System.currentTimeMillis() - startTime > 30 * 1000) {
+            LOG.fatal("Too long push server init.");
+            System.exit(0);
+          }
+        }
+      }
+
+      String hostName = null;
+      int peerRpcPort = 0;
+      int queryMasterPort = 0;
+      int clientPort = 0;
+
+      if(context.getTajoWorkerManagerService() != null) {
+        hostName = context.getTajoWorkerManagerService().getBindAddr().getHostName();
+        peerRpcPort = context.getTajoWorkerManagerService().getBindAddr().getPort();
+      }
+      if(context.getQueryMasterManagerService() != null) {
+        hostName = context.getQueryMasterManagerService().getBindAddr().getHostName();
+        queryMasterPort = context.getQueryMasterManagerService().getBindAddr().getPort();
+      }
+      if(context.getTajoWorkerClientService() != null) {
+        clientPort = context.getTajoWorkerClientService().getBindAddr().getPort();
+      }
+      if (context.getPullService() != null) {
+        pullServerPort = context.getPullService().getPort();
+      }
+
+      while(!stopped.get()) {
+        if(sendDiskInfoCount == 0 && diskDeviceInfos != null) {
+          getDiskUsageInfos();
+        }
+        TajoMasterProtocol.ServerStatusProto.JvmHeap jvmHeap =
+            TajoMasterProtocol.ServerStatusProto.JvmHeap.newBuilder()
+                .setMaxHeap(Runtime.getRuntime().maxMemory())
+                .setFreeHeap(Runtime.getRuntime().freeMemory())
+                .setTotalHeap(Runtime.getRuntime().totalMemory())
+                .build();
+
+        TajoMasterProtocol.ServerStatusProto serverStatus = TajoMasterProtocol.ServerStatusProto.newBuilder()
+            .addAllDisk(diskInfos)
+            .setRunningTaskNum(
+                context.getTaskRunnerManager() == null ? 1 : context.getTaskRunnerManager().getNumTasks())
+            .setSystem(systemInfo)
+            .setDiskSlots(workerDiskSlots)
+            .setMemoryResourceMB(workerMemoryMB)
+            .setJvmHeap(jvmHeap)
+            .setQueryMasterMode(PrimitiveProtos.BoolProto.newBuilder().setValue(context.isQueryMasterMode()))
+            .setTaskRunnerMode(PrimitiveProtos.BoolProto.newBuilder().setValue(context.isTaskRunnerMode()))
+            .build();
+
+        NodeHeartbeat heartbeatProto = NodeHeartbeat.newBuilder()
+            .setTajoWorkerHost(hostName)
+            .setTajoQueryMasterPort(queryMasterPort)
+            .setPeerRpcPort(peerRpcPort)
+            .setTajoWorkerClientPort(clientPort)
+            .setTajoWorkerHttpPort(context.getHttpPort())
+            .setTajoWorkerPullServerPort(pullServerPort)
+            .setServerStatus(serverStatus)
+            .build();
+
+        NettyClientBase rmClient = null;
+        try {
+          CallFuture<TajoMasterProtocol.TajoHeartbeatResponse> callBack =
+              new CallFuture<TajoMasterProtocol.TajoHeartbeatResponse>();
+
+          rmClient = connectionPool.getConnection(context.getResourceTrackerAddress(), TajoResourceTrackerProtocol.class, true);
+          TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService resourceTracker = rmClient.getStub();
+          resourceTracker.heartbeat(callBack.getController(), heartbeatProto, callBack);
+
+          TajoMasterProtocol.TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS);
+          if(response != null) {
+            TajoMasterProtocol.ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary();
+            if(clusterResourceSummary.getNumWorkers() > 0) {
+              context.setNumClusterNodes(clusterResourceSummary.getNumWorkers());
+            }
+            context.setClusterResource(clusterResourceSummary);
+          } else {
+            if(callBack.getController().failed()) {
+              throw new ServiceException(callBack.getController().errorText());
+            }
+          }
+        } catch (InterruptedException e) {
+          break;
+        } catch (TimeoutException te) {
+          LOG.warn("Heartbeat response is being delayed.");
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+        } finally {
+          connectionPool.releaseConnection(rmClient);
+        }
+
+        try {
+          synchronized (WorkerHeartbeatThread.this){
+            wait(10 * 1000);
+          }
+        } catch (InterruptedException e) {
+          break;
+        }
+        sendDiskInfoCount++;
+
+        if(sendDiskInfoCount > 10) {
+          sendDiskInfoCount = 0;
+        }
+      }
+
+      LOG.info("Worker Resource Heartbeat Thread stopped.");
+    }
+
+    private void getDiskUsageInfos() {
+      diskInfos.clear();
+      for(DiskDeviceInfo eachDevice: diskDeviceInfos) {
+        List<DiskMountInfo> mountInfos = eachDevice.getMountInfos();
+        if(mountInfos != null) {
+          for(DiskMountInfo eachMount: mountInfos) {
+            File eachFile = new File(eachMount.getMountPath());
+            diskInfos.add(TajoMasterProtocol.ServerStatusProto.Disk.newBuilder()
+                .setAbsolutePath(eachFile.getAbsolutePath())
+                .setTotalSpace(eachFile.getTotalSpace())
+                .setFreeSpace(eachFile.getFreeSpace())
+                .setUsableSpace(eachFile.getUsableSpace())
+                .build());
+          }
+        }
+      }
+    }
+  }
+
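+  /** Returns the total physical memory of this machine in MB via the JDK-specific com.sun.management MXBean. */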
+  public static int getTotalMemoryMB() {
+    com.sun.management.OperatingSystemMXBean bean =
+        (com.sun.management.OperatingSystemMXBean)
+            java.lang.management.ManagementFactory.getOperatingSystemMXBean();
+    long max = bean.getTotalPhysicalMemorySize();
+    return ((int) (max / (1024 * 1024)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
new file mode 100644
index 0000000..1771255
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.master.TaskRunnerGroupEvent;
+import org.apache.tajo.master.TaskRunnerLauncher;
+import org.apache.tajo.master.YarnTaskRunnerLauncherImpl;
+import org.apache.tajo.master.event.ContainerAllocatorEventType;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.master.rm.YarnRMContainerAllocator;
+
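+/**
+ * Allocates query resources through Hadoop YARN: it starts a YARN client and
+ * registers a task runner launcher and a container allocator with the query
+ * master's dispatcher.
+ */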
+public class YarnResourceAllocator extends AbstractResourceAllocator {
+  private YarnRMContainerAllocator rmAllocator;
+
+  private TaskRunnerLauncher taskRunnerLauncher;
+
+  private YarnRPC yarnRPC;
+
+  private YarnClient yarnClient;
+
+  private static final Log LOG = LogFactory.getLog(YarnResourceAllocator.class.getName());
+
+  private QueryMasterTask.QueryMasterTaskContext queryTaskContext;
+
+  private TajoConf systemConf;
+
+  public YarnResourceAllocator(QueryMasterTask.QueryMasterTaskContext queryTaskContext) {
+    this.queryTaskContext = queryTaskContext;
+  }
+
+  @Override
+  public ContainerId makeContainerId(YarnProtos.ContainerIdProto containerId) {
+    return new ContainerIdPBImpl(containerId);
+  }
+
+  @Override
+  public void allocateTaskWorker() {
+  }
+
+  @Override
+  public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext,
+                                           int numTasks,
+                                           int memoryMBPerTask) {
+    int numClusterNodes = workerContext.getNumClusterNodes();
+
+    TajoConf conf =  (TajoConf)workerContext.getQueryMaster().getConfig();
+    int workerNum = conf.getIntVar(TajoConf.ConfVars.YARN_RM_WORKER_NUMBER_PER_NODE);
+    return numClusterNodes == 0 ? numTasks: Math.min(numTasks, numClusterNodes * workerNum);
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    systemConf = (TajoConf)conf;
+
+    yarnRPC = YarnRPC.create(systemConf);
+
+    connectYarnClient();
+
+    taskRunnerLauncher = new YarnTaskRunnerLauncherImpl(queryTaskContext, yarnRPC);
+    addService((Service) taskRunnerLauncher);
+    queryTaskContext.getDispatcher().register(TaskRunnerGroupEvent.EventType.class, taskRunnerLauncher);
+
+    rmAllocator = new YarnRMContainerAllocator(queryTaskContext);
+    addService(rmAllocator);
+    queryTaskContext.getDispatcher().register(ContainerAllocatorEventType.class, rmAllocator);
+    super.init(conf);
+  }
+
+  @Override
+  public void stop() {
+    try {
+      this.yarnClient.stop();
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+    super.stop();
+  }
+
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  private void connectYarnClient() {
+    this.yarnClient = new YarnClientImpl();
+    this.yarnClient.init(systemConf);
+    this.yarnClient.start();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/FileAccessForbiddenException.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/FileAccessForbiddenException.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/FileAccessForbiddenException.java
new file mode 100644
index 0000000..6c93e4f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/FileAccessForbiddenException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver;
+
+import java.io.IOException;
+
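+/**
+ * Signals that access to a requested file is not allowed; the HTTP data server
+ * answers such requests with a 403 FORBIDDEN response.
+ */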
+public class FileAccessForbiddenException extends IOException {
+  private static final long serialVersionUID = -3383272565826389213L;
+
+  public FileAccessForbiddenException() {
+  }
+
+  public FileAccessForbiddenException(String message) {
+    super(message);
+  }
+
+  public FileAccessForbiddenException(Throwable cause) {
+    super(cause);
+  }
+
+  public FileAccessForbiddenException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServer.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServer.java
new file mode 100644
index 0000000..523d6e1
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServer.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.net.NetUtils;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.ChannelGroupFuture;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.apache.tajo.worker.dataserver.retriever.DataRetriever;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+
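+/**
+ * A Netty-based HTTP server that serves the file chunks resolved by a
+ * {@link DataRetriever}.
+ */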
+public class HttpDataServer {
+  private final static Log LOG = LogFactory.getLog(HttpDataServer.class);
+
+  private final InetSocketAddress addr;
+  private InetSocketAddress bindAddr;
+  private ServerBootstrap bootstrap = null;
+  private ChannelFactory factory = null;
+  private ChannelGroup channelGroup = null;
+
+  public HttpDataServer(final InetSocketAddress addr, 
+      final DataRetriever retriever) {
+    this.addr = addr;
+    this.factory = new NioServerSocketChannelFactory(
+        Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
+        Runtime.getRuntime().availableProcessors() * 2);
+
+    // Configure the server.
+    this.bootstrap = new ServerBootstrap(factory);
+    // Set up the event pipeline factory.
+    this.bootstrap.setPipelineFactory(
+        new HttpDataServerPipelineFactory(retriever));    
+    this.channelGroup = new DefaultChannelGroup();
+  }
+
+  public HttpDataServer(String bindaddr, DataRetriever retriever) {
+    this(NetUtils.createSocketAddr(bindaddr), retriever);
+  }
+
+  public void start() {
+    // Bind and start to accept incoming connections.
+    Channel channel = bootstrap.bind(addr);
+    channelGroup.add(channel);    
+    this.bindAddr = (InetSocketAddress) channel.getLocalAddress();
+    LOG.info("HttpDataServer starts up ("
+        + this.bindAddr.getAddress().getHostAddress() + ":" + this.bindAddr.getPort()
+        + ")");
+  }
+  
+  public InetSocketAddress getBindAddress() {
+    return this.bindAddr;
+  }
+
+  public void stop() {
+    ChannelGroupFuture future = channelGroup.close();
+    future.awaitUninterruptibly();
+    factory.releaseExternalResources();
+
+    LOG.info("HttpDataServer shutdown ("
+        + this.bindAddr.getAddress().getHostAddress() + ":"
+        + this.bindAddr.getPort() + ")");
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerHandler.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerHandler.java
new file mode 100644
index 0000000..6b9eea8
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerHandler.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.*;
+import org.jboss.netty.handler.codec.frame.TooLongFrameException;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.handler.stream.ChunkedFile;
+import org.jboss.netty.util.CharsetUtil;
+import org.apache.tajo.worker.dataserver.retriever.DataRetriever;
+import org.apache.tajo.worker.dataserver.retriever.FileChunk;
+
+import java.io.*;
+import java.net.URLDecoder;
+
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
+import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
+import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
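+/**
+ * Handles HTTP GET requests by resolving the requested file chunks through a
+ * {@link DataRetriever} and streaming them back, using zero-copy transfer when
+ * SSL is not enabled.
+ */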
+public class HttpDataServerHandler extends SimpleChannelUpstreamHandler {
+  private final static Log LOG = LogFactory.getLog(HttpDataServerHandler.class);
+  private final DataRetriever retriever;
+
+  public HttpDataServerHandler(DataRetriever retriever) {
+    this.retriever = retriever;
+  }
+
+  @Override
+  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+      throws Exception {
+    HttpRequest request = (HttpRequest) e.getMessage();
+    if (request.getMethod() != GET) {
+      sendError(ctx, METHOD_NOT_ALLOWED);
+      return;
+    }
+
+    FileChunk [] file;
+    try {
+      file = retriever.handle(ctx, request);
+    } catch (FileNotFoundException fnf) {
+      LOG.error(fnf);
+      sendError(ctx, NOT_FOUND);
+      return;
+    } catch (IllegalArgumentException iae) {
+      LOG.error(iae);
+      sendError(ctx, BAD_REQUEST);
+      return;
+    } catch (FileAccessForbiddenException fafe) {
+      LOG.error(fafe);
+      sendError(ctx, FORBIDDEN);
+      return;
+    } catch (IOException ioe) {
+      LOG.error(ioe);
+      sendError(ctx, INTERNAL_SERVER_ERROR);
+      return;
+    }
+
+    // Write the content.
+    Channel ch = e.getChannel();
+    if (file == null) {
+      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT);
+      ch.write(response);
+      if (!isKeepAlive(request)) {
+        ch.close();
+      }
+    }  else {
+      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+      long totalSize = 0;
+      for (FileChunk chunk : file) {
+        totalSize += chunk.length();
+      }
+      setContentLength(response, totalSize);
+
+      // Write the initial line and the header.
+      ch.write(response);
+
+      ChannelFuture writeFuture = null;
+
+      for (FileChunk chunk : file) {
+        writeFuture = sendFile(ctx, ch, chunk);
+        if (writeFuture == null) {
+          sendError(ctx, NOT_FOUND);
+          return;
+        }
+      }
+
+      // Decide whether to close the connection or not.
+      if (!isKeepAlive(request)) {
+        // Close the connection when the whole content is written out.
+        writeFuture.addListener(ChannelFutureListener.CLOSE);
+      }
+    }
+  }
+
+  private ChannelFuture sendFile(ChannelHandlerContext ctx, Channel ch, FileChunk file) throws IOException {
+    RandomAccessFile raf;
+    try {
+      raf = new RandomAccessFile(file.getFile(), "r");
+    } catch (FileNotFoundException fnfe) {
+      return null;
+    }
+
+    ChannelFuture writeFuture;
+    if (ch.getPipeline().get(SslHandler.class) != null) {
+      // Cannot use zero-copy with HTTPS.
+      writeFuture = ch.write(new ChunkedFile(raf, file.startOffset(), file.length(), 8192));
+    } else {
+      // No encryption - use zero-copy.
+      final FileRegion region = new DefaultFileRegion(raf.getChannel(), file.startOffset(), file.length());
+      writeFuture = ch.write(region);
+      writeFuture.addListener(new ChannelFutureListener() {
+        public void operationComplete(ChannelFuture future) {
+          region.releaseExternalResources();
+        }
+      });
+    }
+
+    return writeFuture;
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+      throws Exception {
+    Channel ch = e.getChannel();
+    Throwable cause = e.getCause();
+    if (cause instanceof TooLongFrameException) {
+      sendError(ctx, BAD_REQUEST);
+      return;
+    }
+
+    LOG.error(cause.getMessage(), cause);
+    if (ch.isConnected()) {
+      sendError(ctx, INTERNAL_SERVER_ERROR);
+    }
+  }
+
+  public static String sanitizeUri(String uri) {
+    // Decode the path.
+    try {
+      uri = URLDecoder.decode(uri, "UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      try {
+        uri = URLDecoder.decode(uri, "ISO-8859-1");
+      } catch (UnsupportedEncodingException e1) {
+        throw new Error("Failed to decode URI with UTF-8 and ISO-8859-1", e1);
+      }
+    }
+
+    // Convert file separators.
+    uri = uri.replace('/', File.separatorChar);
+
+    // Simplistic dumb security check.
+    // You will have to do something serious in the production environment.
+    if (uri.contains(File.separator + ".")
+        || uri.contains("." + File.separator) || uri.startsWith(".")
+        || uri.endsWith(".")) {
+      return null;
+    }
+
+    // Convert to absolute path.
+    return uri;
+  }
+
+  private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
+    HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+    response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+    response.setContent(ChannelBuffers.copiedBuffer(
+        "Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8));
+
+    // Close the connection as soon as the error message is sent.
+    ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+  }
+}
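
For illustration only, a minimal sketch of how the sanitizeUri() helper above treats request
paths; the handler class name comes from this file, while the wrapper class and sample paths
are invented:

  import org.apache.tajo.worker.dataserver.HttpDataServerHandler;

  public class SanitizeUriExample {
    public static void main(String[] args) {
      // Accepted: no "." path segments, so the decoded path comes back unchanged
      // on Unix-like systems (File.separatorChar is '/').
      System.out.println(HttpDataServerHandler.sanitizeUri("/output/part-000000"));  // /output/part-000000

      // Rejected: the path contains File.separator + ".", so sanitizeUri returns null.
      System.out.println(HttpDataServerHandler.sanitizeUri("/output/../secret"));    // null
    }
  }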

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerPipelineFactory.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerPipelineFactory.java
new file mode 100644
index 0000000..0a47f6b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerPipelineFactory.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver;
+
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
+import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
+import org.jboss.netty.handler.stream.ChunkedWriteHandler;
+import org.apache.tajo.worker.dataserver.retriever.DataRetriever;
+
+import static org.jboss.netty.channel.Channels.pipeline;
+
+public class HttpDataServerPipelineFactory implements ChannelPipelineFactory {
+  private final DataRetriever ret;
+
+  public HttpDataServerPipelineFactory(DataRetriever ret) {
+    this.ret = ret;
+  }
+
+  public ChannelPipeline getPipeline() throws Exception {
+    // Create a default pipeline implementation.
+    ChannelPipeline pipeline = pipeline();
+
+    // Uncomment the following line if you want HTTPS
+    // SSLEngine engine =
+    // SecureChatSslContextFactory.getServerContext().createSSLEngine();
+    // engine.setUseClientMode(false);
+    // pipeline.addLast("ssl", new SslHandler(engine));
+
+    pipeline.addLast("decoder", new HttpRequestDecoder());
+    //pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
+    pipeline.addLast("encoder", new HttpResponseEncoder());
+    pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
+    //pipeline.addLast("deflater", new HttpContentCompressor());
+    pipeline.addLast("handler", new HttpDataServerHandler(ret));
+    return pipeline;
+  }
+}
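
For context, a ChannelPipelineFactory like the one above is normally installed on a Netty 3
ServerBootstrap. The sketch below is an assumption about that wiring, not code from this patch;
the retriever, base directory, and port are placeholders (DirectoryRetriever appears later in
this patch):

  import java.net.InetSocketAddress;
  import java.util.concurrent.Executors;

  import org.jboss.netty.bootstrap.ServerBootstrap;
  import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

  import org.apache.tajo.worker.dataserver.HttpDataServerPipelineFactory;
  import org.apache.tajo.worker.dataserver.retriever.DataRetriever;
  import org.apache.tajo.worker.dataserver.retriever.DirectoryRetriever;

  public class DataServerBootstrapSketch {
    public static void main(String[] args) {
      // Placeholder retriever and port; the real HttpDataServer may wire this differently.
      DataRetriever retriever = new DirectoryRetriever("/tmp/tajo-data");
      int port = 8080;

      ServerBootstrap bootstrap = new ServerBootstrap(
          new NioServerSocketChannelFactory(
              Executors.newCachedThreadPool(),    // boss threads
              Executors.newCachedThreadPool()));  // worker threads

      // Every accepted channel gets the decoder/encoder/chunked-writer/handler pipeline above.
      bootstrap.setPipelineFactory(new HttpDataServerPipelineFactory(retriever));
      bootstrap.bind(new InetSocketAddress(port));
    }
  }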

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpUtil.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpUtil.java
new file mode 100644
index 0000000..e688c39
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpUtil.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver;
+
+import com.google.common.collect.Maps;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URLEncoder;
+import java.util.Map;
+
+public class HttpUtil {
+  public static Map<String,String> getParams(URI uri) throws UnsupportedEncodingException {
+    return getParamsFromQuery(uri.getQuery());
+  }
+
+  /**
+   * Parses a query string into key/value pairs.
+   *
+   * @param queryString decoded query string
+   * @return key/value pairs parsed from a given query string
+   * @throws UnsupportedEncodingException
+   */
+  public static Map<String, String> getParamsFromQuery(String queryString) throws UnsupportedEncodingException {
+    String [] queries = queryString.split("&");
+
+    Map<String,String> params = Maps.newHashMap();
+    String [] param;
+    for (String q : queries) {
+      param = q.split("=");
+      params.put(param[0], param[1]);
+    }
+
+    return params;
+  }
+
+  public static String buildQuery(Map<String,String> params) throws UnsupportedEncodingException {
+    StringBuilder sb = new StringBuilder();
+
+    boolean first = true;
+    for (Map.Entry<String,String> param : params.entrySet()) {
+      if (!first) {
+        sb.append("&");
+      }
+      sb.append(URLEncoder.encode(param.getKey(), "UTF-8")).
+          append("=").
+          append(URLEncoder.encode(param.getValue(), "UTF-8"));
+      first = false;
+    }
+
+    return sb.toString();
+  }
+}
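
A short usage sketch of the two helpers above; the parameter names and values are invented:

  import java.util.HashMap;
  import java.util.Map;

  import org.apache.tajo.worker.dataserver.HttpUtil;

  public class HttpUtilExample {
    public static void main(String[] args) throws Exception {
      // Build a URL-encoded query string from a parameter map (keys and values are made up).
      Map<String, String> params = new HashMap<String, String>();
      params.put("qid", "3_0");
      params.put("sid", "eb_0001");
      String query = HttpUtil.buildQuery(params);   // e.g. "qid=3_0&sid=eb_0001" (order may vary)

      // Parse a decoded query string back into key/value pairs.
      Map<String, String> parsed = HttpUtil.getParamsFromQuery(query);
      System.out.println(parsed.get("qid"));        // 3_0
    }
  }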

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
new file mode 100644
index 0000000..2ef0c4c
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver.retriever;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.util.TajoIdUtils;
+import org.apache.tajo.worker.dataserver.FileAccessForbiddenException;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.QueryStringDecoder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class AdvancedDataRetriever implements DataRetriever {
+  private final Log LOG = LogFactory.getLog(AdvancedDataRetriever.class);
+  private final Map<String, RetrieverHandler> handlerMap = Maps.newConcurrentMap();
+
+  public AdvancedDataRetriever() {
+  }
+  
+  public void register(QueryUnitAttemptId id, RetrieverHandler handler) {
+    synchronized (handlerMap) {
+      if (!handlerMap.containsKey(id.toString())) {
+        handlerMap.put(id.toString(), handler);
+      }
+    } 
+  }
+  
+  public void unregister(QueryUnitAttemptId id) {
+    synchronized (handlerMap) {
+      if (handlerMap.containsKey(id.toString())) {
+        handlerMap.remove(id.toString());
+      }
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.tajo.worker.dataserver.retriever.DataRetriever#handle(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.handler.codec.http.HttpRequest)
+   */
+  @Override
+  public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
+      throws IOException {
+
+    final Map<String, List<String>> params =
+      new QueryStringDecoder(request.getUri()).getParameters();
+
+    if (!params.containsKey("qid")) {
+      throw new FileNotFoundException("No such qid: " + params.containsKey("qid"));
+    }
+
+    if (params.containsKey("sid")) {
+      List<FileChunk> chunks = Lists.newArrayList();
+      List<String> qids = splitMaps(params.get("qid"));
+      for (String qid : qids) {
+        String[] ids = qid.split("_");
+        ExecutionBlockId suid = TajoIdUtils.createExecutionBlockId(params.get("sid").get(0));
+        QueryUnitId quid = new QueryUnitId(suid, Integer.parseInt(ids[0]));
+        QueryUnitAttemptId attemptId = new QueryUnitAttemptId(quid,
+            Integer.parseInt(ids[1]));
+        RetrieverHandler handler = handlerMap.get(attemptId.toString());
+        FileChunk chunk = handler.get(params);
+        chunks.add(chunk);
+      }
+      return chunks.toArray(new FileChunk[chunks.size()]);
+    } else {
+      RetrieverHandler handler = handlerMap.get(params.get("qid").get(0));
+      FileChunk chunk = handler.get(params);
+      if (chunk == null) {
+        if (params.containsKey("qid")) { // if there is no content corresponding to the query
+          return null;
+        } else { // if there is no
+          throw new FileNotFoundException("No such a file corresponding to " + params.get("qid"));
+        }
+      }
+
+      File file = chunk.getFile();
+      if (file.isHidden() || !file.exists()) {
+        throw new FileNotFoundException("No such file: " + file.getAbsolutePath());
+      }
+      if (!file.isFile()) {
+        throw new FileAccessForbiddenException(file.getAbsolutePath() + " is not a regular file");
+      }
+
+      return new FileChunk[] {chunk};
+    }
+  }
+
+  private List<String> splitMaps(List<String> qids) {
+    if (null == qids) {
+      LOG.error("QueryUnitId is EMPTY");
+      return null;
+    }
+
+    final List<String> ret = new ArrayList<String>();
+    for (String qid : qids) {
+      Collections.addAll(ret, qid.split(","));
+    }
+    return ret;
+  }
+}
\ No newline at end of file
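
Judging from handle() above, a fetch request carries the execution block id in "sid" and a
comma-separated list of taskId_attemptId pairs in "qid". The snippet below is an illustrative
sketch of registering a handler for such requests; the wrapper method and its arguments are
placeholders, not part of this patch:

  import org.apache.tajo.QueryUnitAttemptId;
  import org.apache.tajo.worker.dataserver.retriever.AdvancedDataRetriever;
  import org.apache.tajo.worker.dataserver.retriever.RetrieverHandler;

  public class AdvancedRetrieverSketch {
    public static void registerAndDescribe(AdvancedDataRetriever retriever,
                                           QueryUnitAttemptId attemptId,
                                           RetrieverHandler handler) {
      // Handlers are keyed by the string form of the task attempt id.
      retriever.register(attemptId, handler);

      // A matching fetch URI would look roughly like (ids are placeholders):
      //   /?qid=3_0,4_0&sid=<executionBlockId>
      // handle() splits qid on "," and "_", rebuilds each QueryUnitAttemptId under sid,
      // and returns the FileChunks produced by each registered handler.
    }
  }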

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DataRetriever.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DataRetriever.java
new file mode 100644
index 0000000..b26ba74
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DataRetriever.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver.retriever;
+
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+
+import java.io.IOException;
+
+public interface DataRetriever {
+  FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
+      throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DirectoryRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DirectoryRetriever.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DirectoryRetriever.java
new file mode 100644
index 0000000..62dabbd
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DirectoryRetriever.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver.retriever;
+
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.apache.tajo.worker.dataserver.FileAccessForbiddenException;
+import org.apache.tajo.worker.dataserver.HttpDataServerHandler;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+public class DirectoryRetriever implements DataRetriever {
+  public String baseDir;
+  
+  public DirectoryRetriever(String baseDir) {
+    this.baseDir = baseDir;
+  }
+
+  @Override
+  public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
+      throws IOException {
+    final String path = HttpDataServerHandler.sanitizeUri(request.getUri());
+    if (path == null) {
+      throw new IllegalArgumentException("Wrong path: " +path);
+    }
+
+    File file = new File(baseDir, path);
+    if (file.isHidden() || !file.exists()) {
+      throw new FileNotFoundException("No such file: " + baseDir + "/" + path);
+    }
+    if (!file.isFile()) {
+      throw new FileAccessForbiddenException("No such file: " 
+          + baseDir + "/" + path); 
+    }
+    
+    return new FileChunk[] {new FileChunk(file, 0, file.length())};
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/FileChunk.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/FileChunk.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/FileChunk.java
new file mode 100644
index 0000000..4f11168
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/FileChunk.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver.retriever;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+
+public class FileChunk {
+  private final File file;
+  private final long startOffset;
+  private final long length;
+
+  public FileChunk(File file, long startOffset, long length) throws FileNotFoundException {
+    this.file = file;
+    this.startOffset = startOffset;
+    this.length = length;
+  }
+
+  public File getFile() {
+    return this.file;
+  }
+
+  public long startOffset() {
+    return this.startOffset;
+  }
+
+  public long length() {
+    return this.length;
+  }
+
+  public String toString() {
+    return " (start=" + startOffset() + ", length=" + length + ") "
+        + file.getAbsolutePath();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/RetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/RetrieverHandler.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/RetrieverHandler.java
new file mode 100644
index 0000000..e5479cc
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/RetrieverHandler.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver.retriever;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public interface RetrieverHandler {
+  /**
+   *
+   * @param kvs url-decoded key/value pairs
+   * @return a desired part of a file
+   * @throws IOException
+   */
+  public FileChunk get(Map<String, List<String>> kvs) throws IOException;
+}
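
For illustration, a minimal hypothetical implementation of the interface above that serves one
fixed file as a single chunk; the real handlers in Tajo perform range lookups, and the class
below is not part of this patch:

  import java.io.File;
  import java.io.IOException;
  import java.util.List;
  import java.util.Map;

  import org.apache.tajo.worker.dataserver.retriever.FileChunk;
  import org.apache.tajo.worker.dataserver.retriever.RetrieverHandler;

  public class WholeFileRetrieverHandler implements RetrieverHandler {
    private final File file;

    public WholeFileRetrieverHandler(File file) {
      this.file = file;
    }

    @Override
    public FileChunk get(Map<String, List<String>> kvs) throws IOException {
      // Ignore the request parameters and return the whole file as a single chunk.
      return new FileChunk(file, 0, file.length());
    }
  }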

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/proto/InternalTypes.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/InternalTypes.proto b/tajo-core/src/main/proto/InternalTypes.proto
new file mode 100644
index 0000000..1a62bc2
--- /dev/null
+++ b/tajo-core/src/main/proto/InternalTypes.proto
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo";
+option java_outer_classname = "InternalTypes";
+option java_generic_services = false;
+option java_generate_equals_and_hash = true;
+
+message AvgLongProto {
+  required int64 sum = 1;
+  required int64 count = 2;
+}
+
+message AvgDoubleProto {
+  required double sum = 1;
+  required int64 count = 2;
+}
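
The two messages above carry a running (sum, count) pair, presumably so that partial averages
can be combined without losing precision. With the standard protobuf-java generated builders
this would look roughly like the following; the wrapper class and numbers are invented:

  import org.apache.tajo.InternalTypes.AvgLongProto;

  public class AvgProtoSketch {
    public static void main(String[] args) {
      // Two partial aggregates, e.g. from two tasks (numbers are invented).
      AvgLongProto left  = AvgLongProto.newBuilder().setSum(30).setCount(3).build();
      AvgLongProto right = AvgLongProto.newBuilder().setSum(70).setCount(7).build();

      // Merging keeps the average exact: (30 + 70) / (3 + 7) = 10.
      AvgLongProto merged = AvgLongProto.newBuilder()
          .setSum(left.getSum() + right.getSum())
          .setCount(left.getCount() + right.getCount())
          .build();

      System.out.println((double) merged.getSum() / merged.getCount());  // 10.0
    }
  }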

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/proto/QueryMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/QueryMasterProtocol.proto b/tajo-core/src/main/proto/QueryMasterProtocol.proto
new file mode 100644
index 0000000..e12c9aa
--- /dev/null
+++ b/tajo-core/src/main/proto/QueryMasterProtocol.proto
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo.ipc";
+option java_outer_classname = "QueryMasterProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "yarn_protos.proto";
+import "tajo_protos.proto";
+import "TajoIdProtos.proto";
+import "CatalogProtos.proto";
+import "PrimitiveProtos.proto";
+import "TajoWorkerProtocol.proto";
+
+service QueryMasterProtocolService {
+  //from Worker
+  rpc getTask(GetTaskRequestProto) returns (QueryUnitRequestProto);
+  rpc statusUpdate (TaskStatusProto) returns (BoolProto);
+  rpc ping (QueryUnitAttemptIdProto) returns (BoolProto);
+  rpc fatalError(TaskFatalErrorReport) returns (BoolProto);
+  rpc done (TaskCompletionReport) returns (BoolProto);
+
+  //from TajoMaster's QueryJobManager
+  rpc killQuery(QueryIdProto) returns (BoolProto);
+  rpc executeQuery(QueryExecutionRequestProto) returns (BoolProto);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
new file mode 100644
index 0000000..d46d09a
--- /dev/null
+++ b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+option java_package = "org.apache.tajo.ipc";
+option java_outer_classname = "TajoResourceTrackerProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "TajoMasterProtocol.proto";
+
+message NodeHeartbeat {
+  required string tajoWorkerHost = 1;
+  required int32 peerRpcPort = 2;
+  required int32 tajoQueryMasterPort = 3;
+  optional ServerStatusProto serverStatus = 4;
+  optional int32 tajoWorkerClientPort = 5;
+  optional string statusMessage = 6;
+  optional int32 tajoWorkerPullServerPort = 7;
+  optional int32 tajoWorkerHttpPort = 8;
+}
+
+service TajoResourceTrackerProtocolService {
+  rpc heartbeat(NodeHeartbeat) returns (TajoHeartbeatResponse);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoMasterProtocol.proto b/tajo-core/src/main/proto/TajoMasterProtocol.proto
new file mode 100644
index 0000000..8fccbaf
--- /dev/null
+++ b/tajo-core/src/main/proto/TajoMasterProtocol.proto
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//TajoWorker -> TajoMaster protocol
+
+option java_package = "org.apache.tajo.ipc";
+option java_outer_classname = "TajoMasterProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "yarn_protos.proto";
+import "tajo_protos.proto";
+import "TajoIdProtos.proto";
+import "CatalogProtos.proto";
+import "PrimitiveProtos.proto";
+
+message ServerStatusProto {
+    message System {
+        required int32 availableProcessors = 1;
+        required int32 freeMemoryMB = 2;
+        required int32 maxMemoryMB = 3;
+        required int32 totalMemoryMB = 4;
+    }
+    message Disk {
+        required string absolutePath = 1;
+        required int64 totalSpace = 2;
+        required int64 freeSpace = 3;
+        required int64 usableSpace = 4;
+    }
+
+    message JvmHeap {
+        required int64 maxHeap = 1;
+        required int64 totalHeap = 2;
+        required int64 freeHeap = 3;
+    }
+
+    required System system = 1;
+    required float diskSlots = 2;
+    required int32 memoryResourceMB = 3;
+    repeated Disk disk = 4;
+    required int32 runningTaskNum = 5;
+    required JvmHeap jvmHeap = 6;
+    required BoolProto queryMasterMode = 7;
+    required BoolProto taskRunnerMode = 8;
+}
+
+message TajoHeartbeat {
+  required string tajoWorkerHost = 1;
+  required int32 tajoQueryMasterPort = 2;
+  optional int32 tajoWorkerClientPort = 3;
+  optional QueryIdProto queryId = 4;
+  optional QueryState state = 5;
+  optional string statusMessage = 6;
+  optional float queryProgress = 7;
+  optional int64 queryFinishTime = 8;
+}
+
+message TajoHeartbeatResponse {
+  message ResponseCommand {
+      required string command = 1;
+      repeated string params = 2;
+  }
+  required BoolProto heartbeatResult = 1;
+  required ClusterResourceSummary clusterResourceSummary = 2;
+  optional ResponseCommand responseCommand = 3;
+}
+
+message ClusterResourceSummary {
+  required int32 numWorkers = 1;
+  required int32 totalDiskSlots = 2;
+  required int32 totalCpuCoreSlots = 3;
+  required int32 totalMemoryMB = 4;
+
+  required int32 totalAvailableDiskSlots = 5;
+  required int32 totalAvailableCpuCoreSlots = 6;
+  required int32 totalAvailableMemoryMB = 7;
+}
+
+enum ResourceRequestPriority {
+    MEMORY = 1;
+    DISK = 2;
+}
+
+message WorkerResourceAllocationRequest {
+    required QueryIdProto queryId = 1;
+    required ResourceRequestPriority resourceRequestPriority = 2;
+
+    required int32 numContainers = 3;
+
+    required int32 maxMemoryMBPerContainer = 4;
+    required int32 minMemoryMBPerContainer = 5;
+
+    required float maxDiskSlotPerContainer = 6;
+    required float minDiskSlotPerContainer = 7;
+}
+
+message WorkerResourceProto {
+    required string host = 1;
+    required int32 peerRpcPort = 2;
+    required int32 queryMasterPort = 3;
+    required int32 infoPort = 4;
+    required int32 memoryMB = 5;
+    required float diskSlots = 6;
+}
+
+message WorkerResourcesRequest {
+    repeated WorkerResourceProto workerResources = 1;
+}
+
+message WorkerResourceReleaseRequest {
+    required ExecutionBlockIdProto executionBlockId = 1;
+    repeated hadoop.yarn.ContainerIdProto containerIds = 2;
+}
+
+message WorkerAllocatedResource {
+    required hadoop.yarn.ContainerIdProto containerId = 1;
+    required string nodeId = 2;
+    required string workerHost = 3;
+    required int32 peerRpcPort = 4;
+    required int32 queryMasterPort = 5;
+    required int32 clientPort = 6;
+    required int32 workerPullServerPort = 7;
+
+    required int32 allocatedMemoryMB = 8;
+    required float allocatedDiskSlots = 9;
+}
+
+message WorkerResourceAllocationResponse {
+    required QueryIdProto queryId = 1;
+    repeated WorkerAllocatedResource workerAllocatedResource = 2;
+}
+
+service TajoMasterProtocolService {
+  rpc heartbeat(TajoHeartbeat) returns (TajoHeartbeatResponse);
+  rpc allocateWorkerResources(WorkerResourceAllocationRequest) returns (WorkerResourceAllocationResponse);
+  rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto);
+  rpc stopQueryMaster(QueryIdProto) returns (BoolProto);
+  rpc getAllWorkerResource(NullProto) returns (WorkerResourcesRequest);
+}
\ No newline at end of file
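
A sketch of filling in the two required TajoHeartbeat fields with the generated protobuf
builders; the host, port, and wrapper class are placeholders, not values from this patch:

  import org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeat;

  public class HeartbeatSketch {
    public static TajoHeartbeat buildMinimalHeartbeat() {
      // Only the two required fields are set here; the optional query fields
      // (queryId, state, progress, ...) are filled in when a query is running.
      return TajoHeartbeat.newBuilder()
          .setTajoWorkerHost("worker01.example.com")   // placeholder host
          .setTajoQueryMasterPort(28093)               // placeholder port
          .build();
    }
  }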