You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 11:19:50 UTC

[28/51] [partial] TAJO-752: Escalate sub modules in tajo-core into the top-level modules. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
new file mode 100644
index 0000000..409a1b1
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -0,0 +1,896 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.query.QueryUnitRequest;
+import org.apache.tajo.engine.query.QueryUnitRequestImpl;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
+import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
+import org.apache.tajo.master.querymaster.QueryUnit;
+import org.apache.tajo.master.querymaster.QueryUnitAttempt;
+import org.apache.tajo.master.querymaster.SubQuery;
+import org.apache.tajo.storage.DataLocation;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.NetUtils;
+
+import java.net.URI;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
+public class DefaultTaskScheduler extends AbstractTaskScheduler {
+  private static final Log LOG = LogFactory.getLog(DefaultTaskScheduler.class);
+
+  private final TaskSchedulerContext context;
+  private SubQuery subQuery;
+
+  private Thread schedulingThread;
+  private AtomicBoolean stopEventHandling = new AtomicBoolean(false);
+
+  private ScheduledRequests scheduledRequests;
+  private TaskRequests taskRequests;
+
+  private int nextTaskId = 0;
+  private int scheduledObjectNum = 0;
+
+  public DefaultTaskScheduler(TaskSchedulerContext context, SubQuery subQuery) {
+    super(DefaultTaskScheduler.class.getName());
+    this.context = context;
+    this.subQuery = subQuery;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+
+    scheduledRequests = new ScheduledRequests();
+    taskRequests  = new TaskRequests();
+
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    LOG.info("Start TaskScheduler");
+
+    this.schedulingThread = new Thread() {
+      public void run() {
+
+        while(!stopEventHandling.get() && !Thread.currentThread().isInterrupted()) {
+          try {
+            synchronized (schedulingThread){
+              schedulingThread.wait(100);
+            }
+          } catch (InterruptedException e) {
+            break;
+          }
+          schedule();
+        }
+        LOG.info("TaskScheduler schedulingThread stopped");
+      }
+    };
+
+    this.schedulingThread.start();
+    super.start();
+  }
+
+  private static final QueryUnitAttemptId NULL_ATTEMPT_ID;
+  public static final TajoWorkerProtocol.QueryUnitRequestProto stopTaskRunnerReq;
+  static {
+    ExecutionBlockId nullSubQuery = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
+    NULL_ATTEMPT_ID = QueryIdFactory.newQueryUnitAttemptId(QueryIdFactory.newQueryUnitId(nullSubQuery, 0), 0);
+
+    TajoWorkerProtocol.QueryUnitRequestProto.Builder builder =
+        TajoWorkerProtocol.QueryUnitRequestProto.newBuilder();
+    builder.setId(NULL_ATTEMPT_ID.getProto());
+    builder.setShouldDie(true);
+    builder.setOutputTable("");
+    builder.setSerializedData("");
+    builder.setClusteredOutput(false);
+    stopTaskRunnerReq = builder.build();
+  }
+
+  @Override
+  public void stop() {
+    if(stopEventHandling.getAndSet(true)){
+      return;
+    }
+
+    if (schedulingThread != null) {
+      synchronized (schedulingThread) {
+        schedulingThread.notifyAll();
+      }
+    }
+
+    // Return all of request callbacks instantly.
+    for (TaskRequestEvent req : taskRequests.taskRequestQueue) {
+      req.getCallback().run(stopTaskRunnerReq);
+    }
+
+    LOG.info("Task Scheduler stopped");
+    super.stop();
+  }
+
+  private FileFragment[] fragmentsForNonLeafTask;
+
+  LinkedList<TaskRequestEvent> taskRequestEvents = new LinkedList<TaskRequestEvent>();
+  public void schedule() {
+
+    if (taskRequests.size() > 0) {
+      if (scheduledRequests.leafTaskNum() > 0) {
+        LOG.debug("Try to schedule tasks with taskRequestEvents: " +
+            taskRequests.size() + ", LeafTask Schedule Request: " +
+            scheduledRequests.leafTaskNum());
+        taskRequests.getTaskRequests(taskRequestEvents,
+            scheduledRequests.leafTaskNum());
+        LOG.debug("Get " + taskRequestEvents.size() + " taskRequestEvents ");
+        if (taskRequestEvents.size() > 0) {
+          scheduledRequests.assignToLeafTasks(taskRequestEvents);
+          taskRequestEvents.clear();
+        }
+      }
+    }
+
+    if (taskRequests.size() > 0) {
+      if (scheduledRequests.nonLeafTaskNum() > 0) {
+        LOG.debug("Try to schedule tasks with taskRequestEvents: " +
+            taskRequests.size() + ", NonLeafTask Schedule Request: " +
+            scheduledRequests.nonLeafTaskNum());
+        taskRequests.getTaskRequests(taskRequestEvents,
+            scheduledRequests.nonLeafTaskNum());
+        scheduledRequests.assignToNonLeafTasks(taskRequestEvents);
+        taskRequestEvents.clear();
+      }
+    }
+  }
+
+  @Override
+  public void handle(TaskSchedulerEvent event) {
+    if (event.getType() == EventType.T_SCHEDULE) {
+      if (event instanceof FragmentScheduleEvent) {
+        FragmentScheduleEvent castEvent = (FragmentScheduleEvent) event;
+        if (context.isLeafQuery()) {
+          QueryUnitAttemptScheduleContext queryUnitContext = new QueryUnitAttemptScheduleContext();
+          QueryUnit task = SubQuery.newEmptyQueryUnit(context, queryUnitContext, subQuery, nextTaskId++);
+          task.addFragment(castEvent.getLeftFragment(), true);
+          scheduledObjectNum++;
+          if (castEvent.hasRightFragments()) {
+            task.addFragments(castEvent.getRightFragments());
+            //scheduledObjectNum += castEvent.getRightFragments().size();
+          }
+          subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
+        } else {
+          fragmentsForNonLeafTask = new FileFragment[2];
+          fragmentsForNonLeafTask[0] = castEvent.getLeftFragment();
+          if (castEvent.hasRightFragments()) {
+            fragmentsForNonLeafTask[1] = castEvent.getRightFragments().toArray(new FileFragment[]{})[0];
+          }
+        }
+      } else if (event instanceof FetchScheduleEvent) {
+        FetchScheduleEvent castEvent = (FetchScheduleEvent) event;
+        Map<String, List<URI>> fetches = castEvent.getFetches();
+        QueryUnitAttemptScheduleContext queryUnitContext = new QueryUnitAttemptScheduleContext();
+        QueryUnit task = SubQuery.newEmptyQueryUnit(context, queryUnitContext, subQuery, nextTaskId++);
+        scheduledObjectNum++;
+        for (Entry<String, List<URI>> eachFetch : fetches.entrySet()) {
+          task.addFetches(eachFetch.getKey(), eachFetch.getValue());
+          task.addFragment(fragmentsForNonLeafTask[0], true);
+          if (fragmentsForNonLeafTask[1] != null) {
+            task.addFragment(fragmentsForNonLeafTask[1], true);
+          }
+        }
+        subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
+      } else if (event instanceof QueryUnitAttemptScheduleEvent) {
+        QueryUnitAttemptScheduleEvent castEvent = (QueryUnitAttemptScheduleEvent) event;
+        if (context.isLeafQuery()) {
+          scheduledRequests.addLeafTask(castEvent);
+        } else {
+          scheduledRequests.addNonLeafTask(castEvent);
+        }
+      }
+    } else if (event.getType() == EventType.T_SCHEDULE_CANCEL) {
+      // when a subquery is killed, unassigned query unit attmpts are canceled from the scheduler.
+      // This event is triggered by QueryUnitAttempt.
+      QueryUnitAttemptScheduleEvent castedEvent = (QueryUnitAttemptScheduleEvent) event;
+      scheduledRequests.leafTasks.remove(castedEvent.getQueryUnitAttempt().getId());
+      LOG.info(castedEvent.getQueryUnitAttempt().getId() + " is canceled from " + this.getClass().getSimpleName());
+      ((QueryUnitAttemptScheduleEvent) event).getQueryUnitAttempt().handle(
+          new TaskAttemptEvent(castedEvent.getQueryUnitAttempt().getId(), TaskAttemptEventType.TA_SCHEDULE_CANCELED));
+    }
+  }
+
+  @Override
+  public void handleTaskRequestEvent(TaskRequestEvent event) {
+
+    taskRequests.handle(event);
+    int hosts = scheduledRequests.leafTaskHostMapping.size();
+
+    // if available cluster resource are large then tasks, the scheduler thread are working immediately.
+    if(remainingScheduledObjectNum() > 0 &&
+        (remainingScheduledObjectNum() <= hosts || hosts <= taskRequests.size())){
+      synchronized (schedulingThread){
+        schedulingThread.notifyAll();
+      }
+    }
+  }
+
+  @Override
+  public int remainingScheduledObjectNum() {
+    return scheduledObjectNum;
+  }
+
+  private class TaskRequests implements EventHandler<TaskRequestEvent> {
+    private final LinkedBlockingQueue<TaskRequestEvent> taskRequestQueue =
+        new LinkedBlockingQueue<TaskRequestEvent>();
+
+    @Override
+    public void handle(TaskRequestEvent event) {
+      if(LOG.isDebugEnabled()){
+        LOG.debug("TaskRequest: " + event.getContainerId() + "," + event.getExecutionBlockId());
+      }
+
+      if(stopEventHandling.get()) {
+        event.getCallback().run(stopTaskRunnerReq);
+        return;
+      }
+      int qSize = taskRequestQueue.size();
+      if (qSize != 0 && qSize % 1000 == 0) {
+        LOG.info("Size of event-queue in YarnRMContainerAllocator is " + qSize);
+      }
+      int remCapacity = taskRequestQueue.remainingCapacity();
+      if (remCapacity < 1000) {
+        LOG.warn("Very low remaining capacity in the event-queue "
+            + "of YarnRMContainerAllocator: " + remCapacity);
+      }
+
+      taskRequestQueue.add(event);
+    }
+
+    public void getTaskRequests(final Collection<TaskRequestEvent> taskRequests,
+                                int num) {
+      taskRequestQueue.drainTo(taskRequests, num);
+    }
+
+    public int size() {
+      return taskRequestQueue.size();
+    }
+  }
+
+  /**
+   * One worker can have multiple running task runners. <code>HostVolumeMapping</code>
+   * describes various information for one worker, including :
+   * <ul>
+   *  <li>host name</li>
+   *  <li>rack name</li>
+   *  <li>unassigned tasks for each disk volume</li>
+   *  <li>last assigned volume id - it can be used for assigning task in a round-robin manner</li>
+   *  <li>the number of running tasks for each volume</li>
+   * </ul>, each task runner and the concurrency number of running tasks for volumes.
+   *
+   * Here, we identifier a task runner by {@link ContainerId}, and we use volume ids to identify
+   * all disks in this node. Actually, each volume is only used to distinguish disks, and we don't
+   * know a certain volume id indicates a certain disk. If you want to know volume id, please read the below section.
+   *
+   * <h3>Volume id</h3>
+   * Volume id is an integer. Each volume id identifies each disk volume.
+   *
+   * This volume id can be obtained from org.apache.hadoop.fs.BlockStorageLocation#getVolumeIds()}.   *
+   * HDFS cannot give any volume id due to unknown reason and disabled config 'dfs.client.file-block-locations.enabled'.
+   * In this case, the volume id will be -1 or other native integer.
+   *
+   * <h3>See Also</h3>
+   * <ul>
+   *   <li>HDFS-3672 (https://issues.apache.org/jira/browse/HDFS-3672).</li>
+   * </ul>
+   */
+  public class HostVolumeMapping {
+    private final String host;
+    private final String rack;
+    /** A key is disk volume, and a value is a list of tasks to be scheduled. */
+    private Map<Integer, LinkedHashSet<QueryUnitAttempt>> unassignedTaskForEachVolume =
+        Collections.synchronizedMap(new HashMap<Integer, LinkedHashSet<QueryUnitAttempt>>());
+    /** A value is last assigned volume id for each task runner */
+    private HashMap<ContainerId, Integer> lastAssignedVolumeId = new HashMap<ContainerId, Integer>();
+    /**
+     * A key is disk volume id, and a value is the load of this volume.
+     * This load is measured by counting how many number of tasks are running.
+     *
+     * These disk volumes are kept in an order of ascending order of the volume id.
+     * In other words, the head volume ids are likely to -1, meaning no given volume id.
+     */
+    private SortedMap<Integer, Integer> diskVolumeLoads = new TreeMap<Integer, Integer>();
+    /** The total number of remain tasks in this host */
+    private AtomicInteger remainTasksNum = new AtomicInteger(0);
+    public static final int REMOTE = -2;
+
+
+    public HostVolumeMapping(String host, String rack){
+      this.host = host;
+      this.rack = rack;
+    }
+
+    public synchronized void addQueryUnitAttempt(int volumeId, QueryUnitAttempt attemptId){
+      synchronized (unassignedTaskForEachVolume){
+        LinkedHashSet<QueryUnitAttempt> list = unassignedTaskForEachVolume.get(volumeId);
+        if (list == null) {
+          list = new LinkedHashSet<QueryUnitAttempt>();
+          unassignedTaskForEachVolume.put(volumeId, list);
+        }
+        list.add(attemptId);
+      }
+
+      remainTasksNum.incrementAndGet();
+
+      if(!diskVolumeLoads.containsKey(volumeId)) diskVolumeLoads.put(volumeId, 0);
+    }
+
+    /**
+     *  Priorities
+     *  1. a task list in a volume of host
+     *  2. unknown block or Non-splittable task in host
+     *  3. remote tasks. unassignedTaskForEachVolume is only contained local task. so it will be null
+     */
+    public synchronized QueryUnitAttemptId getLocalTask(ContainerId containerId) {
+      int volumeId;
+      QueryUnitAttemptId queryUnitAttemptId = null;
+
+      if (!lastAssignedVolumeId.containsKey(containerId)) {
+        volumeId = getLowestVolumeId();
+        increaseConcurrency(containerId, volumeId);
+      } else {
+        volumeId = lastAssignedVolumeId.get(containerId);
+      }
+
+      if (unassignedTaskForEachVolume.size() >  0) {
+        int retry = unassignedTaskForEachVolume.size();
+        do {
+          //clean and get a remaining local task
+          queryUnitAttemptId = getAndRemove(volumeId);
+          if(!unassignedTaskForEachVolume.containsKey(volumeId)) {
+            decreaseConcurrency(containerId);
+            if (volumeId > REMOTE) {
+              diskVolumeLoads.remove(volumeId);
+            }
+          }
+
+          if (queryUnitAttemptId == null) {
+            //reassign next volume
+            volumeId = getLowestVolumeId();
+            increaseConcurrency(containerId, volumeId);
+            retry--;
+          } else {
+            break;
+          }
+        } while (retry > 0);
+      } else {
+        this.remainTasksNum.set(0);
+      }
+      return queryUnitAttemptId;
+    }
+
+    public synchronized QueryUnitAttemptId getQueryUnitAttemptIdByRack(String rack) {
+      QueryUnitAttemptId queryUnitAttemptId = null;
+
+      if (unassignedTaskForEachVolume.size() > 0 && this.rack.equals(rack)) {
+        int retry = unassignedTaskForEachVolume.size();
+        do {
+          //clean and get a remaining task
+          int volumeId = getLowestVolumeId();
+          queryUnitAttemptId = getAndRemove(volumeId);
+          if (queryUnitAttemptId == null) {
+            if (volumeId > REMOTE) {
+              diskVolumeLoads.remove(volumeId);
+            }
+            retry--;
+          } else {
+            break;
+          }
+        } while (retry > 0);
+      }
+      return queryUnitAttemptId;
+    }
+
+    private synchronized QueryUnitAttemptId getAndRemove(int volumeId){
+      QueryUnitAttemptId queryUnitAttemptId = null;
+      if(!unassignedTaskForEachVolume.containsKey(volumeId)) return queryUnitAttemptId;
+
+      LinkedHashSet<QueryUnitAttempt> list = unassignedTaskForEachVolume.get(volumeId);
+      if(list != null && list.size() > 0){
+        QueryUnitAttempt queryUnitAttempt;
+        synchronized (unassignedTaskForEachVolume) {
+          Iterator<QueryUnitAttempt> iterator = list.iterator();
+          queryUnitAttempt = iterator.next();
+          iterator.remove();
+        }
+
+        this.remainTasksNum.getAndDecrement();
+        queryUnitAttemptId = queryUnitAttempt.getId();
+        for (DataLocation location : queryUnitAttempt.getQueryUnit().getDataLocations()) {
+          if (!this.getHost().equals(location.getHost())) {
+            HostVolumeMapping volumeMapping = scheduledRequests.leafTaskHostMapping.get(location.getHost());
+            volumeMapping.removeQueryUnitAttempt(location.getVolumeId(), queryUnitAttempt);
+          }
+        }
+      }
+
+      if(list == null || list.isEmpty()) {
+        unassignedTaskForEachVolume.remove(volumeId);
+      }
+      return queryUnitAttemptId;
+    }
+
+    private synchronized void removeQueryUnitAttempt(int volumeId, QueryUnitAttempt queryUnitAttempt){
+      if(!unassignedTaskForEachVolume.containsKey(volumeId)) return;
+
+      LinkedHashSet<QueryUnitAttempt> tasks  = unassignedTaskForEachVolume.get(volumeId);
+
+      if(tasks != null && tasks.size() > 0){
+        tasks.remove(queryUnitAttempt);
+        remainTasksNum.getAndDecrement();
+      } else {
+        unassignedTaskForEachVolume.remove(volumeId);
+      }
+    }
+
+    /**
+     * Increase the count of running tasks and disk loads for a certain task runner.
+     *
+     * @param containerId The task runner identifier
+     * @param volumeId Volume identifier
+     * @return the volume load (i.e., how many running tasks use this volume)
+     */
+    private synchronized int increaseConcurrency(ContainerId containerId, int volumeId) {
+
+      int concurrency = 1;
+      if (diskVolumeLoads.containsKey(volumeId)) {
+        concurrency = diskVolumeLoads.get(volumeId) + 1;
+      }
+
+      if (volumeId > -1) {
+        LOG.info("Assigned host : " + host + ", Volume : " + volumeId + ", Concurrency : " + concurrency);
+      } else if (volumeId == -1) {
+        // this case is disabled namenode block meta or compressed text file or amazon s3
+        LOG.info("Assigned host : " + host + ", Unknown Volume : " + volumeId + ", Concurrency : " + concurrency);
+      } else if (volumeId == REMOTE) {
+        // this case has processed all block on host and it will be assigned to remote
+        LOG.info("Assigned host : " + host + ", Remaining local tasks : " + getRemainingLocalTaskSize()
+            + ", Remote Concurrency : " + concurrency);
+      }
+      diskVolumeLoads.put(volumeId, concurrency);
+      lastAssignedVolumeId.put(containerId, volumeId);
+      return concurrency;
+    }
+
+    /**
+     * Decrease the count of running tasks of a certain task runner
+     */
+    private synchronized void decreaseConcurrency(ContainerId containerId){
+      Integer volumeId = lastAssignedVolumeId.get(containerId);
+      if(volumeId != null && diskVolumeLoads.containsKey(volumeId)){
+        Integer concurrency = diskVolumeLoads.get(volumeId);
+        if(concurrency > 0){
+          diskVolumeLoads.put(volumeId, concurrency - 1);
+        } else {
+          if (volumeId > REMOTE) {
+            diskVolumeLoads.remove(volumeId);
+          }
+        }
+      }
+      lastAssignedVolumeId.remove(containerId);
+    }
+
+    /**
+     *  volume of a host : 0 ~ n
+     *  compressed task, amazon s3, unKnown volume : -1
+     *  remote task : -2
+     */
+    public int getLowestVolumeId(){
+      Map.Entry<Integer, Integer> volumeEntry = null;
+
+      for (Map.Entry<Integer, Integer> entry : diskVolumeLoads.entrySet()) {
+        if(volumeEntry == null) volumeEntry = entry;
+
+        if (volumeEntry.getValue() >= entry.getValue()) {
+          volumeEntry = entry;
+        }
+      }
+
+      if(volumeEntry != null){
+        return volumeEntry.getKey();
+      } else {
+        return REMOTE;
+      }
+    }
+
+    public boolean isAssigned(ContainerId containerId){
+      return lastAssignedVolumeId.containsKey(containerId);
+    }
+
+    public boolean isRemote(ContainerId containerId){
+      Integer volumeId = lastAssignedVolumeId.get(containerId);
+      if(volumeId == null || volumeId > REMOTE){
+        return false;
+      } else {
+        return true;
+      }
+    }
+
+    public int getRemoteConcurrency(){
+      return getVolumeConcurrency(REMOTE);
+    }
+
+    public int getVolumeConcurrency(int volumeId){
+      Integer size = diskVolumeLoads.get(volumeId);
+      if(size == null) return 0;
+      else return size;
+    }
+
+    public int getRemainingLocalTaskSize(){
+      return remainTasksNum.get();
+    }
+
+    public String getHost() {
+
+      return host;
+    }
+
+    public String getRack() {
+      return rack;
+    }
+  }
+
+  private class ScheduledRequests {
+    // two list leafTasks and nonLeafTasks keep all tasks to be scheduled. Even though some task is included in
+    // leafTaskHostMapping or leafTasksRackMapping, some task T will not be sent to a task runner
+    // if the task is not included in leafTasks and nonLeafTasks.
+    private final Set<QueryUnitAttemptId> leafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>());
+    private final Set<QueryUnitAttemptId> nonLeafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>());
+    private Map<String, HostVolumeMapping> leafTaskHostMapping = new HashMap<String, HostVolumeMapping>();
+    private final Map<String, HashSet<QueryUnitAttemptId>> leafTasksRackMapping =
+        new HashMap<String, HashSet<QueryUnitAttemptId>>();
+
+    private void addLeafTask(QueryUnitAttemptScheduleEvent event) {
+      QueryUnitAttempt queryUnitAttempt = event.getQueryUnitAttempt();
+      List<DataLocation> locations = queryUnitAttempt.getQueryUnit().getDataLocations();
+
+      for (DataLocation location : locations) {
+        String host = location.getHost();
+
+        HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
+        if (hostVolumeMapping == null) {
+          String rack = RackResolver.resolve(host).getNetworkLocation();
+          hostVolumeMapping = new HostVolumeMapping(host, rack);
+          leafTaskHostMapping.put(host, hostVolumeMapping);
+        }
+        hostVolumeMapping.addQueryUnitAttempt(location.getVolumeId(), queryUnitAttempt);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Added attempt req to host " + host);
+        }
+
+        HashSet<QueryUnitAttemptId> list = leafTasksRackMapping.get(hostVolumeMapping.getRack());
+        if (list == null) {
+          list = new HashSet<QueryUnitAttemptId>();
+          leafTasksRackMapping.put(hostVolumeMapping.getRack(), list);
+        }
+
+        list.add(queryUnitAttempt.getId());
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Added attempt req to rack " + hostVolumeMapping.getRack());
+        }
+      }
+
+      leafTasks.add(queryUnitAttempt.getId());
+    }
+
+    private void addNonLeafTask(QueryUnitAttemptScheduleEvent event) {
+      nonLeafTasks.add(event.getQueryUnitAttempt().getId());
+    }
+
+    public int leafTaskNum() {
+      return leafTasks.size();
+    }
+
+    public int nonLeafTaskNum() {
+      return nonLeafTasks.size();
+    }
+
+    public Set<QueryUnitAttemptId> assignedRequest = new HashSet<QueryUnitAttemptId>();
+
+    private QueryUnitAttemptId allocateLocalTask(String host, ContainerId containerId){
+      HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
+
+      if (hostVolumeMapping != null) { //tajo host is located in hadoop datanode
+        while (hostVolumeMapping.getRemainingLocalTaskSize() > 0) {
+          QueryUnitAttemptId attemptId = hostVolumeMapping.getLocalTask(containerId);
+          //find remaining local task
+          if (leafTasks.contains(attemptId)) {
+            leafTasks.remove(attemptId);
+            //LOG.info(attemptId + " Assigned based on host match " + hostName);
+            hostLocalAssigned++;
+            totalAssigned++;
+            return attemptId;
+          }
+        }
+      }
+      return null;
+    }
+
+    private QueryUnitAttemptId allocateRackTask(String host) {
+
+      List<HostVolumeMapping> remainingTasks = new ArrayList<HostVolumeMapping>(leafTaskHostMapping.values());
+      String rack = RackResolver.resolve(host).getNetworkLocation();
+      QueryUnitAttemptId attemptId = null;
+
+      if (remainingTasks.size() > 0) {
+        //find largest remaining task of other host in rack
+        Collections.sort(remainingTasks, new Comparator<HostVolumeMapping>() {
+          @Override
+          public int compare(HostVolumeMapping v1, HostVolumeMapping v2) {
+            // descending remaining tasks
+            return Integer.valueOf(v2.remainTasksNum.get()).compareTo(Integer.valueOf(v1.remainTasksNum.get()));
+          }
+        });
+
+        for (HostVolumeMapping tasks : remainingTasks) {
+          while (tasks.getRemainingLocalTaskSize() > 0){
+            QueryUnitAttemptId tId = tasks.getQueryUnitAttemptIdByRack(rack);
+
+            if (tId == null) break;
+
+            if (leafTasks.contains(tId)) {
+              leafTasks.remove(tId);
+              attemptId = tId;
+              break;
+            }
+          }
+          if(attemptId != null) break;
+        }
+      }
+
+      //find task in rack
+      if (attemptId == null) {
+        HashSet<QueryUnitAttemptId> list = leafTasksRackMapping.get(rack);
+        if (list != null) {
+          synchronized (list) {
+            Iterator<QueryUnitAttemptId> iterator = list.iterator();
+            while (iterator.hasNext()) {
+              QueryUnitAttemptId tId = iterator.next();
+              iterator.remove();
+              if (leafTasks.contains(tId)) {
+                leafTasks.remove(tId);
+                attemptId = tId;
+                break;
+              }
+            }
+          }
+        }
+      }
+
+      if (attemptId != null) {
+        rackLocalAssigned++;
+        totalAssigned++;
+
+        LOG.info(String.format("Assigned Local/Rack/Total: (%d/%d/%d), Locality: %.2f%%, Rack host: %s",
+            hostLocalAssigned, rackLocalAssigned, totalAssigned,
+            ((double) hostLocalAssigned / (double) totalAssigned) * 100, host));
+
+      }
+      return attemptId;
+    }
+
+    public void assignToLeafTasks(LinkedList<TaskRequestEvent> taskRequests) {
+      Collections.shuffle(taskRequests);
+      LinkedList<TaskRequestEvent> remoteTaskRequests = new LinkedList<TaskRequestEvent>();
+
+      TaskRequestEvent taskRequest;
+      while (leafTasks.size() > 0 && (!taskRequests.isEmpty() || !remoteTaskRequests.isEmpty())) {
+        taskRequest = taskRequests.pollFirst();
+        if(taskRequest == null) { // if there are only remote task requests
+          taskRequest = remoteTaskRequests.pollFirst();
+        }
+
+        // checking if this container is still alive.
+        // If not, ignore the task request and stop the task runner
+        ContainerProxy container = context.getMasterContext().getResourceAllocator()
+            .getContainer(taskRequest.getContainerId());
+        if(container == null) {
+          taskRequest.getCallback().run(stopTaskRunnerReq);
+          continue;
+        }
+
+        // getting the hostname of requested node
+        String host = container.getTaskHostName();
+
+        // if there are no worker matched to the hostname a task request
+        if(!leafTaskHostMapping.containsKey(host)){
+          host = NetUtils.normalizeHost(host);
+
+          if(!leafTaskHostMapping.containsKey(host) && !taskRequests.isEmpty()){
+            // this case means one of either cases:
+            // * there are no blocks which reside in this node.
+            // * all blocks which reside in this node are consumed, and this task runner requests a remote task.
+            // In this case, we transfer the task request to the remote task request list, and skip the followings.
+            remoteTaskRequests.add(taskRequest);
+            continue;
+          }
+        }
+
+        ContainerId containerId = taskRequest.getContainerId();
+        LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
+            "containerId=" + containerId);
+
+        //////////////////////////////////////////////////////////////////////
+        // disk or host-local allocation
+        //////////////////////////////////////////////////////////////////////
+        QueryUnitAttemptId attemptId = allocateLocalTask(host, containerId);
+
+        if (attemptId == null) { // if a local task cannot be found
+          HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
+
+          if(hostVolumeMapping != null) {
+            if(!hostVolumeMapping.isRemote(containerId)){
+              // assign to remote volume
+              hostVolumeMapping.decreaseConcurrency(containerId);
+              hostVolumeMapping.increaseConcurrency(containerId, HostVolumeMapping.REMOTE);
+            }
+            // this part is remote concurrency management of a tail tasks
+            int tailLimit = Math.max(remainingScheduledObjectNum() / (leafTaskHostMapping.size() * 2), 1);
+
+            if(hostVolumeMapping.getRemoteConcurrency() > tailLimit){
+              //release container
+              hostVolumeMapping.decreaseConcurrency(containerId);
+              taskRequest.getCallback().run(stopTaskRunnerReq);
+              subQuery.releaseContainer(containerId);
+              continue;
+            }
+          }
+
+          //////////////////////////////////////////////////////////////////////
+          // rack-local allocation
+          //////////////////////////////////////////////////////////////////////
+          attemptId = allocateRackTask(host);
+
+          //////////////////////////////////////////////////////////////////////
+          // random node allocation
+          //////////////////////////////////////////////////////////////////////
+          if (attemptId == null && leafTaskNum() > 0) {
+            synchronized (leafTasks){
+              attemptId = leafTasks.iterator().next();
+              leafTasks.remove(attemptId);
+              rackLocalAssigned++;
+              totalAssigned++;
+              LOG.info(String.format("Assigned Local/Remote/Total: (%d/%d/%d), Locality: %.2f%%,",
+                  hostLocalAssigned, rackLocalAssigned, totalAssigned,
+                  ((double) hostLocalAssigned / (double) totalAssigned) * 100));
+            }
+          }
+        }
+
+        if (attemptId != null) {
+          QueryUnit task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
+          QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
+              attemptId,
+              new ArrayList<FragmentProto>(task.getAllFragments()),
+              "",
+              false,
+              task.getLogicalPlan().toJson(),
+              context.getMasterContext().getQueryContext(),
+              subQuery.getDataChannel(), subQuery.getBlock().getEnforcer());
+          if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) {
+            taskAssign.setInterQuery();
+          }
+
+          context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
+              taskRequest.getContainerId(),
+              host, container.getTaskPort()));
+          assignedRequest.add(attemptId);
+
+          scheduledObjectNum -= task.getAllFragments().size();
+          taskRequest.getCallback().run(taskAssign.getProto());
+        } else {
+          throw new RuntimeException("Illegal State!!!!!!!!!!!!!!!!!!!!!");
+        }
+      }
+    }
+
+    private boolean checkIfInterQuery(MasterPlan masterPlan, ExecutionBlock block) {
+      if (masterPlan.isRoot(block)) {
+        return false;
+      }
+
+      ExecutionBlock parent = masterPlan.getParent(block);
+      if (masterPlan.isRoot(parent) && parent.hasUnion()) {
+        return false;
+      }
+
+      return true;
+    }
+
+    public void assignToNonLeafTasks(LinkedList<TaskRequestEvent> taskRequests) {
+      Collections.shuffle(taskRequests);
+
+      TaskRequestEvent taskRequest;
+      while (!taskRequests.isEmpty()) {
+        taskRequest = taskRequests.pollFirst();
+        LOG.debug("assignToNonLeafTasks: " + taskRequest.getExecutionBlockId());
+
+        QueryUnitAttemptId attemptId;
+        // random allocation
+        if (nonLeafTasks.size() > 0) {
+          synchronized (nonLeafTasks){
+            attemptId = nonLeafTasks.iterator().next();
+            nonLeafTasks.remove(attemptId);
+          }
+          LOG.debug("Assigned based on * match");
+
+          QueryUnit task;
+          task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
+          QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
+              attemptId,
+              Lists.newArrayList(task.getAllFragments()),
+              "",
+              false,
+              task.getLogicalPlan().toJson(),
+              context.getMasterContext().getQueryContext(),
+              subQuery.getDataChannel(),
+              subQuery.getBlock().getEnforcer());
+          if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) {
+            taskAssign.setInterQuery();
+          }
+          for (ScanNode scan : task.getScanNodes()) {
+            Collection<URI> fetches = task.getFetch(scan);
+            if (fetches != null) {
+              for (URI fetch : fetches) {
+                taskAssign.addFetch(scan.getTableName(), fetch);
+              }
+            }
+          }
+
+          ContainerProxy container = context.getMasterContext().getResourceAllocator().getContainer(
+              taskRequest.getContainerId());
+          context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
+              taskRequest.getContainerId(), container.getTaskHostName(), container.getTaskPort()));
+          taskRequest.getCallback().run(taskAssign.getProto());
+          totalAssigned++;
+          scheduledObjectNum--;
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java
new file mode 100644
index 0000000..561f980
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.master.event.TaskSchedulerEvent;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+
+public class FetchScheduleEvent extends TaskSchedulerEvent {
+  private final Map<String, List<URI>> fetches;
+
+  public FetchScheduleEvent(final EventType eventType, final ExecutionBlockId blockId,
+                            final Map<String, List<URI>> fetches) {
+    super(eventType, blockId);
+    this.fetches = fetches;
+  }
+
+  public Map<String, List<URI>> getFetches() {
+    return fetches;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java b/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java
new file mode 100644
index 0000000..598b1c5
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.common.base.Objects;
+import org.apache.tajo.storage.fragment.FileFragment;
+
+/**
+ * FragmentPair consists of two fragments, a left fragment and a right fragment.
+ * According to queries, it can have the different values.
+ * For join queries, it is assumed to have both fragments.
+ * Also, the left fragment is assumed to be a fragment of the larger table.
+ * For other queries, it is assumed to have only a left fragment.
+ */
+public class FragmentPair {
+  private FileFragment leftFragment;
+  private FileFragment rightFragment;
+
+  public FragmentPair(FileFragment left) {
+    this.leftFragment = left;
+  }
+
+  public FragmentPair(FileFragment left, FileFragment right) {
+    this.leftFragment = left;
+    this.rightFragment = right;
+  }
+
+  public FileFragment getLeftFragment() {
+    return leftFragment;
+  }
+
+  public FileFragment getRightFragment() {
+    return rightFragment;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof FragmentPair) {
+      FragmentPair other = (FragmentPair) o;
+      boolean eq = this.leftFragment.equals(other.leftFragment);
+      if (this.rightFragment != null && other.rightFragment != null) {
+        eq &= this.rightFragment.equals(other.rightFragment);
+      } else if (this.rightFragment == null && other.rightFragment == null) {
+        eq &= true;
+      } else {
+        return false;
+      }
+      return eq;
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(leftFragment, rightFragment);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java
new file mode 100644
index 0000000..10d993d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+/**
+ * FragmentScheduleAlgorithm is used by LazyTaskScheduler.
+ * FragmentScheduleAlgorithm selects a fragment for the given argument.
+ *
+ * There are two implementations of DefaultFragmentScheduleAlgorithm and GreedyFragmentScheduleAlgorithm.
+ */
+public interface FragmentScheduleAlgorithm {
+  void addFragment(FragmentPair fragmentPair);
+  void removeFragment(FragmentPair fragmentPair);
+
+  FragmentPair getHostLocalFragment(String host);
+  FragmentPair getHostLocalFragment(String host, Integer diskId);
+  FragmentPair getRackLocalFragment(String host);
+  FragmentPair getRandomFragment();
+  FragmentPair[] getAllFragments();
+
+  int size();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java b/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java
new file mode 100644
index 0000000..820a0fb
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+public class FragmentScheduleAlgorithmFactory {
+
+  private static Class<? extends FragmentScheduleAlgorithm> CACHED_ALGORITHM_CLASS;
+  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap();
+  private static final Class<?>[] DEFAULT_PARAMS = {};
+
+  public static Class<? extends FragmentScheduleAlgorithm> getScheduleAlgorithmClass(Configuration conf)
+      throws IOException {
+    if (CACHED_ALGORITHM_CLASS != null) {
+      return CACHED_ALGORITHM_CLASS;
+    } else {
+      CACHED_ALGORITHM_CLASS = conf.getClass("tajo.querymaster.lazy-task-scheduler.algorithm", null,
+          FragmentScheduleAlgorithm.class);
+    }
+
+    if (CACHED_ALGORITHM_CLASS == null) {
+      throw new IOException("Scheduler algorithm is null");
+    }
+    return CACHED_ALGORITHM_CLASS;
+  }
+
+  public static <T extends FragmentScheduleAlgorithm> T get(Class<T> clazz) {
+    T result;
+    try {
+      Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz);
+      if (constructor == null) {
+        constructor = clazz.getDeclaredConstructor(DEFAULT_PARAMS);
+        constructor.setAccessible(true);
+        CONSTRUCTOR_CACHE.put(clazz, constructor);
+      }
+      result = constructor.newInstance(new Object[]{});
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return result;
+  }
+
+  public static FragmentScheduleAlgorithm get(Configuration conf) throws IOException {
+    return get(getScheduleAlgorithmClass(conf));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
new file mode 100644
index 0000000..8acf2b2
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -0,0 +1,738 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.algebra.AlterTablespaceSetType;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.exception.*;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.exception.IllegalQueryStatusException;
+import org.apache.tajo.engine.exception.VerifyException;
+import org.apache.tajo.engine.parser.HiveQLAnalyzer;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.master.TajoMaster.MasterContext;
+import org.apache.tajo.master.querymaster.QueryInfo;
+import org.apache.tajo.master.querymaster.QueryJobManager;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.storage.*;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto;
+import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto.AlterTablespaceCommand;
+import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
+import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse.SerializedResultSet;
+
+public class GlobalEngine extends AbstractService {
+  /** Class Logger */
+  private final static Log LOG = LogFactory.getLog(GlobalEngine.class);
+
+  private final MasterContext context;
+  private final AbstractStorageManager sm;
+
+  private SQLAnalyzer analyzer;
+  private HiveQLAnalyzer converter;
+  private CatalogService catalog;
+  private PreLogicalPlanVerifier preVerifier;
+  private LogicalPlanner planner;
+  private LogicalOptimizer optimizer;
+  private LogicalPlanVerifier annotatedPlanVerifier;
+  private DistributedQueryHookManager hookManager;
+
+  public GlobalEngine(final MasterContext context) {
+    super(GlobalEngine.class.getName());
+    this.context = context;
+    this.catalog = context.getCatalog();
+    this.sm = context.getStorageManager();
+  }
+
+  public void start() {
+    try  {
+      analyzer = new SQLAnalyzer();
+      converter = new HiveQLAnalyzer();
+      preVerifier = new PreLogicalPlanVerifier(context.getCatalog());
+      planner = new LogicalPlanner(context.getCatalog());
+      optimizer = new LogicalOptimizer(context.getConf());
+      annotatedPlanVerifier = new LogicalPlanVerifier(context.getConf(), context.getCatalog());
+
+      hookManager = new DistributedQueryHookManager();
+      hookManager.addHook(new CreateTableHook());
+      hookManager.addHook(new InsertHook());
+    } catch (Throwable t) {
+      LOG.error(t.getMessage(), t);
+    }
+    super.start();
+  }
+
+  public void stop() {
+    super.stop();
+  }
+
+  public SubmitQueryResponse executeQuery(Session session, String sql)
+      throws InterruptedException, IOException, IllegalQueryStatusException {
+
+    LOG.info("SQL: " + sql);
+    QueryContext queryContext = new QueryContext();
+
+    try {
+      // setting environment variables
+      String [] cmds = sql.split(" ");
+      if(cmds != null) {
+          if(cmds[0].equalsIgnoreCase("set")) {
+            String[] params = cmds[1].split("=");
+            context.getConf().set(params[0], params[1]);
+            SubmitQueryResponse.Builder responseBuilder = SubmitQueryResponse.newBuilder();
+            responseBuilder.setUserName(context.getConf().getVar(TajoConf.ConfVars.USERNAME));
+            responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+            responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+            return responseBuilder.build();
+          }
+      }
+
+      final boolean hiveQueryMode = context.getConf().getBoolVar(TajoConf.ConfVars.HIVE_QUERY_MODE);
+      LOG.info("hive.query.mode:" + hiveQueryMode);
+
+      if (hiveQueryMode) {
+        context.getSystemMetrics().counter("Query", "numHiveMode").inc();
+        queryContext.setHiveQueryMode();
+      }
+
+      context.getSystemMetrics().counter("Query", "totalQuery").inc();
+
+      Expr planningContext = hiveQueryMode ? converter.parse(sql) : analyzer.parse(sql);
+      LogicalPlan plan = createLogicalPlan(session, planningContext);
+      SubmitQueryResponse response = executeQueryInternal(queryContext, session, plan, sql);
+      return response;
+    } catch (Throwable t) {
+      context.getSystemMetrics().counter("Query", "errorQuery").inc();
+      LOG.error("\nStack Trace:\n" + StringUtils.stringifyException(t));
+      SubmitQueryResponse.Builder responseBuilder = SubmitQueryResponse.newBuilder();
+      responseBuilder.setUserName(context.getConf().getVar(TajoConf.ConfVars.USERNAME));
+      responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+      responseBuilder.setIsForwarded(true);
+      responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR);
+      String errorMessage = t.getMessage();
+      if (t.getMessage() == null) {
+        errorMessage = StringUtils.stringifyException(t);
+      }
+      responseBuilder.setErrorMessage(errorMessage);
+      return responseBuilder.build();
+    }
+  }
+
+  private SubmitQueryResponse executeQueryInternal(QueryContext queryContext,
+                                                      Session session,
+                                                      LogicalPlan plan,
+                                                      String sql) throws Exception {
+
+    LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+
+    SubmitQueryResponse.Builder responseBuilder = SubmitQueryResponse.newBuilder();
+    responseBuilder.setIsForwarded(false);
+    responseBuilder.setUserName(context.getConf().getVar(TajoConf.ConfVars.USERNAME));
+
+    if (PlannerUtil.checkIfDDLPlan(rootNode)) {
+      context.getSystemMetrics().counter("Query", "numDDLQuery").inc();
+      updateQuery(session, rootNode.getChild());
+      responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+      responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+
+    } else if (plan.isExplain()) { // explain query
+      String explainStr = PlannerUtil.buildExplainString(plan.getRootBlock().getRoot());
+      Schema schema = new Schema();
+      schema.addColumn("explain", TajoDataTypes.Type.TEXT);
+      RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema);
+
+      SerializedResultSet.Builder serializedResBuilder = SerializedResultSet.newBuilder();
+
+      VTuple tuple = new VTuple(1);
+      String[] lines = explainStr.split("\n");
+      int bytesNum = 0;
+      for (String line : lines) {
+        tuple.put(0, DatumFactory.createText(line));
+        byte [] encodedData = encoder.toBytes(tuple);
+        bytesNum += encodedData.length;
+        serializedResBuilder.addSerializedTuples(ByteString.copyFrom(encodedData));
+      }
+      serializedResBuilder.setSchema(schema.getProto());
+      serializedResBuilder.setBytesNum(bytesNum);
+
+      responseBuilder.setResultSet(serializedResBuilder.build());
+      responseBuilder.setMaxRowNum(lines.length);
+      responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+      responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+
+      // Simple query indicates a form of 'select * from tb_name [LIMIT X];'.
+    } else if (PlannerUtil.checkIfSimpleQuery(plan)) {
+      ScanNode scanNode = plan.getRootBlock().getNode(NodeType.SCAN);
+      TableDesc desc = scanNode.getTableDesc();
+      if (plan.getRootBlock().hasNode(NodeType.LIMIT)) {
+        LimitNode limitNode = plan.getRootBlock().getNode(NodeType.LIMIT);
+        responseBuilder.setMaxRowNum((int) limitNode.getFetchFirstNum());
+      } else {
+        if (desc.getStats().getNumBytes() > 0 && desc.getStats().getNumRows() == 0) {
+          responseBuilder.setMaxRowNum(Integer.MAX_VALUE);
+        }
+      }
+      responseBuilder.setTableDesc(desc.getProto());
+      responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+      responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+
+      // NonFromQuery indicates a form of 'select a, x+y;'
+    } else if (PlannerUtil.checkIfNonFromQuery(plan)) {
+      Target [] targets = plan.getRootBlock().getRawTargets();
+      if (targets == null) {
+        throw new PlanningException("No targets");
+      }
+      Tuple outTuple = new VTuple(targets.length);
+      for (int i = 0; i < targets.length; i++) {
+        EvalNode eval = targets[i].getEvalTree();
+        outTuple.put(i, eval.eval(null, null));
+      }
+
+      Schema schema = PlannerUtil.targetToSchema(targets);
+      RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema);
+      byte [] serializedBytes = encoder.toBytes(outTuple);
+      SerializedResultSet.Builder serializedResBuilder = SerializedResultSet.newBuilder();
+      serializedResBuilder.addSerializedTuples(ByteString.copyFrom(serializedBytes));
+      serializedResBuilder.setSchema(schema.getProto());
+      serializedResBuilder.setBytesNum(serializedBytes.length);
+
+      responseBuilder.setResultSet(serializedResBuilder);
+      responseBuilder.setMaxRowNum(1);
+      responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+      responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+
+    } else { // it requires distributed execution. So, the query is forwarded to a query master.
+      context.getSystemMetrics().counter("Query", "numDMLQuery").inc();
+      hookManager.doHooks(queryContext, plan);
+
+      QueryJobManager queryJobManager = this.context.getQueryJobManager();
+      QueryInfo queryInfo;
+
+      queryInfo = queryJobManager.createNewQueryJob(session, queryContext, sql, rootNode);
+
+      if(queryInfo == null) {
+        responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+        responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR);
+        responseBuilder.setErrorMessage("Fail starting QueryMaster.");
+      } else {
+        responseBuilder.setIsForwarded(true);
+        responseBuilder.setQueryId(queryInfo.getQueryId().getProto());
+        responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+        if(queryInfo.getQueryMasterHost() != null) {
+          responseBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
+        }
+        responseBuilder.setQueryMasterPort(queryInfo.getQueryMasterClientPort());
+        LOG.info("Query is forwarded to " + queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort());
+      }
+    }
+    SubmitQueryResponse response = responseBuilder.build();
+    return response;
+  }
+
+  public QueryId updateQuery(Session session, String sql) throws IOException, SQLException, PlanningException {
+    try {
+      LOG.info("SQL: " + sql);
+      // parse the query
+      Expr expr = analyzer.parse(sql);
+
+      LogicalPlan plan = createLogicalPlan(session, expr);
+      LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+
+      if (!PlannerUtil.checkIfDDLPlan(rootNode)) {
+        throw new SQLException("This is not update query:\n" + sql);
+      } else {
+        updateQuery(session, rootNode.getChild());
+        return QueryIdFactory.NULL_QUERY_ID;
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw new IOException(e.getMessage(), e);
+    }
+  }
+
+  private boolean updateQuery(Session session, LogicalNode root) throws IOException {
+
+    switch (root.getType()) {
+      case CREATE_DATABASE:
+        CreateDatabaseNode createDatabase = (CreateDatabaseNode) root;
+        createDatabase(session, createDatabase.getDatabaseName(), null, createDatabase.isIfNotExists());
+        return true;
+      case DROP_DATABASE:
+        DropDatabaseNode dropDatabaseNode = (DropDatabaseNode) root;
+        dropDatabase(session, dropDatabaseNode.getDatabaseName(), dropDatabaseNode.isIfExists());
+        return true;
+      case CREATE_TABLE:
+        CreateTableNode createTable = (CreateTableNode) root;
+        createTable(session, createTable, createTable.isIfNotExists());
+        return true;
+      case DROP_TABLE:
+        DropTableNode dropTable = (DropTableNode) root;
+        dropTable(session, dropTable.getTableName(), dropTable.isIfExists(), dropTable.isPurge());
+        return true;
+      case ALTER_TABLESPACE:
+        AlterTablespaceNode alterTablespace = (AlterTablespaceNode) root;
+        alterTablespace(session, alterTablespace);
+        return true;
+      case ALTER_TABLE:
+        AlterTableNode alterTable = (AlterTableNode) root;
+        alterTable(session,alterTable);
+        return true;
+      default:
+        throw new InternalError("updateQuery cannot handle such query: \n" + root.toJson());
+    }
+  }
+
+  private LogicalPlan createLogicalPlan(Session session, Expr expression) throws PlanningException {
+
+    VerificationState state = new VerificationState();
+    preVerifier.verify(session, state, expression);
+    if (!state.verified()) {
+      StringBuilder sb = new StringBuilder();
+      for (String error : state.getErrorMessages()) {
+        sb.append(error).append("\n");
+      }
+      throw new VerifyException(sb.toString());
+    }
+
+    LogicalPlan plan = planner.createPlan(session, expression);
+    LOG.info("=============================================");
+    LOG.info("Non Optimized Query: \n" + plan.toString());
+    LOG.info("=============================================");
+    optimizer.optimize(plan);
+    LOG.info("=============================================");
+    LOG.info("Optimized Query: \n" + plan.toString());
+    LOG.info("=============================================");
+
+    annotatedPlanVerifier.verify(session, state, plan);
+
+    if (!state.verified()) {
+      StringBuilder sb = new StringBuilder();
+      for (String error : state.getErrorMessages()) {
+        sb.append(error).append("\n");
+      }
+      throw new VerifyException(sb.toString());
+    }
+
+    return plan;
+  }
+
+  /**
+   * Alter a given table
+   */
+  public void alterTablespace(final Session session, final AlterTablespaceNode alterTablespace) {
+
+    final CatalogService catalog = context.getCatalog();
+    final String spaceName = alterTablespace.getTablespaceName();
+
+    AlterTablespaceProto.Builder builder = AlterTablespaceProto.newBuilder();
+    builder.setSpaceName(spaceName);
+    if (alterTablespace.getSetType() == AlterTablespaceSetType.LOCATION) {
+      AlterTablespaceCommand.Builder commandBuilder = AlterTablespaceCommand.newBuilder();
+      commandBuilder.setType(AlterTablespaceProto.AlterTablespaceType.LOCATION);
+      commandBuilder.setLocation(AlterTablespaceProto.SetLocation.newBuilder().setUri(alterTablespace.getLocation()));
+      commandBuilder.build();
+      builder.addCommand(commandBuilder);
+    } else {
+      throw new RuntimeException("This 'ALTER TABLESPACE' is not supported yet.");
+    }
+
+    catalog.alterTablespace(builder.build());
+  }
+
+  /**
+   * Alter a given table
+   */
+  public void alterTable(final Session session, final AlterTableNode alterTable) throws IOException {
+
+    final CatalogService catalog = context.getCatalog();
+    final String tableName = alterTable.getTableName();
+
+    String databaseName;
+    String simpleTableName;
+    if (CatalogUtil.isFQTableName(tableName)) {
+      String[] split = CatalogUtil.splitFQTableName(tableName);
+      databaseName = split[0];
+      simpleTableName = split[1];
+    } else {
+      databaseName = session.getCurrentDatabase();
+      simpleTableName = tableName;
+    }
+    final String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName);
+
+    if (!catalog.existsTable(databaseName, simpleTableName)) {
+      throw new NoSuchTableException(qualifiedName);
+    }
+
+    switch (alterTable.getAlterTableOpType()) {
+      case RENAME_TABLE:
+        if (!catalog.existsTable(databaseName, simpleTableName)) {
+          throw new NoSuchTableException(alterTable.getTableName());
+        }
+        if (catalog.existsTable(databaseName, alterTable.getNewTableName())) {
+          throw new AlreadyExistsTableException(alterTable.getNewTableName());
+        }
+
+        TableDesc desc = catalog.getTableDesc(databaseName, simpleTableName);
+
+        if (!desc.isExternal()) { // if the table is the managed table
+          Path oldPath = StorageUtil.concatPath(context.getConf().getVar(TajoConf.ConfVars.WAREHOUSE_DIR),
+              databaseName, simpleTableName);
+          Path newPath = StorageUtil.concatPath(context.getConf().getVar(TajoConf.ConfVars.WAREHOUSE_DIR),
+              databaseName, alterTable.getNewTableName());
+          FileSystem fs = oldPath.getFileSystem(context.getConf());
+
+          if (!fs.exists(oldPath)) {
+            throw new IOException("No such a table directory: " + oldPath);
+          }
+          if (fs.exists(newPath)) {
+            throw new IOException("Already table directory exists: " + newPath);
+          }
+
+          fs.rename(oldPath, newPath);
+        }
+        catalog.alterTable(CatalogUtil.renameTable(qualifiedName, alterTable.getNewTableName(),
+            AlterTableType.RENAME_TABLE));
+        break;
+      case RENAME_COLUMN:
+        if (existColumnName(qualifiedName, alterTable.getNewColumnName())) {
+          throw new ColumnNameAlreadyExistException(alterTable.getNewColumnName());
+        }
+        catalog.alterTable(CatalogUtil.renameColumn(qualifiedName, alterTable.getColumnName(), alterTable.getNewColumnName(), AlterTableType.RENAME_COLUMN));
+        break;
+      case ADD_COLUMN:
+        if (existColumnName(qualifiedName, alterTable.getAddNewColumn().getSimpleName())) {
+          throw new ColumnNameAlreadyExistException(alterTable.getAddNewColumn().getSimpleName());
+        }
+        catalog.alterTable(CatalogUtil.addNewColumn(qualifiedName, alterTable.getAddNewColumn(), AlterTableType.ADD_COLUMN));
+        break;
+      default:
+        //TODO
+    }
+  }
+
+  private boolean existColumnName(String tableName, String columnName) {
+    final TableDesc tableDesc = catalog.getTableDesc(tableName);
+    return tableDesc.getSchema().containsByName(columnName) ? true : false;
+  }
+
+  private TableDesc createTable(Session session, CreateTableNode createTable, boolean ifNotExists) throws IOException {
+    TableMeta meta;
+
+    if (createTable.hasOptions()) {
+      meta = CatalogUtil.newTableMeta(createTable.getStorageType(), createTable.getOptions());
+    } else {
+      meta = CatalogUtil.newTableMeta(createTable.getStorageType());
+    }
+
+    if(createTable.isExternal()){
+      Preconditions.checkState(createTable.hasPath(), "ERROR: LOCATION must be given.");
+    } else {
+      String databaseName;
+      String tableName;
+      if (CatalogUtil.isFQTableName(createTable.getTableName())) {
+        databaseName = CatalogUtil.extractQualifier(createTable.getTableName());
+        tableName = CatalogUtil.extractSimpleName(createTable.getTableName());
+      } else {
+        databaseName = session.getCurrentDatabase();
+        tableName = createTable.getTableName();
+      }
+
+      // create a table directory (i.e., ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME} )
+      Path tablePath = StorageUtil.concatPath(sm.getWarehouseDir(), databaseName, tableName);
+      createTable.setPath(tablePath);
+    }
+
+    return createTableOnPath(session, createTable.getTableName(), createTable.getTableSchema(),
+        meta, createTable.getPath(), createTable.isExternal(), createTable.getPartitionMethod(), ifNotExists);
+  }
+
+  public TableDesc createTableOnPath(Session session, String tableName, Schema schema, TableMeta meta,
+                                     Path path, boolean isExternal, PartitionMethodDesc partitionDesc,
+                                     boolean ifNotExists)
+      throws IOException {
+    String databaseName;
+    String simpleTableName;
+    if (CatalogUtil.isFQTableName(tableName)) {
+      String [] splitted = CatalogUtil.splitFQTableName(tableName);
+      databaseName = splitted[0];
+      simpleTableName = splitted[1];
+    } else {
+      databaseName = session.getCurrentDatabase();
+      simpleTableName = tableName;
+    }
+    String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName);
+
+    boolean exists = catalog.existsTable(databaseName, simpleTableName);
+
+    if (exists) {
+      if (ifNotExists) {
+        LOG.info("relation \"" + qualifiedName + "\" is already exists." );
+        return catalog.getTableDesc(databaseName, simpleTableName);
+      } else {
+        throw new AlreadyExistsTableException(CatalogUtil.buildFQName(databaseName, tableName));
+      }
+    }
+
+    FileSystem fs = path.getFileSystem(context.getConf());
+
+    if (isExternal) {
+      if(!fs.exists(path)) {
+        LOG.error("ERROR: " + path.toUri() + " does not exist");
+        throw new IOException("ERROR: " + path.toUri() + " does not exist");
+      }
+    } else {
+      fs.mkdirs(path);
+    }
+
+    long totalSize = 0;
+
+    try {
+      totalSize = sm.calculateSize(path);
+    } catch (IOException e) {
+      LOG.warn("Cannot calculate the size of the relation", e);
+    }
+
+    TableStats stats = new TableStats();
+    stats.setNumBytes(totalSize);
+    TableDesc desc = new TableDesc(CatalogUtil.buildFQName(databaseName, simpleTableName),
+        schema, meta, path, isExternal);
+    desc.setStats(stats);
+    if (partitionDesc != null) {
+      desc.setPartitionMethod(partitionDesc);
+    }
+
+    if (catalog.createTable(desc)) {
+      LOG.info("Table " + desc.getName() + " is created (" + desc.getStats().getNumBytes() + ")");
+      return desc;
+    } else {
+      LOG.info("Table creation " + tableName + " is failed.");
+      throw new CatalogException("Cannot create table \"" + tableName + "\".");
+    }
+  }
+
+  public boolean createDatabase(@Nullable Session session, String databaseName,
+                                @Nullable String tablespace,
+                                boolean ifNotExists) throws IOException {
+
+    String tablespaceName;
+    if (tablespace == null) {
+      tablespaceName = DEFAULT_TABLESPACE_NAME;
+    } else {
+      tablespaceName = tablespace;
+    }
+
+    // CREATE DATABASE IF NOT EXISTS
+    boolean exists = catalog.existDatabase(databaseName);
+    if (exists) {
+      if (ifNotExists) {
+        LOG.info("database \"" + databaseName + "\" is already exists." );
+        return true;
+      } else {
+        throw new AlreadyExistsDatabaseException(databaseName);
+      }
+    }
+
+    if (catalog.createDatabase(databaseName, tablespaceName)) {
+      String normalized = databaseName;
+      Path databaseDir = StorageUtil.concatPath(context.getConf().getVar(TajoConf.ConfVars.WAREHOUSE_DIR), normalized);
+      FileSystem fs = databaseDir.getFileSystem(context.getConf());
+      fs.mkdirs(databaseDir);
+    }
+
+    return true;
+  }
+
+  public boolean dropDatabase(Session session, String databaseName, boolean ifExists) {
+
+    boolean exists = catalog.existDatabase(databaseName);
+    if(!exists) {
+      if (ifExists) { // DROP DATABASE IF EXISTS
+        LOG.info("database \"" + databaseName + "\" does not exists." );
+        return true;
+      } else { // Otherwise, it causes an exception.
+        throw new NoSuchDatabaseException(databaseName);
+      }
+    }
+
+    if (session.getCurrentDatabase().equals(databaseName)) {
+      throw new RuntimeException("ERROR: Cannot drop the current open database");
+    }
+
+    boolean result = catalog.dropDatabase(databaseName);
+    LOG.info("database " + databaseName + " is dropped.");
+    return result;
+  }
+
+  /**
+   * Drop a given named table
+   *
+   * @param tableName to be dropped
+   * @param purge Remove all data if purge is true.
+   */
+  public boolean dropTable(Session session, String tableName, boolean ifExists, boolean purge) {
+    CatalogService catalog = context.getCatalog();
+
+    String databaseName;
+    String simpleTableName;
+    if (CatalogUtil.isFQTableName(tableName)) {
+      String [] splitted = CatalogUtil.splitFQTableName(tableName);
+      databaseName = splitted[0];
+      simpleTableName = splitted[1];
+    } else {
+      databaseName = session.getCurrentDatabase();
+      simpleTableName = tableName;
+    }
+    String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName);
+
+    boolean exists = catalog.existsTable(qualifiedName);
+    if(!exists) {
+      if (ifExists) { // DROP TABLE IF EXISTS
+        LOG.info("relation \"" + qualifiedName + "\" is already exists." );
+        return true;
+      } else { // Otherwise, it causes an exception.
+        throw new NoSuchTableException(qualifiedName);
+      }
+    }
+
+    Path path = catalog.getTableDesc(qualifiedName).getPath();
+    catalog.dropTable(qualifiedName);
+
+    if (purge) {
+      try {
+        FileSystem fs = path.getFileSystem(context.getConf());
+        fs.delete(path, true);
+      } catch (IOException e) {
+        throw new InternalError(e.getMessage());
+      }
+    }
+
+    LOG.info(String.format("relation \"%s\" is " + (purge ? " purged." : " dropped."), qualifiedName));
+    return true;
+  }
+
+  public interface DistributedQueryHook {
+    boolean isEligible(QueryContext queryContext, LogicalPlan plan);
+    void hook(QueryContext queryContext, LogicalPlan plan) throws Exception;
+  }
+
+  public static class DistributedQueryHookManager {
+    private List<DistributedQueryHook> hooks = new ArrayList<DistributedQueryHook>();
+    public void addHook(DistributedQueryHook hook) {
+      hooks.add(hook);
+    }
+
+    public void doHooks(QueryContext queryContext, LogicalPlan plan) {
+      for (DistributedQueryHook hook : hooks) {
+        if (hook.isEligible(queryContext, plan)) {
+          try {
+            hook.hook(queryContext, plan);
+          } catch (Throwable t) {
+            t.printStackTrace();
+          }
+        }
+      }
+    }
+  }
+
+  public class CreateTableHook implements DistributedQueryHook {
+
+    @Override
+    public boolean isEligible(QueryContext queryContext, LogicalPlan plan) {
+      LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+      return rootNode.getChild().getType() == NodeType.CREATE_TABLE;
+    }
+
+    @Override
+    public void hook(QueryContext queryContext, LogicalPlan plan) throws Exception {
+      LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+      CreateTableNode createTableNode = rootNode.getChild();
+      String [] splitted  = CatalogUtil.splitFQTableName(createTableNode.getTableName());
+      String databaseName = splitted[0];
+      String tableName = splitted[1];
+      queryContext.setOutputTable(tableName);
+      queryContext.setOutputPath(
+          StorageUtil.concatPath(TajoConf.getWarehouseDir(context.getConf()), databaseName, tableName));
+      if(createTableNode.getPartitionMethod() != null) {
+        queryContext.setPartitionMethod(createTableNode.getPartitionMethod());
+      }
+      queryContext.setCreateTable();
+    }
+  }
+
+  public static class InsertHook implements DistributedQueryHook {
+
+    @Override
+    public boolean isEligible(QueryContext queryContext, LogicalPlan plan) {
+      return plan.getRootBlock().getRootType() == NodeType.INSERT;
+    }
+
+    @Override
+  public void hook(QueryContext queryContext, LogicalPlan plan) throws Exception {
+      queryContext.setInsert();
+
+      InsertNode insertNode = plan.getRootBlock().getNode(NodeType.INSERT);
+
+      // Set QueryContext settings, such as output table name and output path.
+      // It also remove data files if overwrite is true.
+      Path outputPath;
+      if (insertNode.hasTargetTable()) { // INSERT INTO [TB_NAME]
+        queryContext.setOutputTable(insertNode.getTableName());
+        queryContext.setOutputPath(insertNode.getPath());
+      } else { // INSERT INTO LOCATION ...
+        // When INSERT INTO LOCATION, must not set output table.
+        outputPath = insertNode.getPath();
+        queryContext.setFileOutput();
+        queryContext.setOutputPath(outputPath);
+      }
+
+      if (insertNode.isOverwrite()) {
+        queryContext.setOutputOverwrite();
+      }
+    }
+  }
+}