Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 13:44:30 UTC

[27/57] [abbrv] [partial] TAJO-752: Escalate sub modules in tajo-core into the top-level modules. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java
new file mode 100644
index 0000000..82fd6fc
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tajo.master.DefaultFragmentScheduleAlgorithm.FragmentsPerDisk;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.TUtil;
+
+import java.util.*;
+
+/**
+ * GreedyFragmentScheduleAlgorithm selects a fragment by considering how many fragments are not yet scheduled.
+ * Each disk of each host has a priority, represented by the number of fragments remaining on it.
+ * The algorithm greedily selects fragments so as to minimize the maximum priority, i.e. it drains the most
+ * heavily loaded disk first.
+ */
+public class GreedyFragmentScheduleAlgorithm implements FragmentScheduleAlgorithm {
+  private final static Log LOG = LogFactory.getLog(GreedyFragmentScheduleAlgorithm.class);
+  private final HostPriorityComparator hostComparator = new HostPriorityComparator();
+  private Map<String, Map<Integer, FragmentsPerDisk>> fragmentHostMapping =
+      new HashMap<String, Map<Integer, FragmentsPerDisk>>();
+  private Map<HostAndDisk, PrioritizedHost> totalHostPriority = new HashMap<HostAndDisk, PrioritizedHost>();
+  private Map<String, Set<PrioritizedHost>> hostPriorityPerRack = new HashMap<String, Set<PrioritizedHost>>();
+  private TopologyCache topologyCache = new TopologyCache();
+  private int totalFragmentNum = 0;
+
+  private FragmentsPerDisk getHostFragmentSet(String host, Integer diskId) {
+    Map<Integer, FragmentsPerDisk> fragmentsPerDiskMap;
+    FragmentsPerDisk fragmentsPerDisk;
+    if (fragmentHostMapping.containsKey(host)) {
+      fragmentsPerDiskMap = fragmentHostMapping.get(host);
+    } else {
+      fragmentsPerDiskMap = new HashMap<Integer, FragmentsPerDisk>();
+      fragmentHostMapping.put(host, fragmentsPerDiskMap);
+    }
+    if (fragmentsPerDiskMap.containsKey(diskId)) {
+      fragmentsPerDisk = fragmentsPerDiskMap.get(diskId);
+    } else {
+      fragmentsPerDisk = new FragmentsPerDisk(diskId);
+      fragmentsPerDiskMap.put(diskId, fragmentsPerDisk);
+    }
+    return fragmentsPerDisk;
+  }
+
+  private void updateHostPriority(HostAndDisk hostAndDisk, int priority) {
+    if (priority > 0) {
+      // update the priority among the total hosts
+      PrioritizedHost prioritizedHost;
+      if (totalHostPriority.containsKey(hostAndDisk)) {
+        prioritizedHost = totalHostPriority.get(hostAndDisk);
+        prioritizedHost.priority = priority;
+      } else {
+        prioritizedHost = new PrioritizedHost(hostAndDisk, priority);
+        totalHostPriority.put(hostAndDisk, prioritizedHost);
+      }
+
+      // update the priority among the hosts in a rack
+      String rack = topologyCache.resolve(hostAndDisk.host);
+      Set<PrioritizedHost> hostsOfRack;
+      if (!hostPriorityPerRack.containsKey(rack)) {
+        hostsOfRack = new HashSet<PrioritizedHost>();
+        hostsOfRack.add(prioritizedHost);
+        hostPriorityPerRack.put(rack, hostsOfRack);
+      } else {
+        // also register a newly seen host under an existing rack; re-adding an
+        // already registered host is a no-op because PrioritizedHost equality
+        // is defined by its HostAndDisk
+        hostPriorityPerRack.get(rack).add(prioritizedHost);
+      }
+    } else {
+      if (totalHostPriority.containsKey(hostAndDisk)) {
+        PrioritizedHost prioritizedHost = totalHostPriority.remove(hostAndDisk);
+
+        String rack = topologyCache.resolve(hostAndDisk.host);
+        if (hostPriorityPerRack.containsKey(rack)) {
+          Set<PrioritizedHost> hostsOfRack = hostPriorityPerRack.get(rack);
+          hostsOfRack.remove(prioritizedHost);
+          if (hostsOfRack.size() == 0) {
+            hostPriorityPerRack.remove(rack);
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public void addFragment(FragmentPair fragmentPair) {
+    String[] hosts = fragmentPair.getLeftFragment().getHosts();
+    int[] diskIds = fragmentPair.getLeftFragment().getDiskIds();
+    for (int i = 0; i < hosts.length; i++) {
+      addFragment(hosts[i], diskIds[i], fragmentPair);
+    }
+    totalFragmentNum++;
+  }
+
+  private void addFragment(String host, Integer diskId, FragmentPair fragmentPair) {
+    host = topologyCache.normalize(host);
+    FragmentsPerDisk fragmentsPerDisk = getHostFragmentSet(host, diskId);
+    fragmentsPerDisk.addFragmentPair(fragmentPair);
+
+    int priority;
+    HostAndDisk hostAndDisk = new HostAndDisk(host, diskId);
+    if (totalHostPriority.containsKey(hostAndDisk)) {
+      priority = totalHostPriority.get(hostAndDisk).priority;
+    } else {
+      priority = 0;
+    }
+    updateHostPriority(hostAndDisk, priority+1);
+  }
+
+  public int size() {
+    return totalFragmentNum;
+  }
+
+  /**
+   * Selects a fragment that is stored on the given host and whose replica resides on the disk of the highest
+   * priority.
+   * @param host the host name
+   * @return a fragment if any fragments are stored on the host; otherwise, null.
+   */
+  @Override
+  public FragmentPair getHostLocalFragment(String host) {
+    String normalizedHost = topologyCache.normalize(host);
+    if (!fragmentHostMapping.containsKey(normalizedHost)) {
+      return null;
+    }
+
+    Map<Integer, FragmentsPerDisk> fragmentsPerDiskMap = fragmentHostMapping.get(normalizedHost);
+    List<Integer> disks = Lists.newArrayList(fragmentsPerDiskMap.keySet());
+    Collections.shuffle(disks);
+    FragmentsPerDisk fragmentsPerDisk = null;
+    FragmentPair fragmentPair = null;
+
+    for (Integer diskId : disks) {
+      fragmentsPerDisk = fragmentsPerDiskMap.get(diskId);
+      if (fragmentsPerDisk != null && !fragmentsPerDisk.isEmpty()) {
+        fragmentPair = getBestFragment(fragmentsPerDisk);
+      }
+      if (fragmentPair != null) {
+        return fragmentPair;
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Selects a fragment that is stored on the given disk of the given host and whose replica resides on the disk
+   * of the highest priority.
+   * @param host the host name
+   * @param diskId the disk id
+   * @return a fragment if any fragments are stored on that disk of the host; otherwise, null.
+   */
+  @Override
+  public FragmentPair getHostLocalFragment(String host, Integer diskId) {
+    String normalizedHost = NetUtils.normalizeHost(host);
+    if (fragmentHostMapping.containsKey(normalizedHost)) {
+      Map<Integer, FragmentsPerDisk> fragmentsPerDiskMap = fragmentHostMapping.get(normalizedHost);
+      if (fragmentsPerDiskMap.containsKey(diskId)) {
+        FragmentsPerDisk fragmentsPerDisk = fragmentsPerDiskMap.get(diskId);
+        if (!fragmentsPerDisk.isEmpty()) {
+          return getBestFragment(fragmentsPerDisk);
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Visits disks in descending order of priority and returns the first fragment that is shared by the given
+   * fragment set and the fragment set of the highest-priority disk.
+   * @param fragmentsPerDisk a fragment set
+   * @return a fragment shared by the given set and a highest-priority set, or null if there is none
+   */
+  private FragmentPair getBestFragment(FragmentsPerDisk fragmentsPerDisk) {
+    // Select a fragment that is shared by host and another hostAndDisk that has the most fragments
+    Collection<PrioritizedHost> prioritizedHosts = totalHostPriority.values();
+    PrioritizedHost[] sortedHosts = prioritizedHosts.toArray(new PrioritizedHost[prioritizedHosts.size()]);
+    Arrays.sort(sortedHosts, hostComparator);
+
+    for (PrioritizedHost nextHost : sortedHosts) {
+      if (fragmentHostMapping.containsKey(nextHost.hostAndDisk.host)) {
+        Map<Integer, FragmentsPerDisk> diskFragmentsMap = fragmentHostMapping.get(nextHost.hostAndDisk.host);
+        if (diskFragmentsMap.containsKey(nextHost.hostAndDisk.diskId)) {
+          Set<FragmentPair> largeFragmentPairSet = diskFragmentsMap.get(nextHost.hostAndDisk.diskId).getFragmentPairSet();
+          Iterator<FragmentPair> smallFragmentSetIterator = fragmentsPerDisk.getFragmentPairIterator();
+          while (smallFragmentSetIterator.hasNext()) {
+            FragmentPair eachFragmentOfSmallSet = smallFragmentSetIterator.next();
+            if (largeFragmentPairSet.contains(eachFragmentOfSmallSet)) {
+              return eachFragmentOfSmallSet;
+            }
+          }
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Selects a fragment that is stored in the same rack as the given host, preferring hosts whose disks hold
+   * the most unscheduled fragments.
+   * @param host the host name
+   * @return a fragment if any fragments are stored in the same rack as the given host; otherwise, null.
+   */
+  public FragmentPair getRackLocalFragment(String host) {
+    host = topologyCache.normalize(host);
+    // Select a fragment from a host that has the most fragments in the rack
+    String rack = topologyCache.resolve(host);
+    Set<PrioritizedHost> hostsOfRack = hostPriorityPerRack.get(rack);
+    if (hostsOfRack != null && hostsOfRack.size() > 0) {
+      PrioritizedHost[] sortedHosts = hostsOfRack.toArray(new PrioritizedHost[hostsOfRack.size()]);
+      Arrays.sort(sortedHosts, hostComparator);
+      for (PrioritizedHost nextHost : sortedHosts) {
+        if (fragmentHostMapping.containsKey(nextHost.hostAndDisk.host)) {
+          List<FragmentsPerDisk> disks = Lists.newArrayList(fragmentHostMapping.get(nextHost.hostAndDisk.host).values());
+          Collections.shuffle(disks);
+
+          for (FragmentsPerDisk fragmentsPerDisk : disks) {
+            if (!fragmentsPerDisk.isEmpty()) {
+              return fragmentsPerDisk.getFragmentPairIterator().next();
+            }
+          }
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Selects a fragment from the disk of the highest priority.
+   * @return a fragment if any fragments remain; otherwise, null.
+   */
+  public FragmentPair getRandomFragment() {
+    // Select a fragment from the host that has the most unscheduled fragments
+    Collection<PrioritizedHost> prioritizedHosts = totalHostPriority.values();
+    if (prioritizedHosts.isEmpty()) {
+      // guard against an empty priority map; sortedHosts[0] would otherwise throw
+      return null;
+    }
+    PrioritizedHost[] sortedHosts = prioritizedHosts.toArray(new PrioritizedHost[prioritizedHosts.size()]);
+    Arrays.sort(sortedHosts, hostComparator);
+    PrioritizedHost randomHost = sortedHosts[0];
+    if (fragmentHostMapping.containsKey(randomHost.hostAndDisk.host)) {
+      Iterator<FragmentsPerDisk> fragmentsPerDiskIterator = fragmentHostMapping.get(randomHost.hostAndDisk.host).values().iterator();
+      if (fragmentsPerDiskIterator.hasNext()) {
+        Iterator<FragmentPair> fragmentPairIterator = fragmentsPerDiskIterator.next().getFragmentPairIterator();
+        if (fragmentPairIterator.hasNext()) {
+          return fragmentPairIterator.next();
+        }
+      }
+    }
+    return null;
+  }
+
+  public FragmentPair[] getAllFragments() {
+    List<FragmentPair> fragmentPairs = new ArrayList<FragmentPair>();
+    for (Map<Integer, FragmentsPerDisk> eachValue : fragmentHostMapping.values()) {
+      for (FragmentsPerDisk fragmentsPerDisk : eachValue.values()) {
+        Set<FragmentPair> pairSet = fragmentsPerDisk.getFragmentPairSet();
+        fragmentPairs.addAll(pairSet);
+      }
+    }
+    return fragmentPairs.toArray(new FragmentPair[fragmentPairs.size()]);
+  }
+
+  public void removeFragment(FragmentPair fragmentPair) {
+    String [] hosts = fragmentPair.getLeftFragment().getHosts();
+    int[] diskIds = fragmentPair.getLeftFragment().getDiskIds();
+    for (int i = 0; i < hosts.length; i++) {
+      String normalizedHost = NetUtils.normalizeHost(hosts[i]);
+      Map<Integer, FragmentsPerDisk> diskFragmentMap = fragmentHostMapping.get(normalizedHost);
+
+      if (diskFragmentMap != null) {
+        FragmentsPerDisk fragmentsPerDisk = diskFragmentMap.get(diskIds[i]);
+        if (fragmentsPerDisk != null) {
+          boolean isRemoved = fragmentsPerDisk.removeFragmentPair(fragmentPair);
+          if (isRemoved) {
+            if (fragmentsPerDisk.size() == 0) {
+              diskFragmentMap.remove(diskIds[i]);
+              if (diskFragmentMap.size() == 0) {
+                fragmentHostMapping.remove(normalizedHost);
+              }
+            }
+            HostAndDisk hostAndDisk = new HostAndDisk(normalizedHost, diskIds[i]);
+            if (totalHostPriority.containsKey(hostAndDisk)) {
+              PrioritizedHost prioritizedHost = totalHostPriority.get(hostAndDisk);
+              updateHostPriority(prioritizedHost.hostAndDisk, prioritizedHost.priority-1);
+            }
+          }
+        }
+      }
+    }
+
+    totalFragmentNum--;
+  }
+
+  private static class HostAndDisk {
+    private String host;
+    private Integer diskId;
+
+    public HostAndDisk(String host, Integer diskId) {
+      this.host = host;
+      this.diskId = diskId;
+    }
+
+    public String getHost() {
+      return host;
+    }
+
+    public int getDiskId() {
+      return diskId;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(host, diskId);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof HostAndDisk) {
+        HostAndDisk other = (HostAndDisk) o;
+        return this.host.equals(other.host) &&
+            TUtil.checkEquals(this.diskId, other.diskId);
+      }
+      return false;
+    }
+  }
+
+  public static class PrioritizedHost {
+    private HostAndDisk hostAndDisk;
+    private int priority;
+
+    public PrioritizedHost(HostAndDisk hostAndDisk, int priority) {
+      this.hostAndDisk = hostAndDisk;
+      this.priority = priority;
+    }
+
+    public PrioritizedHost(String host, Integer diskId, int priority) {
+      this.hostAndDisk = new HostAndDisk(host, diskId);
+      this.priority = priority;
+    }
+
+    public String getHost() {
+      return hostAndDisk.host;
+    }
+
+    public Integer getDiskId() {
+      return hostAndDisk.diskId;
+    }
+
+    public Integer getPriority() {
+      return priority;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof PrioritizedHost) {
+        PrioritizedHost other = (PrioritizedHost) o;
+        return this.hostAndDisk.equals(other.hostAndDisk);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return hostAndDisk.hashCode();
+    }
+
+    @Override
+    public String toString() {
+      return "host: " + hostAndDisk.host + " disk: " + hostAndDisk.diskId + " priority: " + priority;
+    }
+  }
+
+
+  public static class HostPriorityComparator implements Comparator<PrioritizedHost> {
+
+    @Override
+    public int compare(PrioritizedHost prioritizedHost, PrioritizedHost prioritizedHost2) {
+      return prioritizedHost2.priority - prioritizedHost.priority;
+    }
+  }
+
+
+  public static class TopologyCache {
+    private Map<String, String> hostRackMap = new HashMap<String, String>();
+    private Map<String, String> normalizedHostMap = new HashMap<String, String>();
+
+    public String normalize(String host) {
+      if (normalizedHostMap.containsKey(host)) {
+        return normalizedHostMap.get(host);
+      } else {
+        String normalized = NetUtils.normalizeHost(host);
+        normalizedHostMap.put(host, normalized);
+        return normalized;
+      }
+    }
+
+    public String resolve(String host) {
+      if (hostRackMap.containsKey(host)) {
+        return hostRackMap.get(host);
+      } else {
+        String rack = RackResolver.resolve(host).getNetworkLocation();
+        hostRackMap.put(host, rack);
+        return rack;
+      }
+    }
+  }
+}
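
A minimal sketch (not part of this commit; the class and host names are made up) of the ordering that drives the greedy selection above. HostPriorityComparator sorts PrioritizedHost entries in descending order of priority, so the disk with the most unscheduled fragments is drained first:

    import java.util.Arrays;

    import org.apache.tajo.master.GreedyFragmentScheduleAlgorithm.HostPriorityComparator;
    import org.apache.tajo.master.GreedyFragmentScheduleAlgorithm.PrioritizedHost;

    public class PriorityOrderDemo {
      public static void main(String[] args) {
        PrioritizedHost[] hosts = {
            new PrioritizedHost("host1", 0, 3),  // 3 unscheduled fragments on disk 0
            new PrioritizedHost("host2", 1, 7),  // 7 unscheduled fragments on disk 1
            new PrioritizedHost("host1", 1, 5)
        };
        Arrays.sort(hosts, new HostPriorityComparator());
        // prints host2/disk 1 (priority 7), then host1/disk 1 (5), then host1/disk 0 (3)
        for (PrioritizedHost host : hosts) {
          System.out.println(host);
        }
      }
    }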

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
new file mode 100644
index 0000000..434ea22
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
@@ -0,0 +1,522 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.query.QueryUnitRequest;
+import org.apache.tajo.engine.query.QueryUnitRequestImpl;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
+import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
+import org.apache.tajo.master.querymaster.QueryUnit;
+import org.apache.tajo.master.querymaster.QueryUnitAttempt;
+import org.apache.tajo.master.querymaster.SubQuery;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.NetUtils;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
+public class LazyTaskScheduler extends AbstractTaskScheduler {
+  private static final Log LOG = LogFactory.getLog(LazyTaskScheduler.class);
+
+  private final TaskSchedulerContext context;
+  private final SubQuery subQuery;
+
+  private Thread schedulingThread;
+  private volatile boolean stopEventHandling;
+
+  BlockingQueue<TaskSchedulerEvent> eventQueue
+      = new LinkedBlockingQueue<TaskSchedulerEvent>();
+
+  private TaskRequests taskRequests;
+  private FragmentScheduleAlgorithm scheduledFragments;
+  private ScheduledFetches scheduledFetches;
+
+  private int diskLocalAssigned = 0;
+  private int hostLocalAssigned = 0;
+  private int rackLocalAssigned = 0;
+  private int totalAssigned = 0;
+
+  private int nextTaskId = 0;
+  private int containerNum;
+
+  public LazyTaskScheduler(TaskSchedulerContext context, SubQuery subQuery) {
+    super(LazyTaskScheduler.class.getName());
+    this.context = context;
+    this.subQuery = subQuery;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    taskRequests  = new TaskRequests();
+    try {
+      scheduledFragments = FragmentScheduleAlgorithmFactory.get(conf);
+      LOG.info(scheduledFragments.getClass().getSimpleName() + " is selected for the scheduling algorithm.");
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    if (!context.isLeafQuery()) {
+      scheduledFetches = new ScheduledFetches();
+    }
+
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    containerNum = subQuery.getContext().getResourceAllocator().calculateNumRequestContainers(
+        subQuery.getContext().getQueryMasterContext().getWorkerContext(),
+        context.getEstimatedTaskNum(), 512);
+
+    LOG.info("Start TaskScheduler");
+    this.schedulingThread = new Thread() {
+      public void run() {
+
+        while(!stopEventHandling && !Thread.currentThread().isInterrupted()) {
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException e) {
+            break;
+          }
+
+          schedule();
+        }
+        LOG.info("TaskScheduler schedulingThread stopped");
+      }
+    };
+
+    this.schedulingThread.start();
+    super.start();
+  }
+
+  private static final QueryUnitAttemptId NULL_ATTEMPT_ID;
+  public static final TajoWorkerProtocol.QueryUnitRequestProto stopTaskRunnerReq;
+  static {
+    ExecutionBlockId nullSubQuery = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
+    NULL_ATTEMPT_ID = QueryIdFactory.newQueryUnitAttemptId(QueryIdFactory.newQueryUnitId(nullSubQuery, 0), 0);
+
+    TajoWorkerProtocol.QueryUnitRequestProto.Builder builder =
+        TajoWorkerProtocol.QueryUnitRequestProto.newBuilder();
+    builder.setId(NULL_ATTEMPT_ID.getProto());
+    builder.setShouldDie(true);
+    builder.setOutputTable("");
+    builder.setSerializedData("");
+    builder.setClusteredOutput(false);
+    stopTaskRunnerReq = builder.build();
+  }
+
+  @Override
+  public void stop() {
+    stopEventHandling = true;
+    schedulingThread.interrupt();
+
+    // Return all of request callbacks instantly.
+    for (TaskRequestEvent req : taskRequests.taskRequestQueue) {
+      req.getCallback().run(stopTaskRunnerReq);
+    }
+
+    LOG.info("Task Scheduler stopped");
+    super.stop();
+  }
+
+  List<TaskRequestEvent> taskRequestEvents = new ArrayList<TaskRequestEvent>();
+  public void schedule() {
+    if (taskRequests.size() > 0) {
+      if (context.isLeafQuery()) {
+        LOG.debug("Try to schedule tasks with taskRequestEvents: " +
+            taskRequests.size() + ", Fragment Schedule Request: " +
+            scheduledFragments.size());
+        taskRequests.getTaskRequests(taskRequestEvents,
+            scheduledFragments.size());
+        LOG.debug("Get " + taskRequestEvents.size() + " taskRequestEvents ");
+        if (taskRequestEvents.size() > 0) {
+          assignLeafTasks(taskRequestEvents);
+        }
+        taskRequestEvents.clear();
+      } else {
+        LOG.debug("Try to schedule tasks with taskRequestEvents: " +
+            taskRequests.size() + ", Fetch Schedule Request: " +
+            scheduledFetches.size());
+        taskRequests.getTaskRequests(taskRequestEvents,
+            scheduledFetches.size());
+        LOG.debug("Get " + taskRequestEvents.size() + " taskRequestEvents ");
+        if (taskRequestEvents.size() > 0) {
+          assignNonLeafTasks(taskRequestEvents);
+        }
+        taskRequestEvents.clear();
+      }
+    }
+  }
+
+  @Override
+  public void handle(TaskSchedulerEvent event) {
+    int qSize = eventQueue.size();
+    if (qSize != 0 && qSize % 1000 == 0) {
+      LOG.info("Size of event-queue in LazyTaskScheduler is " + qSize);
+    }
+    int remCapacity = eventQueue.remainingCapacity();
+    if (remCapacity < 1000) {
+      LOG.warn("Very low remaining capacity in the event-queue "
+          + "of LazyTaskScheduler: " + remCapacity);
+    }
+
+    if (event.getType() == EventType.T_SCHEDULE) {
+      if (event instanceof FragmentScheduleEvent) {
+        FragmentScheduleEvent castEvent = (FragmentScheduleEvent) event;
+        Collection<FileFragment> rightFragments = castEvent.getRightFragments();
+        if (rightFragments == null || rightFragments.isEmpty()) {
+          scheduledFragments.addFragment(new FragmentPair(castEvent.getLeftFragment(), null));
+        } else {
+          for (FileFragment eachFragment: rightFragments) {
+            scheduledFragments.addFragment(new FragmentPair(castEvent.getLeftFragment(), eachFragment));
+          }
+        }
+        initDiskBalancer(castEvent.getLeftFragment().getHosts(), castEvent.getLeftFragment().getDiskIds());
+      } else if (event instanceof FetchScheduleEvent) {
+        FetchScheduleEvent castEvent = (FetchScheduleEvent) event;
+        scheduledFetches.addFetch(castEvent.getFetches());
+      } else if (event instanceof QueryUnitAttemptScheduleEvent) {
+        QueryUnitAttemptScheduleEvent castEvent = (QueryUnitAttemptScheduleEvent) event;
+        assignTask(castEvent.getContext(), castEvent.getQueryUnitAttempt());
+      }
+    }
+  }
+
+  public void handleTaskRequestEvent(TaskRequestEvent event) {
+    taskRequests.handle(event);
+  }
+
+  @Override
+  public int remainingScheduledObjectNum() {
+    if (context.isLeafQuery()) {
+      return scheduledFragments.size();
+    } else {
+      return scheduledFetches.size();
+    }
+  }
+
+  private Map<String, DiskBalancer> hostDiskBalancerMap = new HashMap<String, DiskBalancer>();
+
+  private void initDiskBalancer(String[] hosts, int[] diskIds) {
+    for (int i = 0; i < hosts.length; i++) {
+      DiskBalancer diskBalancer;
+      String normalized = NetUtils.normalizeHost(hosts[i]);
+      if (hostDiskBalancerMap.containsKey(normalized)) {
+        diskBalancer = hostDiskBalancerMap.get(normalized);
+      } else {
+        diskBalancer = new DiskBalancer(normalized);
+        hostDiskBalancerMap.put(normalized, diskBalancer);
+      }
+      diskBalancer.addDiskId(diskIds[i]);
+    }
+  }
+
+  private static class DiskBalancer {
+    private HashMap<ContainerId, Integer> containerDiskMap = new HashMap<ContainerId, Integer>();
+    private HashMap<Integer, Integer> diskReferMap = new HashMap<Integer, Integer>();
+    private String host;
+
+    public DiskBalancer(String host){
+      this.host = host;
+    }
+
+    public void addDiskId(Integer diskId) {
+      if (!diskReferMap.containsKey(diskId)) {
+        diskReferMap.put(diskId, 0);
+      }
+    }
+
+    public Integer getDiskId(ContainerId containerId) {
+      if (!containerDiskMap.containsKey(containerId)) {
+        assignVolumeId(containerId);
+      }
+
+      return containerDiskMap.get(containerId);
+    }
+
+    public void assignVolumeId(ContainerId containerId){
+      Map.Entry<Integer, Integer> volumeEntry = null;
+
+      for (Map.Entry<Integer, Integer> entry : diskReferMap.entrySet()) {
+        // pick the disk with the smallest reference count
+        if (volumeEntry == null || volumeEntry.getValue() >= entry.getValue()) {
+          volumeEntry = entry;
+        }
+      }
+
+      if (volumeEntry != null) {
+        diskReferMap.put(volumeEntry.getKey(), volumeEntry.getValue() + 1);
+        LOG.info("Assigned host : " + host + " Volume : " + volumeEntry.getKey() + ", Concurrency : "
+            + diskReferMap.get(volumeEntry.getKey()));
+        containerDiskMap.put(containerId, volumeEntry.getKey());
+      }
+    }
+
+    public String getHost() {
+      return host;
+    }
+  }
+
+  private class TaskRequests implements EventHandler<TaskRequestEvent> {
+    private final LinkedBlockingQueue<TaskRequestEvent> taskRequestQueue =
+        new LinkedBlockingQueue<TaskRequestEvent>();
+
+    @Override
+    public void handle(TaskRequestEvent event) {
+      LOG.info("TaskRequest: " + event.getContainerId() + "," + event.getExecutionBlockId());
+      if(stopEventHandling) {
+        event.getCallback().run(stopTaskRunnerReq);
+        return;
+      }
+      int qSize = taskRequestQueue.size();
+      if (qSize != 0 && qSize % 1000 == 0) {
+        LOG.info("Size of task-request-queue in LazyTaskScheduler is " + qSize);
+      }
+      int remCapacity = taskRequestQueue.remainingCapacity();
+      if (remCapacity < 1000) {
+        LOG.warn("Very low remaining capacity in the task-request-queue "
+            + "of LazyTaskScheduler: " + remCapacity);
+      }
+
+      taskRequestQueue.add(event);
+    }
+
+    public void getTaskRequests(final Collection<TaskRequestEvent> taskRequests,
+                                int num) {
+      taskRequestQueue.drainTo(taskRequests, num);
+    }
+
+    public int size() {
+      return taskRequestQueue.size();
+    }
+  }
+
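+  // adjustTaskSize() below keeps the requested task size while one wave of
+  // containers can already cover the remaining fragments; otherwise it grows
+  // the task so that everything fits into a single wave. For example (made-up
+  // numbers), with a 128 MB base task size, a current task size of 512 MB
+  // (4 fragments per task), 10 containers and 100 unscheduled fragments,
+  // 4 * 10 = 40 <= 100, so the new size is ceil(100 / 10) = 10 fragments per
+  // task, i.e. 10 * 128 MB.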
+  private long adjustTaskSize() {
+    long originTaskSize = context.getMasterContext().getConf().getIntVar(ConfVars.TASK_DEFAULT_SIZE) * 1024 * 1024;
+    long fragNumPerTask = context.getTaskSize() / originTaskSize;
+    if (fragNumPerTask * containerNum > remainingScheduledObjectNum()) {
+      return context.getTaskSize();
+    } else {
+      fragNumPerTask = (long) Math.ceil((double)remainingScheduledObjectNum() / (double)containerNum);
+      return originTaskSize * fragNumPerTask;
+    }
+  }
+
+  private void assignLeafTasks(List<TaskRequestEvent> taskRequests) {
+    Collections.shuffle(taskRequests);
+    Iterator<TaskRequestEvent> it = taskRequests.iterator();
+
+    TaskRequestEvent taskRequest;
+    while (it.hasNext() && scheduledFragments.size() > 0) {
+      taskRequest = it.next();
+      LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
+          "containerId=" + taskRequest.getContainerId());
+      ContainerProxy container = context.getMasterContext().getResourceAllocator().
+          getContainer(taskRequest.getContainerId());
+
+      if(container == null) {
+        continue;
+      }
+
+      String host = container.getTaskHostName();
+      QueryUnitAttemptScheduleContext queryUnitContext = new QueryUnitAttemptScheduleContext(container.containerID,
+          host, taskRequest.getCallback());
+      QueryUnit task = SubQuery.newEmptyQueryUnit(context, queryUnitContext, subQuery, nextTaskId++);
+
+      FragmentPair fragmentPair;
+      List<FragmentPair> fragmentPairs = new ArrayList<FragmentPair>();
+      boolean diskLocal = false;
+      long assignedFragmentSize = 0;
+      long taskSize = adjustTaskSize();
+      LOG.info("Adjusted task size: " + taskSize);
+
+      // host local, disk local
+      String normalized = NetUtils.normalizeHost(host);
+      Integer diskId = hostDiskBalancerMap.get(normalized).getDiskId(container.containerID);
+      if (diskId != null && diskId != -1) {
+        do {
+          fragmentPair = scheduledFragments.getHostLocalFragment(host, diskId);
+          if (fragmentPair == null || fragmentPair.getLeftFragment() == null) {
+            break;
+          }
+
+          if (assignedFragmentSize + fragmentPair.getLeftFragment().getEndKey() > taskSize) {
+            break;
+          } else {
+            fragmentPairs.add(fragmentPair);
+            assignedFragmentSize += fragmentPair.getLeftFragment().getEndKey();
+            if (fragmentPair.getRightFragment() != null) {
+              assignedFragmentSize += fragmentPair.getRightFragment().getEndKey();
+            }
+          }
+          scheduledFragments.removeFragment(fragmentPair);
+          diskLocal = true;
+        } while (scheduledFragments.size() > 0 && assignedFragmentSize < taskSize);
+      }
+
+      if (assignedFragmentSize < taskSize) {
+        // host local
+        do {
+          fragmentPair = scheduledFragments.getHostLocalFragment(host);
+          if (fragmentPair == null || fragmentPair.getLeftFragment() == null) {
+            break;
+          }
+
+          if (assignedFragmentSize + fragmentPair.getLeftFragment().getEndKey() > taskSize) {
+            break;
+          } else {
+            fragmentPairs.add(fragmentPair);
+            assignedFragmentSize += fragmentPair.getLeftFragment().getEndKey();
+            if (fragmentPair.getRightFragment() != null) {
+              assignedFragmentSize += fragmentPair.getRightFragment().getEndKey();
+            }
+          }
+          scheduledFragments.removeFragment(fragmentPair);
+          diskLocal = false;
+        } while (scheduledFragments.size() > 0 && assignedFragmentSize < taskSize);
+      }
+
+      // rack local
+      if (fragmentPairs.size() == 0) {
+        fragmentPair = scheduledFragments.getRackLocalFragment(host);
+
+        // random
+        if (fragmentPair == null) {
+          fragmentPair = scheduledFragments.getRandomFragment();
+        } else {
+          rackLocalAssigned++;
+        }
+
+        if (fragmentPair != null) {
+          fragmentPairs.add(fragmentPair);
+          scheduledFragments.removeFragment(fragmentPair);
+        }
+      } else {
+        if (diskLocal) {
+          diskLocalAssigned++;
+        } else {
+          hostLocalAssigned++;
+        }
+      }
+
+      if (fragmentPairs.size() == 0) {
+        throw new RuntimeException("Illegal state: no fragment could be assigned to host " + host);
+      }
+
+      LOG.info("host: " + host + " disk id: " + diskId + " fragment num: " + fragmentPairs.size());
+
+      task.setFragment(fragmentPairs.toArray(new FragmentPair[fragmentPairs.size()]));
+      subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
+    }
+  }
+
+  private void assignNonLeafTasks(List<TaskRequestEvent> taskRequests) {
+    Iterator<TaskRequestEvent> it = taskRequests.iterator();
+
+    TaskRequestEvent taskRequest;
+    while (it.hasNext()) {
+      taskRequest = it.next();
+      LOG.debug("assignToNonLeafTasks: " + taskRequest.getExecutionBlockId());
+
+      // random allocation
+      if (scheduledFetches.size() > 0) {
+        LOG.debug("Assigned based on * match");
+        ContainerProxy container = context.getMasterContext().getResourceAllocator().getContainer(
+            taskRequest.getContainerId());
+        QueryUnitAttemptScheduleContext queryUnitContext = new QueryUnitAttemptScheduleContext(container.containerID,
+            container.getTaskHostName(), taskRequest.getCallback());
+        QueryUnit task = SubQuery.newEmptyQueryUnit(context, queryUnitContext, subQuery, nextTaskId++);
+        task.setFragment(scheduledFragments.getAllFragments());
+        subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
+      }
+    }
+  }
+
+  private void assignTask(QueryUnitAttemptScheduleContext attemptContext, QueryUnitAttempt taskAttempt) {
+    QueryUnitAttemptId attemptId = taskAttempt.getId();
+    ContainerProxy containerProxy = context.getMasterContext().getResourceAllocator().
+        getContainer(attemptContext.getContainerId());
+    QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
+        attemptId,
+        new ArrayList<FragmentProto>(taskAttempt.getQueryUnit().getAllFragments()),
+        "",
+        false,
+        taskAttempt.getQueryUnit().getLogicalPlan().toJson(),
+        context.getMasterContext().getQueryContext(),
+        subQuery.getDataChannel(), subQuery.getBlock().getEnforcer());
+    if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) {
+      taskAssign.setInterQuery();
+    }
+
+    if (!context.isLeafQuery()) {
+      Map<String, List<URI>> fetch = scheduledFetches.getNextFetch();
+      scheduledFetches.popNextFetch();
+
+      for (Entry<String, List<URI>> fetchEntry : fetch.entrySet()) {
+        for (URI eachValue : fetchEntry.getValue()) {
+          taskAssign.addFetch(fetchEntry.getKey(), eachValue);
+        }
+      }
+    }
+
+    context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
+        attemptContext.getContainerId(), attemptContext.getHost(), containerProxy.getTaskPort()));
+
+    totalAssigned++;
+    attemptContext.getCallback().run(taskAssign.getProto());
+
+    if (context.isLeafQuery()) {
+      LOG.debug("DiskLocalAssigned / Total: " + diskLocalAssigned + " / " + totalAssigned);
+      LOG.debug("HostLocalAssigned / Total: " + hostLocalAssigned + " / " + totalAssigned);
+      LOG.debug("RackLocalAssigned: " + rackLocalAssigned + " / " + totalAssigned);
+    }
+  }
+
+  private boolean checkIfInterQuery(MasterPlan masterPlan, ExecutionBlock block) {
+    if (masterPlan.isRoot(block)) {
+      return false;
+    }
+
+    ExecutionBlock parent = masterPlan.getParent(block);
+    if (masterPlan.isRoot(parent) && parent.hasUnion()) {
+      return false;
+    }
+
+    return true;
+  }
+}
\ No newline at end of file
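
The private DiskBalancer above pins each container to the disk with the fewest containers currently assigned to it. A standalone sketch of that selection rule (hypothetical class; DiskBalancer itself is private to the scheduler):

    import java.util.HashMap;
    import java.util.Map;

    public class LeastLoadedDiskDemo {
      private final Map<Integer, Integer> diskReferMap = new HashMap<Integer, Integer>();

      public void addDiskId(Integer diskId) {
        if (!diskReferMap.containsKey(diskId)) {
          diskReferMap.put(diskId, 0);
        }
      }

      // mirrors DiskBalancer.assignVolumeId(): choose the least-referenced
      // disk, then bump its reference count
      public Integer pickDisk() {
        Map.Entry<Integer, Integer> best = null;
        for (Map.Entry<Integer, Integer> entry : diskReferMap.entrySet()) {
          if (best == null || best.getValue() >= entry.getValue()) {
            best = entry;
          }
        }
        if (best == null) {
          return null;
        }
        diskReferMap.put(best.getKey(), best.getValue() + 1);
        return best.getKey();
      }

      public static void main(String[] args) {
        LeastLoadedDiskDemo demo = new LeastLoadedDiskDemo();
        demo.addDiskId(0);
        demo.addDiskId(1);
        System.out.println(demo.pickDisk());  // one of the two disks
        System.out.println(demo.pickDisk());  // then the other
      }
    }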

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/ScheduledFetches.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ScheduledFetches.java b/tajo-core/src/main/java/org/apache/tajo/master/ScheduledFetches.java
new file mode 100644
index 0000000..9b7dc22
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/ScheduledFetches.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class ScheduledFetches {
+  private List<Map<String, List<URI>>> fetches = new ArrayList<Map<String, List<URI>>>();
+
+  public void addFetch(Map<String, List<URI>> fetch) {
+    this.fetches.add(fetch);
+  }
+
+  public boolean hasNextFetch() {
+    return fetches.size() > 0;
+  }
+
+  public Map<String, List<URI>> getNextFetch() {
+    return hasNextFetch() ? fetches.get(0) : null;
+  }
+
+  public Map<String, List<URI>> popNextFetch() {
+    return hasNextFetch() ? fetches.remove(0) : null;
+  }
+
+  public int size() {
+    return fetches.size();
+  }
+}
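
A minimal usage sketch (not part of this commit; the partition key and URI are made up). ScheduledFetches is a simple FIFO: addFetch() enqueues one map of fetch URIs per task, getNextFetch() peeks at the head, and popNextFetch() removes it once the task is assigned:

    import java.net.URI;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.tajo.master.ScheduledFetches;

    public class ScheduledFetchesDemo {
      public static void main(String[] args) {
        ScheduledFetches fetches = new ScheduledFetches();

        Map<String, List<URI>> fetch = new HashMap<String, List<URI>>();
        List<URI> uris = new ArrayList<URI>();
        uris.add(URI.create("http://worker1:8080/?qid=q1&sid=2&p=0"));
        fetch.put("part-0", uris);
        fetches.addFetch(fetch);

        while (fetches.hasNextFetch()) {
          Map<String, List<URI>> next = fetches.getNextFetch();  // peek
          System.out.println("assigning fetches for " + next.keySet());
          fetches.popNextFetch();                                // then remove
        }
      }
    }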

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
new file mode 100644
index 0000000..751b21b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class TajoAsyncDispatcher extends AbstractService implements Dispatcher {
+
+  private static final Log LOG = LogFactory.getLog(TajoAsyncDispatcher.class);
+
+  private final BlockingQueue<Event> eventQueue;
+  private volatile boolean stopped = false;
+
+  private Thread eventHandlingThread;
+  protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
+  private boolean exitOnDispatchException;
+
+  private String id;
+
+  public TajoAsyncDispatcher(String id) {
+    this(id, new LinkedBlockingQueue<Event>());
+  }
+
+  public TajoAsyncDispatcher(String id, BlockingQueue<Event> eventQueue) {
+    super(TajoAsyncDispatcher.class.getName());
+    this.id = id;
+    this.eventQueue = eventQueue;
+    this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
+  }
+
+  Runnable createThread() {
+    return new Runnable() {
+      @Override
+      public void run() {
+        while (!stopped && !Thread.currentThread().isInterrupted()) {
+          Event event;
+          try {
+            event = eventQueue.take();
+            if(LOG.isDebugEnabled()) {
+              LOG.debug(id + ",event take:" + event.getType() + "," + event);
+            }
+          } catch(InterruptedException ie) {
+            if (!stopped) {
+              LOG.warn("AsyncDispatcher thread interrupted");
+            }
+            return;
+          }
+          dispatch(event);
+        }
+      }
+    };
+  }
+
+  @Override
+  public synchronized void init(Configuration conf) {
+    this.exitOnDispatchException =
+        conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
+            Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    //start all the components
+    super.start();
+    eventHandlingThread = new Thread(createThread());
+    eventHandlingThread.setName("AsyncDispatcher event handler");
+    eventHandlingThread.start();
+
+    LOG.info("AsyncDispatcher started:" + id);
+  }
+
+  @Override
+  public synchronized void stop() {
+    if(stopped) {
+      return;
+    }
+    stopped = true;
+    if (eventHandlingThread != null) {
+      eventHandlingThread.interrupt();
+      try {
+        eventHandlingThread.join();
+      } catch (InterruptedException ie) {
+        LOG.warn("Interrupted Exception while stopping");
+      }
+    }
+
+    // stop all the components
+    super.stop();
+
+    LOG.info("AsyncDispatcher stopped:" + id);
+  }
+
+  @SuppressWarnings("unchecked")
+  protected void dispatch(Event event) {
+    //all events go thru this loop
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Dispatching the event " + event.getClass().getName() + "."
+          + event.toString());
+    }
+    Class<? extends Enum> type = event.getType().getDeclaringClass();
+
+    try{
+      EventHandler handler = eventDispatchers.get(type);
+      if(handler != null) {
+        handler.handle(event);
+      } else {
+        throw new Exception("No handler for registered for " + type);
+      }
+    } catch (Throwable t) {
+      //TODO Maybe log the state of the queue
+      LOG.fatal("Error in dispatcher thread:" + event.getType(), t);
+      if (exitOnDispatchException && (ShutdownHookManager.get().isShutdownInProgress()) == false) {
+        LOG.info("Exiting, bye..");
+        System.exit(-1);
+      }
+    } finally {
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void register(Class<? extends Enum> eventType,
+                       EventHandler handler) {
+    /* check to see if we have a listener registered */
+    EventHandler<Event> registeredHandler = (EventHandler<Event>)
+        eventDispatchers.get(eventType);
+    LOG.debug("Registering " + eventType + " for " + handler.getClass());
+    if (registeredHandler == null) {
+      eventDispatchers.put(eventType, handler);
+    } else if (!(registeredHandler instanceof MultiListenerHandler)){
+      /* for multiple listeners of an event add the multiple listener handler */
+      MultiListenerHandler multiHandler = new MultiListenerHandler();
+      multiHandler.addHandler(registeredHandler);
+      multiHandler.addHandler(handler);
+      eventDispatchers.put(eventType, multiHandler);
+    } else {
+      /* already a multilistener, just add to it */
+      MultiListenerHandler multiHandler
+          = (MultiListenerHandler) registeredHandler;
+      multiHandler.addHandler(handler);
+    }
+  }
+
+  @Override
+  public EventHandler getEventHandler() {
+    return new GenericEventHandler();
+  }
+
+  class GenericEventHandler implements EventHandler<Event> {
+    public void handle(Event event) {
+      /* all this method does is enqueue all the events onto the queue */
+      int qSize = eventQueue.size();
+      if (qSize !=0 && qSize %1000 == 0) {
+        LOG.info("Size of event-queue is " + qSize);
+      }
+      int remCapacity = eventQueue.remainingCapacity();
+      if (remCapacity < 1000) {
+        LOG.warn("Very low remaining capacity in the event-queue: "
+            + remCapacity);
+      }
+      try {
+        if(LOG.isDebugEnabled()) {
+          LOG.debug(id + ",add event:" +
+              event.getType() + "," + event + "," +
+              (eventHandlingThread == null ? "null" : eventHandlingThread.isAlive()));
+        }
+        eventQueue.put(event);
+      } catch (InterruptedException e) {
+        if (!stopped) {
+          LOG.warn("AsyncDispatcher thread interrupted", e);
+        }
+        throw new YarnRuntimeException(e);
+      }
+    }
+  }
+
+  /**
+   * Multiplexes an event, sending it to every handler that has registered
+   * interest in the event type.
+   */
+  static class MultiListenerHandler implements EventHandler<Event> {
+    List<EventHandler<Event>> listOfHandlers;
+
+    public MultiListenerHandler() {
+      listOfHandlers = new ArrayList<EventHandler<Event>>();
+    }
+
+    @Override
+    public void handle(Event event) {
+      for (EventHandler<Event> handler : listOfHandlers) {
+        handler.handle(event);
+      }
+    }
+
+    void addHandler(EventHandler<Event> handler) {
+      listOfHandlers.add(handler);
+    }
+  }
+}
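
A minimal sketch (not part of this commit; the event type and handler are made up) of how the dispatcher is used: handlers are registered per event-type enum class, and getEventHandler() returns a producer that enqueues events for the single dispatcher thread. Registering a second handler for the same enum class transparently wraps both in a MultiListenerHandler:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.event.AbstractEvent;
    import org.apache.hadoop.yarn.event.EventHandler;
    import org.apache.tajo.master.TajoAsyncDispatcher;

    public class DispatcherDemo {
      enum DemoEventType { PING }

      static class DemoEvent extends AbstractEvent<DemoEventType> {
        DemoEvent() {
          super(DemoEventType.PING);
        }
      }

      @SuppressWarnings("unchecked")
      public static void main(String[] args) throws Exception {
        TajoAsyncDispatcher dispatcher = new TajoAsyncDispatcher("demo");
        dispatcher.init(new Configuration());
        dispatcher.register(DemoEventType.class, new EventHandler<DemoEvent>() {
          @Override
          public void handle(DemoEvent event) {
            System.out.println("handled " + event.getType());
          }
        });
        dispatcher.start();

        dispatcher.getEventHandler().handle(new DemoEvent());

        Thread.sleep(100);  // give the dispatcher thread time to drain the queue
        dispatcher.stop();
      }
    }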

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
new file mode 100644
index 0000000..7f1eac6
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.master.rm.TajoWorkerContainer;
+import org.apache.tajo.master.rm.TajoWorkerContainerId;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.RpcConnectionPool;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TajoContainerProxy extends ContainerProxy {
+  public TajoContainerProxy(QueryMasterTask.QueryMasterTaskContext context,
+                            Configuration conf, Container container,
+                            ExecutionBlockId executionBlockId) {
+    super(context, conf, executionBlockId, container);
+  }
+
+  @Override
+  public void launch(ContainerLaunchContext containerLaunchContext) {
+    context.getResourceAllocator().addContainer(containerID, this);
+
+    this.hostName = container.getNodeId().getHost();
+    this.port = ((TajoWorkerContainer)container).getWorkerResource().getPullServerPort();
+    this.state = ContainerState.RUNNING;
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Launch Container:" + executionBlockId + "," + containerID.getId() + "," +
+          container.getId() + "," + container.getNodeId() + ", pullServer=" + port);
+    }
+
+    assignExecutionBlock(executionBlockId, container);
+  }
+
+  /**
+   * Sends a kill RPC request to the corresponding worker.
+   *
+   * @param taskAttemptId the task attempt to be killed
+   */
+  public void killTaskAttempt(QueryUnitAttemptId taskAttemptId) {
+    NettyClientBase tajoWorkerRpc = null;
+    try {
+      InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort());
+      tajoWorkerRpc = RpcConnectionPool.getPool(context.getConf()).getConnection(addr, TajoWorkerProtocol.class, true);
+      TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
+      tajoWorkerRpcClient.killTaskAttempt(null, taskAttemptId.getProto(), NullCallback.get());
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    } finally {
+      RpcConnectionPool.getPool(context.getConf()).releaseConnection(tajoWorkerRpc);
+    }
+  }
+
+  private void assignExecutionBlock(ExecutionBlockId executionBlockId, Container container) {
+    NettyClientBase tajoWorkerRpc = null;
+    try {
+      InetSocketAddress myAddr = context.getQueryMasterContext().getWorkerContext()
+          .getQueryMasterManagerService().getBindAddr();
+
+      InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort());
+      tajoWorkerRpc = RpcConnectionPool.getPool(context.getConf()).getConnection(addr, TajoWorkerProtocol.class, true);
+      TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
+
+      TajoWorkerProtocol.RunExecutionBlockRequestProto request =
+          TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder()
+              .setExecutionBlockId(executionBlockId.toString())
+              .setQueryMasterHost(myAddr.getHostName())
+              .setQueryMasterPort(myAddr.getPort())
+              .setNodeId(container.getNodeId().toString())
+              .setContainerId(container.getId().toString())
+              .setQueryOutputPath(context.getStagingDir().toString())
+              .build();
+
+      tajoWorkerRpcClient.executeExecutionBlock(null, request, NullCallback.get());
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    } finally {
+      RpcConnectionPool.getPool(context.getConf()).releaseConnection(tajoWorkerRpc);
+    }
+  }
+
+  @Override
+  public synchronized void stopContainer() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Release TajoWorker Resource: " + executionBlockId + "," + containerID + ", state:" + this.state);
+    }
+    if(isCompletelyDone()) {
+      LOG.info("Container already stopped:" + containerID);
+      return;
+    }
+    if(this.state == ContainerState.PREP) {
+      this.state = ContainerState.KILLED_BEFORE_LAUNCH;
+    } else {
+      try {
+        TajoWorkerContainer tajoWorkerContainer = ((TajoWorkerContainer)container);
+        releaseWorkerResource(context, executionBlockId, tajoWorkerContainer.getId());
+        context.getResourceAllocator().removeContainer(containerID);
+        this.state = ContainerState.DONE;
+      } catch (Throwable t) {
+        // ignore the cleanup failure
+        String message = "cleanup failed for container "
+            + this.containerID + " : "
+            + StringUtils.stringifyException(t);
+        LOG.warn(message);
+        this.state = ContainerState.DONE;
+        return;
+      }
+    }
+  }
+
+  public static void releaseWorkerResource(QueryMasterTask.QueryMasterTaskContext context,
+                                           ExecutionBlockId executionBlockId,
+                                           ContainerId containerId) throws Exception {
+    List<ContainerId> containerIds = new ArrayList<ContainerId>();
+    containerIds.add(containerId);
+
+    releaseWorkerResource(context, executionBlockId, containerIds);
+  }
+
+  public static void releaseWorkerResource(QueryMasterTask.QueryMasterTaskContext context,
+                                           ExecutionBlockId executionBlockId,
+                                           List<ContainerId> containerIds) throws Exception {
+    List<YarnProtos.ContainerIdProto> containerIdProtos =
+        new ArrayList<YarnProtos.ContainerIdProto>();
+
+    for(ContainerId eachContainerId: containerIds) {
+      containerIdProtos.add(TajoWorkerContainerId.getContainerIdProto(eachContainerId));
+    }
+
+    RpcConnectionPool connPool = RpcConnectionPool.getPool(context.getConf());
+    NettyClientBase tmClient = null;
+    try {
+      tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
+          TajoMasterProtocol.class, true);
+      TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+      masterClientService.releaseWorkerResource(null,
+          TajoMasterProtocol.WorkerResourceReleaseRequest.newBuilder()
+              .setExecutionBlockId(executionBlockId.getProto())
+              .addAllContainerIds(containerIdProtos)
+              .build(),
+          NullCallback.get());
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    } finally {
+      connPool.releaseConnection(tmClient);
+    }
+  }
+}
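
The RPC calls above all follow the same acquire/use/release discipline against RpcConnectionPool. A minimal sketch of that pattern (the worker address and port are made up; a real caller takes them from the container's NodeId):

    import java.net.InetSocketAddress;

    import org.apache.tajo.conf.TajoConf;
    import org.apache.tajo.ipc.TajoWorkerProtocol;
    import org.apache.tajo.rpc.NettyClientBase;
    import org.apache.tajo.rpc.RpcConnectionPool;

    public class WorkerRpcDemo {
      public static void main(String[] args) {
        TajoConf conf = new TajoConf();
        RpcConnectionPool pool = RpcConnectionPool.getPool(conf);
        NettyClientBase client = null;
        try {
          InetSocketAddress addr = new InetSocketAddress("worker1", 28091);
          client = pool.getConnection(addr, TajoWorkerProtocol.class, true);
          TajoWorkerProtocol.TajoWorkerProtocolService stub = client.getStub();
          // ... issue an asynchronous call on the stub, as killTaskAttempt() does ...
        } catch (Exception e) {
          e.printStackTrace();
        } finally {
          pool.releaseConnection(client);  // always return the connection
        }
      }
    }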

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
new file mode 100644
index 0000000..9d54bb5
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -0,0 +1,579 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.function.Function;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.engine.function.annotation.Description;
+import org.apache.tajo.engine.function.annotation.ParamOptionTypes;
+import org.apache.tajo.engine.function.annotation.ParamTypes;
+import org.apache.tajo.master.metrics.CatalogMetricsGaugeSet;
+import org.apache.tajo.master.metrics.WorkerResourceMetricsGaugeSet;
+import org.apache.tajo.master.querymaster.QueryJobManager;
+import org.apache.tajo.master.rm.TajoWorkerResourceManager;
+import org.apache.tajo.master.rm.WorkerResourceManager;
+import org.apache.tajo.master.session.SessionManager;
+import org.apache.tajo.rpc.RpcChannelFactory;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.StorageManagerFactory;
+import org.apache.tajo.util.ClassUtil;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.metrics.TajoSystemMetrics;
+import org.apache.tajo.webapp.QueryExecutorServlet;
+import org.apache.tajo.webapp.StaticHttpServer;
+
+import java.io.*;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Modifier;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+
+public class TajoMaster extends CompositeService {
+  private static final String METRICS_GROUP_NAME = "tajomaster";
+
+  /** Class Logger */
+  private static final Log LOG = LogFactory.getLog(TajoMaster.class);
+
+  public static final int SHUTDOWN_HOOK_PRIORITY = 30;
+
+  /** rwxr-xr-x */
+  @SuppressWarnings("OctalInteger")
+  public static final FsPermission TAJO_ROOT_DIR_PERMISSION = FsPermission.createImmutable((short) 0755);
+  /** rwxr-xr-x */
+  @SuppressWarnings("OctalInteger")
+  public static final FsPermission SYSTEM_DIR_PERMISSION = FsPermission.createImmutable((short) 0755);
+  /** rwxr-xr-x */
+  @SuppressWarnings("OctalInteger")
+  public static final FsPermission SYSTEM_RESOURCE_DIR_PERMISSION = FsPermission.createImmutable((short) 0755);
+  /** rwxr-xr-x */
+  @SuppressWarnings("OctalInteger")
+  public static final FsPermission WAREHOUSE_DIR_PERMISSION = FsPermission.createImmutable((short) 0755);
+  /** rwxr-xr-x */
+  @SuppressWarnings("OctalInteger")
+  public static final FsPermission STAGING_ROOTDIR_PERMISSION = FsPermission.createImmutable((short) 0755);
+  /** rwxr-xr-x */
+  @SuppressWarnings("OctalInteger")
+  public static final FsPermission SYSTEM_CONF_FILE_PERMISSION = FsPermission.createImmutable((short) 0755);
+
+
+  private MasterContext context;
+  private TajoConf systemConf;
+  private FileSystem defaultFS;
+  private Clock clock;
+
+  private Path tajoRootPath;
+  private Path wareHousePath;
+
+  private CatalogServer catalogServer;
+  private CatalogService catalog;
+  private AbstractStorageManager storeManager;
+  private GlobalEngine globalEngine;
+  private AsyncDispatcher dispatcher;
+  private TajoMasterClientService tajoMasterClientService;
+  private TajoMasterService tajoMasterService;
+  private SessionManager sessionManager;
+
+  private WorkerResourceManager resourceManager;
+  //Web Server
+  private StaticHttpServer webServer;
+
+  private QueryJobManager queryJobManager;
+
+  private ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+
+  private TajoSystemMetrics systemMetrics;
+
+  public TajoMaster() throws Exception {
+    super(TajoMaster.class.getName());
+  }
+
+  public String getMasterName() {
+    return NetUtils.normalizeInetSocketAddress(tajoMasterService.getBindAddress());
+  }
+
+  public String getVersion() {
+    return TajoConstants.TAJO_VERSION;
+  }
+
+  public TajoMasterClientService getTajoMasterClientService() {
+    return tajoMasterClientService;
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    this.systemConf = (TajoConf) conf;
+
+    context = new MasterContext(systemConf);
+    clock = new SystemClock();
+
+    try {
+      RackResolver.init(systemConf);
+
+      initResourceManager();
+      initWebServer();
+
+      this.dispatcher = new AsyncDispatcher();
+      addIfService(dispatcher);
+
+      // Check the system directories and create them if they do not exist.
+      checkAndInitializeSystemDirectories();
+      this.storeManager = StorageManagerFactory.getStorageManager(systemConf);
+
+      catalogServer = new CatalogServer(initBuiltinFunctions());
+      addIfService(catalogServer);
+      catalog = new LocalCatalogWrapper(catalogServer, systemConf);
+
+      sessionManager = new SessionManager(dispatcher);
+      addIfService(sessionManager);
+
+      globalEngine = new GlobalEngine(context);
+      addIfService(globalEngine);
+
+      queryJobManager = new QueryJobManager(context);
+      addIfService(queryJobManager);
+
+      tajoMasterClientService = new TajoMasterClientService(context);
+      addIfService(tajoMasterClientService);
+
+      tajoMasterService = new TajoMasterService(context);
+      addIfService(tajoMasterService);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      throw e;
+    }
+
+    super.serviceInit(systemConf);
+    LOG.info("Tajo Master is initialized.");
+  }
+
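+  /**
+   * Starts the master metrics collector and registers gauge sets for worker
+   * resource and catalog statistics.
+   */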
+  private void initSystemMetrics() {
+    systemMetrics = new TajoSystemMetrics(systemConf, METRICS_GROUP_NAME, getMasterName());
+    systemMetrics.start();
+
+    systemMetrics.register("resource", new WorkerResourceMetricsGaugeSet(context));
+    systemMetrics.register("catalog", new CatalogMetricsGaugeSet(context));
+  }
+
+  @SuppressWarnings("unchecked")
+  private void initResourceManager() throws Exception {
+    Class<WorkerResourceManager> resourceManagerClass = (Class<WorkerResourceManager>)
+        systemConf.getClass(ConfVars.RESOURCE_MANAGER_CLASS.varname, TajoWorkerResourceManager.class);
+    Constructor<WorkerResourceManager> constructor = resourceManagerClass.getConstructor(MasterContext.class);
+    resourceManager = constructor.newInstance(context);
+    addIfService(resourceManager);
+  }
+
+  private void initWebServer() throws Exception {
+    if (!systemConf.get(CommonTestingUtil.TAJO_TEST, "FALSE").equalsIgnoreCase("TRUE")) {
+      InetSocketAddress address = systemConf.getSocketAddrVar(ConfVars.TAJO_MASTER_INFO_ADDRESS);
+      webServer = StaticHttpServer.getInstance(this, "admin", address.getHostName(), address.getPort(),
+          true, null, context.getConf(), null);
+      webServer.addServlet("queryServlet", "/query_exec", QueryExecutorServlet.class);
+      webServer.start();
+    }
+  }
+
+  private void checkAndInitializeSystemDirectories() throws IOException {
+    // Get Tajo root dir
+    this.tajoRootPath = TajoConf.getTajoRootDir(systemConf);
+    LOG.info("Tajo Root Directory: " + tajoRootPath);
+
+    // Check and Create Tajo root dir
+    this.defaultFS = tajoRootPath.getFileSystem(systemConf);
+    systemConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultFS.getUri().toString());
+    LOG.info("FileSystem (" + this.defaultFS.getUri() + ") is initialized.");
+    if (!defaultFS.exists(tajoRootPath)) {
+      defaultFS.mkdirs(tajoRootPath, new FsPermission(TAJO_ROOT_DIR_PERMISSION));
+      LOG.info("Tajo Root Directory '" + tajoRootPath + "' is created.");
+    }
+
+    // Check and Create system and system resource dir
+    Path systemPath = TajoConf.getSystemDir(systemConf);
+    if (!defaultFS.exists(systemPath)) {
+      defaultFS.mkdirs(systemPath, new FsPermission(SYSTEM_DIR_PERMISSION));
+      LOG.info("System dir '" + systemPath + "' is created");
+    }
+    Path systemResourcePath = TajoConf.getSystemResourceDir(systemConf);
+    if (!defaultFS.exists(systemResourcePath)) {
+      defaultFS.mkdirs(systemResourcePath, new FsPermission(SYSTEM_RESOURCE_DIR_PERMISSION));
+      LOG.info("System resource dir '" + systemResourcePath + "' is created");
+    }
+
+    // Get Warehouse dir
+    this.wareHousePath = TajoConf.getWarehouseDir(systemConf);
+    LOG.info("Tajo Warehouse dir: " + wareHousePath);
+
+    // Check and Create Warehouse dir
+    if (!defaultFS.exists(wareHousePath)) {
+      defaultFS.mkdirs(wareHousePath, new FsPermission(WAREHOUSE_DIR_PERMISSION));
+      LOG.info("Warehouse dir '" + wareHousePath + "' is created");
+    }
+
+    Path stagingPath = TajoConf.getStagingDir(systemConf);
+    LOG.info("Staging dir: " + wareHousePath);
+    if (!defaultFS.exists(stagingPath)) {
+      defaultFS.mkdirs(stagingPath, new FsPermission(STAGING_ROOTDIR_PERMISSION));
+      LOG.info("Staging dir '" + stagingPath + "' is created");
+    }
+  }
+
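+  /**
+   * Scans org.apache.tajo.engine.function for concrete Function implementations
+   * and builds a FunctionDesc for every (name, signature) combination declared
+   * through the Description annotation, expanding synonyms and optional
+   * parameter types.
+   */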
+  @SuppressWarnings("unchecked")
+  public static List<FunctionDesc> initBuiltinFunctions() throws ServiceException {
+    List<FunctionDesc> sqlFuncs = new ArrayList<FunctionDesc>();
+
+    Set<Class> functionClasses = ClassUtil.findClasses(org.apache.tajo.catalog.function.Function.class,
+          "org.apache.tajo.engine.function");
+
+    for (Class eachClass : functionClasses) {
+      if(eachClass.isInterface() || Modifier.isAbstract(eachClass.getModifiers())) {
+        continue;
+      }
+      Function function = null;
+      try {
+        function = (Function)eachClass.newInstance();
+      } catch (Exception e) {
+        LOG.warn("Failed to instantiate function class " + eachClass + ": " + e.getMessage());
+        continue;
+      }
+      Description desc = function.getClass().getAnnotation(Description.class);
+      String functionName = desc.functionName();
+      String[] synonyms = desc.synonyms();
+      String description = desc.description();
+      String detail = desc.detail();
+      String example = desc.example();
+      Type returnType = desc.returnType();
+      ParamTypes[] paramArray = desc.paramTypes();
+
+      String[] allFunctionNames;
+      if (synonyms != null && synonyms.length > 0) {
+        allFunctionNames = new String[1 + synonyms.length];
+        allFunctionNames[0] = functionName;
+        System.arraycopy(synonyms, 0, allFunctionNames, 1, synonyms.length);
+      } else {
+        allFunctionNames = new String[]{functionName};
+      }
+
+      for (String eachFunctionName : allFunctionNames) {
+        for (ParamTypes params : paramArray) {
+          ParamOptionTypes[] paramOptionArray;
+          if (params.paramOptionTypes() == null ||
+              params.paramOptionTypes().getClass().getAnnotation(ParamTypes.class) == null) {
+            paramOptionArray = new ParamOptionTypes[0];
+          } else {
+            paramOptionArray = params.paramOptionTypes().getClass().getAnnotation(ParamTypes.class).paramOptionTypes();
+          }
+
+          Type[] paramTypes = params.paramTypes();
+          if (paramOptionArray.length > 0) {
+            paramTypes = params.paramTypes().clone();
+          }
+
+          for (int i=0; i < paramOptionArray.length + 1; i++) {
+            FunctionDesc functionDesc = new FunctionDesc(eachFunctionName,
+                function.getClass(), function.getFunctionType(),
+                CatalogUtil.newSimpleDataType(returnType),
+                paramTypes.length == 0 ? CatalogUtil.newSimpleDataTypeArray() : CatalogUtil.newSimpleDataTypeArray(paramTypes));
+
+            functionDesc.setDescription(description);
+            functionDesc.setExample(example);
+            functionDesc.setDetail(detail);
+            sqlFuncs.add(functionDesc);
+
+            if (i != paramOptionArray.length) {
+              // Extend the signature with the i-th group of optional parameter types.
+              Type[] extended = new Type[paramTypes.length + paramOptionArray[i].paramOptionTypes().length];
+              System.arraycopy(paramTypes, 0, extended, 0, paramTypes.length);
+              System.arraycopy(paramOptionArray[i].paramOptionTypes(), 0, extended, paramTypes.length,
+                  paramOptionArray[i].paramOptionTypes().length);
+              paramTypes = extended;
+            }
+          }
+        }
+      }
+    }
+
+    return sqlFuncs;
+  }
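+
+  // For illustration only: the shape of a hypothetical UDF whose Description
+  // attributes initBuiltinFunctions() consumes (body and base class elided):
+  //
+  //   @Description(functionName = "upper", synonyms = {"ucase"},
+  //       description = "Returns the string in upper case", detail = "",
+  //       example = "> SELECT upper('tajo');", returnType = Type.TEXT,
+  //       paramTypes = {@ParamTypes(paramTypes = {Type.TEXT})})
+  //   public class Upper extends Function { ... }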
+
+  public MasterContext getContext() {
+    return this.context;
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    LOG.info("TajoMaster is starting up");
+
+    // check base tablespace and databases
+    checkBaseTBSpaceAndDatabase();
+
+    super.serviceStart();
+
+    // Setting the system global configs
+    systemConf.setSocketAddr(ConfVars.CATALOG_ADDRESS.varname,
+        NetUtils.getConnectAddress(catalogServer.getBindAddress()));
+
+    try {
+      writeSystemConf();
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e);
+    }
+
+    initSystemMetrics();
+  }
+
+  private void writeSystemConf() throws IOException {
+    // Storing the system configs
+    Path systemConfPath = TajoConf.getSystemConfPath(systemConf);
+
+    if (!defaultFS.exists(systemConfPath.getParent())) {
+      defaultFS.mkdirs(systemConfPath.getParent());
+    }
+
+    if (defaultFS.exists(systemConfPath)) {
+      defaultFS.delete(systemConfPath, false);
+    }
+
+    FSDataOutputStream out = FileSystem.create(defaultFS, systemConfPath,
+        new FsPermission(SYSTEM_CONF_FILE_PERMISSION));
+    try {
+      systemConf.writeXml(out);
+    } finally {
+      out.close();
+    }
+    defaultFS.setReplication(systemConfPath, (short) systemConf.getIntVar(ConfVars.SYSTEM_CONF_REPLICA_COUNT));
+  }
+
+  private void checkBaseTBSpaceAndDatabase() throws IOException {
+    if (!catalog.existTablespace(DEFAULT_TABLESPACE_NAME)) {
+      catalog.createTablespace(DEFAULT_TABLESPACE_NAME, context.getConf().getVar(ConfVars.WAREHOUSE_DIR));
+    } else {
+      LOG.info(String.format("Default tablespace (%s) is already prepared.", DEFAULT_TABLESPACE_NAME));
+    }
+
+    if (!catalog.existDatabase(DEFAULT_DATABASE_NAME)) {
+      globalEngine.createDatabase(null, DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME, false);
+    } else {
+      LOG.info(String.format("Default database (%s) is already prepared.", DEFAULT_DATABASE_NAME));
+    }
+  }
+
+  @Override
+  public void stop() {
+    if (webServer != null) {
+      try {
+        webServer.stop();
+      } catch (Exception e) {
+        LOG.error(e);
+      }
+    }
+
+    IOUtils.cleanup(LOG, catalogServer);
+
+    if(systemMetrics != null) {
+      systemMetrics.stop();
+    }
+
+    RpcChannelFactory.shutdown();
+
+    super.stop();
+    LOG.info("Tajo Master main thread exiting");
+  }
+
+  public EventHandler getEventHandler() {
+    return dispatcher.getEventHandler();
+  }
+
+  public boolean isMasterRunning() {
+    return getServiceState() == STATE.STARTED;
+  }
+
+  public CatalogService getCatalog() {
+    return this.catalog;
+  }
+
+  public CatalogServer getCatalogServer() {
+    return this.catalogServer;
+  }
+
+  public AbstractStorageManager getStorageManager() {
+    return this.storeManager;
+  }
+
+  public class MasterContext {
+    private final TajoConf conf;
+
+    public MasterContext(TajoConf conf) {
+      this.conf = conf;
+    }
+
+    public TajoConf getConf() {
+      return conf;
+    }
+
+    public Clock getClock() {
+      return clock;
+    }
+
+    public QueryJobManager getQueryJobManager() {
+      return queryJobManager;
+    }
+
+    public WorkerResourceManager getResourceManager() {
+      return resourceManager;
+    }
+
+    public EventHandler getEventHandler() {
+      return dispatcher.getEventHandler();
+    }
+
+    public CatalogService getCatalog() {
+      return catalog;
+    }
+
+    public SessionManager getSessionManager() {
+      return sessionManager;
+    }
+
+    public GlobalEngine getGlobalEngine() {
+      return globalEngine;
+    }
+
+    public AbstractStorageManager getStorageManager() {
+      return storeManager;
+    }
+
+    public TajoMasterService getTajoMasterService() {
+      return tajoMasterService;
+    }
+
+    public TajoSystemMetrics getSystemMetrics() {
+      return systemMetrics;
+    }
+  }
+
+  String getThreadTaskName(long id, String name) {
+    if (name == null) {
+      return Long.toString(id);
+    }
+    return id + " (" + name + ")";
+  }
+
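+  /**
+   * Writes a human-readable dump of all live JVM threads, including lock and
+   * contention details when thread contention monitoring is enabled.
+   */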
+  public void dumpThread(Writer writer) {
+    PrintWriter stream = new PrintWriter(writer);
+    final int STACK_DEPTH = 20;
+    boolean contention = threadBean.isThreadContentionMonitoringEnabled();
+    long[] threadIds = threadBean.getAllThreadIds();
+    stream.println("Process Thread Dump: Tajo Worker");
+    stream.println(threadIds.length + " active threads");
+    for (long tid : threadIds) {
+      ThreadInfo info = threadBean.getThreadInfo(tid, STACK_DEPTH);
+      if (info == null) {
+        stream.println("  Inactive");
+        continue;
+      }
+      stream.println("Thread " + getThreadTaskName(info.getThreadId(), info.getThreadName()) + ":");
+      Thread.State state = info.getThreadState();
+      stream.println("  State: " + state + ", Blocked count: " + info.getBlockedCount() +
+          ", Waited count: " + info.getWaitedCount());
+      if (contention) {
+        stream.println("  Blocked time: " + info.getBlockedTime() + ", Waited time: " + info.getWaitedTime());
+      }
+      if (state == Thread.State.WAITING) {
+        stream.println("  Waiting on " + info.getLockName());
+      } else if (state == Thread.State.BLOCKED) {
+        stream.println("  Blocked on " + info.getLockName() +
+            ", Blocked by " + getThreadTaskName(info.getLockOwnerId(), info.getLockOwnerName()));
+      }
+      stream.println("  Stack:");
+      for (StackTraceElement frame : info.getStackTrace()) {
+        stream.println("    " + frame.toString());
+      }
+      stream.println("");
+    }
+  }
+
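+  /**
+   * Returns the mounted paths parsed from the output of the system 'mount'
+   * command; only meaningful on platforms with a POSIX-style mount command.
+   */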
+  public static List<File> getMountPath() throws Exception {
+    BufferedReader mountOutput = null;
+    try {
+      Process mountProcess = Runtime.getRuntime().exec("mount");
+      mountOutput = new BufferedReader(new InputStreamReader(mountProcess.getInputStream()));
+      List<File> mountPaths = new ArrayList<File>();
+      while (true) {
+        String line = mountOutput.readLine();
+        if (line == null) {
+          break;
+        }
+
+        int indexStart = line.indexOf(" on /");
+        if (indexStart < 0) {
+          continue;
+        }
+        int indexEnd = line.indexOf(" ", indexStart + 4);
+        if (indexEnd < 0) {
+          indexEnd = line.length();
+        }
+
+        mountPaths.add(new File(line.substring(indexStart + 4, indexEnd)));
+      }
+      return mountPaths;
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      throw e;
+    } finally {
+      if(mountOutput != null) {
+        mountOutput.close();
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    StringUtils.startupShutdownMessage(TajoMaster.class, args, LOG);
+
+    try {
+      TajoMaster master = new TajoMaster();
+      ShutdownHookManager.get().addShutdownHook(new CompositeServiceShutdownHook(master), SHUTDOWN_HOOK_PRIORITY);
+      TajoConf conf = new TajoConf(new YarnConfiguration());
+      master.init(conf);
+      master.start();
+    } catch (Throwable t) {
+      LOG.fatal("Error starting TajoMaster", t);
+      System.exit(-1);
+    }
+  }
+}
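
As background on the service wiring in serviceInit() above, here is a minimal
sketch of the Hadoop CompositeService pattern that TajoMaster follows (an
illustration of the library contract, not Tajo code): children registered via
addService()/addIfService() are initialized and started together with the
parent and stopped in reverse registration order.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.service.AbstractService;
    import org.apache.hadoop.service.CompositeService;

    public class MiniComposite extends CompositeService {
      public MiniComposite() {
        super("MiniComposite");
      }

      @Override
      protected void serviceInit(Configuration conf) throws Exception {
        // Children added before super.serviceInit() are init-ed with the parent.
        addService(new AbstractService("child-a") {});
        addService(new AbstractService("child-b") {});
        super.serviceInit(conf);
      }

      public static void main(String[] args) throws Exception {
        MiniComposite svc = new MiniComposite();
        svc.init(new Configuration());
        svc.start();  // starts child-a, then child-b
        svc.stop();   // stops child-b, then child-a
      }
    }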