You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/01/02 05:36:56 UTC

[1/2] TAJO-385: Refactoring TaskScheduler to assign multiple fragments. (jihoon)

Updated Branches:
  refs/heads/master 35b86177f -> df5727c49


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/FragmentScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/FragmentScheduleEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/FragmentScheduleEvent.java
new file mode 100644
index 0000000..10182a4
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/FragmentScheduleEvent.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.storage.fragment.FileFragment;
+
+/**
+ * Task-scheduler event that carries a left fragment and, optionally, a right
+ * fragment ({@code null} when only one fragment was supplied).
+ */
+public class FragmentScheduleEvent extends TaskSchedulerEvent {
+  /** Left fragment; assigned by both constructors. */
+  private final FileFragment left;
+  /** Right fragment, or {@code null} when the single-fragment constructor was used. */
+  private final FileFragment right;
+
+  /**
+   * Creates an event that carries only a left fragment.
+   */
+  public FragmentScheduleEvent(final EventType eventType, final ExecutionBlockId blockId,
+                               final FileFragment fragment) {
+    this(eventType, blockId, fragment, null);
+  }
+
+  /**
+   * Creates an event that carries a left fragment and a (possibly null) right fragment.
+   */
+  public FragmentScheduleEvent(final EventType eventType, final ExecutionBlockId blockId,
+                               final FileFragment leftFragment, final FileFragment rightFragment) {
+    super(eventType, blockId);
+    left = leftFragment;
+    right = rightFragment;
+  }
+
+  /** @return true if a right fragment is attached to this event */
+  public boolean hasRightFragment() {
+    return right != null;
+  }
+
+  public FileFragment getLeftFragment() {
+    return left;
+  }
+
+  public FileFragment getRightFragment() {
+    return right;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("FragmentScheduleEvent{");
+    sb.append("leftFragment=").append(left);
+    sb.append(", rightFragment=").append(right);
+    return sb.append('}').toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryUnitAttemptScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryUnitAttemptScheduleEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryUnitAttemptScheduleEvent.java
new file mode 100644
index 0000000..a2acc7e
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryUnitAttemptScheduleEvent.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import com.google.protobuf.RpcCallback;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto;
+import org.apache.tajo.master.querymaster.QueryUnitAttempt;
+
+/**
+ * Task-scheduler event that carries a {@link QueryUnitAttempt} together with a
+ * {@link QueryUnitAttemptScheduleContext} (container id, host and RPC callback).
+ */
+public class QueryUnitAttemptScheduleEvent extends TaskSchedulerEvent {
+  private final QueryUnitAttemptScheduleContext context;
+  private final QueryUnitAttempt queryUnitAttempt;
+
+  public QueryUnitAttemptScheduleEvent(EventType eventType, ExecutionBlockId executionBlockId,
+                                       QueryUnitAttemptScheduleContext context,
+                                       QueryUnitAttempt queryUnitAttempt) {
+    super(eventType, executionBlockId);
+    this.queryUnitAttempt = queryUnitAttempt;
+    this.context = context;
+  }
+
+  /** @return the attempt carried by this event */
+  public QueryUnitAttempt getQueryUnitAttempt() {
+    return queryUnitAttempt;
+  }
+
+  /** @return the schedule context carried by this event */
+  public QueryUnitAttemptScheduleContext getContext() {
+    return context;
+  }
+
+  /**
+   * Mutable holder for a container id, a host name and an RPC callback. The no-argument
+   * constructor leaves all three {@code null}; the setters can fill them in later.
+   */
+  public static class QueryUnitAttemptScheduleContext {
+    private ContainerId containerId;
+    private String host;
+    private RpcCallback<QueryUnitRequestProto> callback;
+
+    /** Creates a context with every field set to {@code null}. */
+    public QueryUnitAttemptScheduleContext() {
+      this(null, null, null);
+    }
+
+    public QueryUnitAttemptScheduleContext(ContainerId containerId, String host,
+                                           RpcCallback<QueryUnitRequestProto> callback) {
+      this.callback = callback;
+      this.host = host;
+      this.containerId = containerId;
+    }
+
+    public ContainerId getContainerId() {
+      return containerId;
+    }
+
+    public String getHost() {
+      return host;
+    }
+
+    public RpcCallback<QueryUnitRequestProto> getCallback() {
+      return callback;
+    }
+
+    public void setContainerId(ContainerId containerId) {
+      this.containerId = containerId;
+    }
+
+    public void setHost(String host) {
+      this.host = host;
+    }
+
+    public void setCallback(RpcCallback<QueryUnitRequestProto> callback) {
+      this.callback = callback;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
index 007a976..9fe2f8c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
@@ -30,9 +30,9 @@ public abstract class TaskSchedulerEvent extends AbstractEvent<EventType> {
 
   protected final ExecutionBlockId executionBlockId;
 
-  public TaskSchedulerEvent(EventType eventType, ExecutionBlockId queryBlockId) {
+  public TaskSchedulerEvent(EventType eventType, ExecutionBlockId executionBlockId) {
     super(eventType);
-    this.executionBlockId = queryBlockId;
+    this.executionBlockId = executionBlockId;
   }
 
   public ExecutionBlockId getExecutionBlockId() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEventFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEventFactory.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEventFactory.java
deleted file mode 100644
index e7bca57..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEventFactory.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.event;
-
-import com.google.common.collect.Maps;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.master.TaskSchedulerFactory;
-import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
-import org.apache.tajo.master.querymaster.QueryUnitAttempt;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.Map;
-
-public class TaskSchedulerEventFactory {
-  private static final Map<String, Class<? extends TaskSchedulerEvent>> CACHED_EVENT_CLASSES = Maps.newConcurrentMap();
-  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap();
-  private static final Class<?>[] DEFAULT_EVENT_PARAMS = { EventType.class, QueryUnitAttempt.class };
-
-  public static <T extends TaskSchedulerEvent> T getTaskSchedulerEvent(Configuration conf, QueryUnitAttempt attempt, EventType eventType) {
-    T result;
-
-    try {
-      Class<T> eventClass = (Class<T>) getTaskSchedulerEventClass(conf);
-      Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(eventClass);
-      if (constructor == null) {
-        constructor = eventClass.getDeclaredConstructor(DEFAULT_EVENT_PARAMS);
-        constructor.setAccessible(true);
-        CONSTRUCTOR_CACHE.put(eventClass, constructor);
-      }
-      result = constructor.newInstance(new Object[]{ eventType, attempt });
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    return result;
-  }
-
-  public static Class<? extends TaskSchedulerEvent> getTaskSchedulerEventClass(Configuration conf) throws IOException {
-    String handlerName = TaskSchedulerFactory.getSchedulerType(conf);
-    Class<? extends TaskSchedulerEvent> eventClass = CACHED_EVENT_CLASSES.get(handlerName);
-    if (eventClass == null) {
-      eventClass = conf.getClass(String.format("tajo.querymaster.task-schedule-event.%s.class", handlerName), null, TaskSchedulerEvent.class);
-      CACHED_EVENT_CLASSES.put(handlerName, eventClass);
-    }
-
-    if (eventClass == null) {
-      throw new IOException("Unknown Event Type: " + handlerName);
-    }
-    return eventClass;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index a75548b..3618d3b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -33,7 +33,7 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.DefaultTaskScheduler;
+import org.apache.tajo.master.LazyTaskScheduler;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.rpc.AsyncRpcServer;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
@@ -129,7 +129,7 @@ public class QueryMasterManagerService extends CompositeService
 
       if(queryMasterTask == null || queryMasterTask.isStopped()) {
         LOG.debug("getTask:" + cid + ", ebId:" + ebId + ", but query is finished.");
-        done.run(DefaultTaskScheduler.stopTaskRunnerReq);
+        done.run(LazyTaskScheduler.stopTaskRunnerReq);
       } else {
         LOG.debug("getTask:" + cid + ", ebId:" + ebId);
         queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(cid, ebId, done));

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index 98aa64f..5e7c82f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.master.querymaster;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
@@ -32,8 +33,10 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.ipc.TajoWorkerProtocol.Partition;
+import org.apache.tajo.master.FragmentPair;
 import org.apache.tajo.master.TaskState;
 import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
 import org.apache.tajo.storage.DataLocation;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.TajoIdUtils;
@@ -59,18 +62,18 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 	private LogicalNode plan = null;
 	private List<ScanNode> scan;
 	
-	private Map<String, FragmentProto> fragMap;
+	private Map<String, Set<FragmentProto>> fragMap;
 	private Map<String, Set<URI>> fetchMap;
 	
   private List<Partition> partitions;
 	private TableStats stats;
-  private List<DataLocation> dataLocations;
   private final boolean isLeafTask;
   private List<IntermediateEntry> intermediateData;
 
   private Map<QueryUnitAttemptId, QueryUnitAttempt> attempts;
   private final int maxAttempts = 3;
-  private Integer lastAttemptId;
+  private Integer nextAttempt = -1;
+  private QueryUnitAttemptId lastAttemptId;
 
   private QueryUnitAttemptId successfulAttempt;
   private String succeededHost;
@@ -82,6 +85,8 @@ public class QueryUnit implements EventHandler<TaskEvent> {
   private long launchTime;
   private long finishTime;
 
+  private List<DataLocation> dataLocations = Lists.newArrayList();
+
   protected static final StateMachineFactory
       <QueryUnit, TaskState, TaskEventType, TaskEvent> stateMachineFactory =
       new StateMachineFactory
@@ -111,9 +116,10 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 
   private final Lock readLock;
   private final Lock writeLock;
+  private QueryUnitAttemptScheduleContext scheduleContext;
 
-	public QueryUnit(Configuration conf, QueryUnitId id,
-                   boolean isLeafTask, EventHandler eventHandler) {
+	public QueryUnit(Configuration conf, QueryUnitAttemptScheduleContext scheduleContext,
+                   QueryUnitId id, boolean isLeafTask, EventHandler eventHandler) {
     this.systemConf = conf;
 		this.taskId = id;
     this.eventHandler = eventHandler;
@@ -123,12 +129,14 @@ public class QueryUnit implements EventHandler<TaskEvent> {
     fragMap = Maps.newHashMap();
     partitions = new ArrayList<Partition>();
     attempts = Collections.emptyMap();
-    lastAttemptId = -1;
+    lastAttemptId = null;
+    nextAttempt = -1;
     failedAttempts = 0;
 
     ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
     this.readLock = readWriteLock.readLock();
     this.writeLock = readWriteLock.writeLock();
+    this.scheduleContext = scheduleContext;
 
     stateMachine = stateMachineFactory.make(this);
 	}
@@ -137,20 +145,6 @@ public class QueryUnit implements EventHandler<TaskEvent> {
     return this.isLeafTask;
   }
 
-  public void setDataLocations(FileFragment fragment) {
-    String[] hosts = fragment.getHosts();
-    int[] volumeIds = fragment.getDiskIds();
-    this.dataLocations = new ArrayList<DataLocation>(hosts.length);
-
-    for (int i = 0; i < hosts.length; i++) {
-      this.dataLocations.add(new DataLocation(hosts[i], volumeIds[i]));
-    }
-  }
-
-  public List<DataLocation> getDataLocations() {
-    return this.dataLocations;
-  }
-
   public TaskState getState() {
     readLock.lock();
     try {
@@ -183,15 +177,60 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 	  }
 	}
 
+  /**
+   * Appends one {@link DataLocation} per host of the given fragment, pairing
+   * {@code hosts[i]} with {@code diskIds[i]}.
+   */
+  private void addDataLocation(FileFragment fragment) {
+    String[] hosts = fragment.getHosts();
+    int[] diskIds = fragment.getDiskIds();
+    // NOTE(review): assumes getDiskIds() returns at least hosts.length entries;
+    // TODO confirm this holds for every FileFragment constructor used by callers.
+    for (int i = 0; i < hosts.length; i++) {
+      dataLocations.add(new DataLocation(hosts[i], diskIds[i]));
+    }
+  }
+
+  /**
+   * Adds the fragment to the set kept for {@code tableName} (creating the set on
+   * first use) and records the fragment's data locations. Shared by both
+   * single-fragment setters so their bookkeeping cannot drift apart.
+   */
+  private void addFragmentToTable(String tableName, FileFragment fragment) {
+    Set<FragmentProto> fragmentProtos = fragMap.get(tableName);
+    if (fragmentProtos == null) {
+      fragmentProtos = new HashSet<FragmentProto>();
+      fragMap.put(tableName, fragmentProtos);
+    }
+    fragmentProtos.add(fragment.getProto());
+    addDataLocation(fragment);
+  }
+
   @Deprecated
   public void setFragment(String tableId, FileFragment fragment) {
-    this.fragMap.put(tableId, fragment.getProto());
-    setDataLocations(fragment);
+    addFragmentToTable(tableId, fragment);
   }
 
   public void setFragment2(FileFragment fragment) {
-    this.fragMap.put(fragment.getTableName(), fragment.getProto());
-    setDataLocations(fragment);
+    addFragmentToTable(fragment.getTableName(), fragment);
+  }
+
+  /**
+   * Adds the left fragment of every pair and, when present, its right fragment;
+   * each fragment is filed under its own table name.
+   */
+  public void setFragment(FragmentPair[] fragmentPairs) {
+    for (FragmentPair eachFragmentPair : fragmentPairs) {
+      this.setFragment2(eachFragmentPair.getLeftFragment());
+      if (eachFragmentPair.getRightFragment() != null) {
+        this.setFragment2(eachFragmentPair.getRightFragment());
+      }
+    }
+  }
+
+  /** @return a new array holding the data locations recorded so far */
+  public DataLocation[] getDataLocations() {
+    return dataLocations.toArray(new DataLocation[dataLocations.size()]);
   }
 
   public String getSucceededHost() {
@@ -230,7 +259,11 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 	}
 
   public Collection<FragmentProto> getAllFragments() {
-    return fragMap.values();
+    Set<FragmentProto> fragmentProtos = new HashSet<FragmentProto>();
+    for (Set<FragmentProto> eachFragmentSet : fragMap.values()) {
+      fragmentProtos.addAll(eachFragmentSet);
+    }
+    return fragmentProtos;
   }
 	
 	public LogicalNode getLogicalPlan() {
@@ -276,9 +309,11 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 	@Override
 	public String toString() {
 		String str = new String(plan.getType() + " \n");
-		for (Entry<String, FragmentProto> e : fragMap.entrySet()) {
+		for (Entry<String, Set<FragmentProto>> e : fragMap.entrySet()) {
 		  str += e.getKey() + " : ";
-      str += e.getValue() + " ";
+      for (FragmentProto fragment : e.getValue()) {
+        str += fragment + ", ";
+      }
 		}
 		for (Entry<String, Set<URI>> e : fetchMap.entrySet()) {
       str += e.getKey() + " : ";
@@ -311,8 +346,10 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 	}
 
   public QueryUnitAttempt newAttempt() {
-    QueryUnitAttempt attempt = new QueryUnitAttempt(QueryIdFactory.newQueryUnitAttemptId(
-        this.getId(), ++lastAttemptId), this, eventHandler);
+    QueryUnitAttempt attempt = new QueryUnitAttempt(scheduleContext,
+        QueryIdFactory.newQueryUnitAttemptId(this.getId(), ++nextAttempt),
+        this, eventHandler);
+    lastAttemptId = attempt.getId();
     return attempt;
   }
 
@@ -341,7 +378,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
   }
 
   public int getRetryCount () {
-    return this.lastAttemptId;
+    return this.nextAttempt;
   }
 
   private static class InitialScheduleTransition implements

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
index 1af7332..0c50704 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
@@ -28,6 +28,7 @@ import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.ipc.TajoWorkerProtocol.Partition;
 import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport;
 import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
 import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
 import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
 import org.apache.tajo.util.TajoIdUtils;
@@ -58,6 +59,8 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
 
   private final List<String> diagnostics = new ArrayList<String>();
 
+  private final QueryUnitAttemptScheduleContext scheduleContext;
+
   protected static final StateMachineFactory
       <QueryUnitAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
       stateMachineFactory = new StateMachineFactory
@@ -109,8 +112,10 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
     stateMachine;
 
 
-  public QueryUnitAttempt(final QueryUnitAttemptId id, final QueryUnit queryUnit,
+  public QueryUnitAttempt(final QueryUnitAttemptScheduleContext scheduleContext,
+                          final QueryUnitAttemptId id, final QueryUnit queryUnit,
                           final EventHandler eventHandler) {
+    this.scheduleContext = scheduleContext;
     this.id = id;
     this.expire = QueryUnitAttempt.EXPIRE_TIME;
     this.queryUnit = queryUnit;
@@ -203,9 +208,9 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
     @Override
     public void transition(QueryUnitAttempt taskAttempt,
                            TaskAttemptEvent taskAttemptEvent) {
-      TaskAttemptScheduleEvent castEvent = (TaskAttemptScheduleEvent) taskAttemptEvent;
-      taskAttempt.eventHandler.handle(
-          TaskSchedulerEventFactory.getTaskSchedulerEvent(castEvent.getConf(), taskAttempt, EventType.T_SCHEDULE));
+      taskAttempt.eventHandler.handle(new QueryUnitAttemptScheduleEvent(
+          EventType.T_SCHEDULE, taskAttempt.getQueryUnit().getId().getExecutionBlockId(),
+          taskAttempt.scheduleContext, taskAttempt));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index ac59408..2185acf 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -19,12 +19,11 @@
 package org.apache.tajo.master.querymaster;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
@@ -35,9 +34,13 @@ import org.apache.tajo.engine.planner.UniformRangePartition;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.planner.logical.SortNode;
 import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.exception.InternalException;
+import org.apache.tajo.master.TaskSchedulerContext;
 import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
 import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.TupleRange;
@@ -64,8 +67,9 @@ public class Repartitioner {
   private static final Log LOG = LogFactory.getLog(Repartitioner.class);
 
   private static int HTTP_REQUEST_MAXIMUM_LENGTH = 1900;
+  private final static String UNKNOWN_HOST = "unknown";
 
-  public static QueryUnit[] createJoinTasks(SubQuery subQuery)
+  public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerContext, SubQuery subQuery)
       throws IOException {
     MasterPlan masterPlan = subQuery.getMasterPlan();
     ExecutionBlock execBlock = subQuery.getBlock();
@@ -89,7 +93,7 @@ public class Repartitioner {
 
         tablePath = storageManager.getTablePath(scans[i].getTableName());
         stats[i] = masterContext.getSubQuery(childBlocks[i].getId()).getTableStat();
-        fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath, 0, 0);
+        fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
       } else {
         tablePath = tableDesc.getPath();
         stats[i] = tableDesc.getStats();
@@ -99,19 +103,13 @@ public class Repartitioner {
     }
 
     // Assigning either fragments or fetch urls to query units
-    QueryUnit [] tasks;
     boolean leftSmall = execBlock.isBroadcastTable(scans[0].getCanonicalName());
     boolean rightSmall = execBlock.isBroadcastTable(scans[1].getCanonicalName());
 
     if (leftSmall && rightSmall) {
       LOG.info("[Distributed Join Strategy] : Immediate Two Way Join on Single Machine");
-      tasks = new QueryUnit[1];
-      tasks[0] = new QueryUnit(subQuery.getContext().getConf(),
-          QueryIdFactory.newQueryUnitId(subQuery.getId(), 0),
-          subQuery.getMasterPlan().isLeaf(execBlock), subQuery.getEventHandler());
-      tasks[0].setLogicalPlan(execBlock.getPlan());
-      tasks[0].setFragment(scans[0].getCanonicalName(), fragments[0]);
-      tasks[0].setFragment(scans[1].getCanonicalName(), fragments[1]);
+      SubQuery.scheduleFragment(subQuery, fragments[0], fragments[1]);
+      schedulerContext.setEstimatedTaskNum(1);
     } else if (leftSmall ^ rightSmall) {
       LOG.info("[Distributed Join Strategy] : Broadcast Join");
       int broadcastIdx = leftSmall ? 0 : 1;
@@ -120,7 +118,7 @@ public class Repartitioner {
       LOG.info("Broadcasting Table Volume: " + stats[broadcastIdx].getNumBytes());
       LOG.info("Base Table Volume: " + stats[baseScanIdx].getNumBytes());
 
-      tasks = createLeafTasksWithBroadcastTable(subQuery, baseScanIdx, fragments[broadcastIdx]);
+      scheduleLeafTasksWithBroadcastTable(schedulerContext, subQuery, baseScanIdx, fragments[broadcastIdx]);
     } else {
       LOG.info("[Distributed Join Strategy] : Repartition Join");
       // The hash map is modeling as follows:
@@ -156,6 +154,11 @@ public class Repartitioner {
       LOG.info("Outer Intermediate Volume: " + stats[0].getNumBytes());
       LOG.info("Inner Intermediate Volume: " + stats[1].getNumBytes());
 
+      int [] avgSize = new int[2];
+      avgSize[0] = (int) (stats[0].getNumBytes() / hashEntries.size());
+      avgSize[1] = (int) (stats[1].getNumBytes() / hashEntries.size());
+      int bothFetchSize = avgSize[0] + avgSize[1];
+
       // Getting the desire number of join tasks according to the volumn
       // of a larger table
       int largerIdx = stats[0].getNumBytes() >= stats[1].getNumBytes() ? 0 : 1;
@@ -173,33 +176,22 @@ public class Repartitioner {
       // distinct partition ids.
       int joinTaskNum = Math.min(maxTaskNum, hashEntries.size());
       LOG.info("The determined number of join tasks is " + joinTaskNum);
-      QueryUnit [] createdTasks = newEmptyJoinTask(subQuery, fragments, joinTaskNum);
+
+      SubQuery.scheduleFragment(subQuery, fragments[0], fragments[1]);
 
       // Assign partitions to tasks in a round robin manner.
-      int i = 0;
       for (Entry<Integer, Map<String, List<IntermediateEntry>>> entry
           : hashEntries.entrySet()) {
-        addJoinPartition(createdTasks[i++], subQuery, entry.getKey(), entry.getValue());
-        if (i >= joinTaskNum) {
-          i = 0;
-        }
+        addJoinPartition(subQuery, entry.getKey(), entry.getValue());
       }
 
-      List<QueryUnit> filteredTasks = new ArrayList<QueryUnit>();
-      for (QueryUnit task : createdTasks) {
-        // if there are at least two fetches, the join is possible.
-        if (task.getFetches().size() > 1) {
-          filteredTasks.add(task);
-        }
-      }
-
-      tasks = filteredTasks.toArray(new QueryUnit[filteredTasks.size()]);
+      schedulerContext.setTaskSize((int) Math.ceil((double) bothFetchSize / joinTaskNum));
+      schedulerContext.setEstimatedTaskNum(joinTaskNum);
     }
-
-    return tasks;
   }
 
-  private static QueryUnit [] createLeafTasksWithBroadcastTable(SubQuery subQuery, int baseScanId, FileFragment broadcasted) throws IOException {
+  private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext schedulerContext, SubQuery subQuery,
+                                                          int baseScanId, FileFragment broadcasted) throws IOException {
     ExecutionBlock execBlock = subQuery.getBlock();
     ScanNode[] scans = execBlock.getScanNodes();
     Preconditions.checkArgument(scans.length == 2, "Must be Join Query");
@@ -210,50 +202,16 @@ public class Repartitioner {
     inputPath = desc.getPath();
     meta = desc.getMeta();
 
-    FileSystem fs = inputPath.getFileSystem(subQuery.getContext().getConf());
     List<FileFragment> fragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, desc.getSchema(),
         inputPath);
-    QueryUnit queryUnit;
-    List<QueryUnit> queryUnits = new ArrayList<QueryUnit>();
-
-    int i = 0;
-    for (FileFragment fragment : fragments) {
-      queryUnit = newQueryUnit(subQuery, i++, fragment);
-      queryUnit.setFragment2(broadcasted);
-      queryUnits.add(queryUnit);
-    }
-    return queryUnits.toArray(new QueryUnit[queryUnits.size()]);
-  }
 
-  private static QueryUnit newQueryUnit(SubQuery subQuery, int taskId, FileFragment fragment) {
-    ExecutionBlock execBlock = subQuery.getBlock();
-    QueryUnit unit = new QueryUnit(subQuery.getContext().getConf(),
-        QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), subQuery.getMasterPlan().isLeaf(execBlock),
-        subQuery.getEventHandler());
-    unit.setLogicalPlan(execBlock.getPlan());
-    unit.setFragment2(fragment);
-    return unit;
+    SubQuery.scheduleFragments(subQuery, fragments, broadcasted);
+    schedulerContext.setEstimatedTaskNum(fragments.size());
   }
 
-  private static QueryUnit [] newEmptyJoinTask(SubQuery subQuery, FileFragment[] fragments, int taskNum) {
-    ExecutionBlock execBlock = subQuery.getBlock();
-    QueryUnit [] tasks = new QueryUnit[taskNum];
-    for (int i = 0; i < taskNum; i++) {
-      tasks[i] = new QueryUnit(subQuery.getContext().getConf(),
-          QueryIdFactory.newQueryUnitId(subQuery.getId(), i), subQuery.getMasterPlan().isLeaf(execBlock),
-          subQuery.getEventHandler());
-      tasks[i].setLogicalPlan(execBlock.getPlan());
-      for (FileFragment fragment : fragments) {
-        tasks[i].setFragment2(fragment);
-      }
-    }
-
-    return tasks;
-  }
-
-  private static void addJoinPartition(QueryUnit task, SubQuery subQuery, int partitionId,
+  private static void addJoinPartition(SubQuery subQuery, int partitionId,
                                        Map<String, List<IntermediateEntry>> grouppedPartitions) {
-
+    Map<String, List<URI>> fetches = new HashMap<String, List<URI>>();
     for (ExecutionBlock execBlock : subQuery.getMasterPlan().getChilds(subQuery.getId())) {
       Map<String, List<IntermediateEntry>> requests;
       if (grouppedPartitions.containsKey(execBlock.getId().toString())) {
@@ -269,8 +227,9 @@ public class Repartitioner {
             requestPerNode.getValue());
         fetchURIs.addAll(uris);
       }
-      task.addFetches(execBlock.getId().toString(), fetchURIs);
+      fetches.put(execBlock.getId().toString(), Lists.newArrayList(fetchURIs));
     }
+    SubQuery.scheduleFetches(subQuery, fetches);
   }
 
   /**
@@ -294,25 +253,26 @@ public class Repartitioner {
     return mergedPartitions;
   }
 
-  public static QueryUnit [] createNonLeafTask(MasterPlan masterPlan, SubQuery subQuery, SubQuery childSubQuery,
-                                               DataChannel channel, int maxNum)
+  public static void scheduleFragmentsForNonLeafTasks(TaskSchedulerContext schedulerContext,
+                                                      MasterPlan masterPlan, SubQuery subQuery, SubQuery childSubQuery,
+                                                      DataChannel channel, int maxNum)
       throws InternalException {
     if (channel.getPartitionType() == HASH_PARTITION) {
-      return createHashPartitionedTasks(masterPlan, subQuery, childSubQuery, channel, maxNum);
+      scheduleHashPartitionedFetches(schedulerContext, masterPlan, subQuery, channel, maxNum);
     } else if (channel.getPartitionType() == RANGE_PARTITION) {
-      return createRangePartitionedTasks(subQuery, childSubQuery, channel, maxNum);
+      scheduleRangePartitionedFetches(schedulerContext, subQuery, childSubQuery, channel, maxNum);
     } else {
       throw new InternalException("Cannot support partition type");
     }
   }
 
-  public static QueryUnit [] createRangePartitionedTasks(SubQuery subQuery,
-                                                         SubQuery childSubQuery, DataChannel channel, int maxNum)
+  public static void scheduleRangePartitionedFetches(TaskSchedulerContext schedulerContext, SubQuery subQuery,
+                                                     SubQuery childSubQuery, DataChannel channel, int maxNum)
       throws InternalException {
     ExecutionBlock execBlock = subQuery.getBlock();
     TableStats stat = childSubQuery.getTableStat();
     if (stat.getNumRows() == 0) {
-      return new QueryUnit[0];
+      return;
     }
 
     ScanNode scan = execBlock.getScanNodes()[0];
@@ -343,7 +303,8 @@ public class Repartitioner {
         " sub ranges (total units: " + determinedTaskNum + ")");
     TupleRange [] ranges = partitioner.partition(determinedTaskNum);
 
-    FileFragment dummyFragment = new FileFragment(scan.getTableName(), tablePath, 0, 0);
+    FileFragment dummyFragment = new FileFragment(scan.getTableName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
+    SubQuery.scheduleFragment(subQuery, dummyFragment);
 
     List<String> basicFetchURIs = new ArrayList<String>();
 
@@ -380,27 +341,27 @@ public class Repartitioner {
       LOG.error(e);
     }
 
-    QueryUnit [] tasks = createEmptyNonLeafTasks(subQuery, determinedTaskNum, dummyFragment);
-    assignPartitionByRoundRobin(map, scan.getTableName(), tasks);
-    return tasks;
+    schedulePartitionByRoundRobin(subQuery, map, scan.getTableName(), determinedTaskNum);
+
+    schedulerContext.setEstimatedTaskNum(determinedTaskNum);
   }
 
-  public static QueryUnit [] assignPartitionByRoundRobin(Map<?, Set<URI>> partitions,
-                                               String tableName, QueryUnit [] tasks) {
-    int tid = 0;
+  public static void schedulePartitionByRoundRobin(SubQuery subQuery, Map<?, Set<URI>> partitions,
+                                                   String tableName, int num) {
+    int i;
+    Map<String, List<URI>>[] fetchesArray = new Map[num];
+    for (i = 0; i < num; i++) {
+      fetchesArray[i] = new HashMap<String, List<URI>>();
+    }
+    i = 0;
     for (Entry<?, Set<URI>> entry : partitions.entrySet()) {
-      for (URI uri : entry.getValue()) {
-        tasks[tid].addFetch(tableName, uri);
-      }
-
-      if (tid >= tasks.length) {
-        tid = 0;
-      } else {
-        tid ++;
-      }
+      Set<URI> value = entry.getValue();
+      fetchesArray[i++].put(tableName, Lists.newArrayList(value));
+      if (i == num) i = 0;
+    }
+    for (Map<String, List<URI>> eachFetches : fetchesArray) {
+      SubQuery.scheduleFetches(subQuery, eachFetches);
     }
-
-    return tasks;
   }
 
   public static String createBasicFetchUri(String hostName, int port,
@@ -418,8 +379,9 @@ public class Repartitioner {
     return sb.toString();
   }
 
-  public static QueryUnit [] createHashPartitionedTasks(MasterPlan masterPlan, SubQuery subQuery,
-                                                 SubQuery childSubQuery, DataChannel channel, int maxNum) {
+  public static void scheduleHashPartitionedFetches(TaskSchedulerContext schedulerContext, MasterPlan masterPlan,
+                                                    SubQuery subQuery, DataChannel channel,
+                                                    int maxNum) {
     ExecutionBlock execBlock = subQuery.getBlock();
 
     List<TableStats> tableStatses = new ArrayList<TableStats>();
@@ -431,16 +393,17 @@ public class Repartitioner {
     TableStats totalStat = StatisticsUtil.computeStatFromUnionBlock(tableStatses);
 
     if (totalStat.getNumRows() == 0) {
-      return new QueryUnit[0];
+      return;
     }
 
     ScanNode scan = execBlock.getScanNodes()[0];
     Path tablePath;
     tablePath = subQuery.getContext().getStorageManager().getTablePath(scan.getTableName());
 
-
-    FileFragment frag = new FileFragment(scan.getCanonicalName(), tablePath, 0, 0);
-
+    FileFragment frag = new FileFragment(scan.getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
+    List<FileFragment> fragments = new ArrayList<FileFragment>();
+    fragments.add(frag);
+    SubQuery.scheduleFragments(subQuery, fragments);
 
     Map<String, List<IntermediateEntry>> hashedByHost;
     Map<Integer, List<URI>> finalFetchURI = new HashMap<Integer, List<URI>>();
@@ -475,22 +438,14 @@ public class Repartitioner {
       determinedTaskNum = 1;
     }
 
-    QueryUnit [] tasks = createEmptyNonLeafTasks(subQuery, determinedTaskNum, frag);
-
-    int tid = 0;
     for (Entry<Integer, List<URI>> entry : finalFetchURI.entrySet()) {
-      for (URI uri : entry.getValue()) {
-        tasks[tid].addFetch(scan.getTableName(), uri);
-      }
-
-      tid ++;
-
-      if (tid == tasks.length) {
-       tid = 0;
-      }
+      List<URI> value = entry.getValue();
+      Map<String, List<URI>> fetches = new HashMap<String, List<URI>>();
+      fetches.put(scan.getTableName(), value);
+      SubQuery.scheduleFetches(subQuery, fetches);
     }
 
-    return tasks;
+    schedulerContext.setEstimatedTaskNum(determinedTaskNum);
   }
 
   public static Collection<URI> createHashFetchURL(String hostAndPort, ExecutionBlockId ebid,
@@ -563,20 +518,6 @@ public class Repartitioner {
     return hashed;
   }
 
-  public static QueryUnit [] createEmptyNonLeafTasks(SubQuery subQuery, int num,
-                                                     FileFragment frag) {
-    LogicalNode plan = subQuery.getBlock().getPlan();
-    QueryUnit [] tasks = new QueryUnit[num];
-    for (int i = 0; i < num; i++) {
-      tasks[i] = new QueryUnit(subQuery.getContext().getConf(),
-          QueryIdFactory.newQueryUnitId(subQuery.getId(), i),
-          false, subQuery.getEventHandler());
-      tasks[i].setFragment2(frag);
-      tasks[i].setLogicalPlan(plan);
-    }
-    return tasks;
-  }
-
   public static Map<String, List<IntermediateEntry>> hashByHost(
       List<IntermediateEntry> entries) {
     Map<String, List<IntermediateEntry>> hashed = new HashMap<String, List<IntermediateEntry>>();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 03659c3..5979fbc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -45,15 +45,15 @@ import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.AbstractTaskScheduler;
-import org.apache.tajo.master.TaskRunnerGroupEvent;
+import org.apache.tajo.master.*;
 import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
-import org.apache.tajo.master.TaskSchedulerFactory;
 import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
 import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.fragment.FileFragment;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.Lock;
@@ -201,6 +201,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
   private final Lock writeLock;
 
   private int completedTaskCount = 0;
+  private TaskSchedulerContext schedulerContext;
 
   public SubQuery(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan, ExecutionBlock block, AbstractStorageManager sm) {
     this.context = context;
@@ -267,7 +268,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         if (completedTaskCount == 0) {
           return 0.0f;
         } else {
-          return (float)completedTaskCount / (float)tasks.size();
+          return (float)completedTaskCount / (float)schedulerContext.getEstimatedTaskNum();
         }
       }
     } finally {
@@ -511,13 +512,16 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
           ExecutionBlock parent = subQuery.getMasterPlan().getParent(subQuery.getBlock());
           DataChannel channel = subQuery.getMasterPlan().getChannel(subQuery.getId(), parent.getId());
           setRepartitionIfNecessary(subQuery, channel);
-          createTasks(subQuery);
+          initTaskScheduler(subQuery);
+          schedule(subQuery);
+          LOG.info(subQuery.getTaskScheduler().remainingScheduledObjectNum() + " objects are scheduled");
 
-          if (subQuery.tasks.size() == 0) { // if there is no tasks
+          if (subQuery.getTaskScheduler().remainingScheduledObjectNum() == 0) { // if there is no tasks
+            subQuery.stopScheduler();
             subQuery.finish();
             return SubQueryState.SUCCEEDED;
           } else {
-            initTaskScheduler(subQuery);
+            subQuery.taskScheduler.start();
             allocateContainers(subQuery);
             return SubQueryState.INIT;
           }
@@ -533,10 +537,14 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       return state;
     }
 
-    private void initTaskScheduler(SubQuery subQuery) {
-      subQuery.taskScheduler = TaskSchedulerFactory.getTaskSCheduler(subQuery.context.getConf(), subQuery.context);
-      subQuery.taskScheduler.init(subQuery.context.getConf());
-      subQuery.taskScheduler.start();
+    private void initTaskScheduler(SubQuery subQuery) throws IOException {
+      TajoConf conf = subQuery.context.getConf();
+      subQuery.schedulerContext = new TaskSchedulerContext(subQuery.context,
+          subQuery.getMasterPlan().isLeaf(subQuery.getId()), subQuery.getId());
+      subQuery.schedulerContext.setTaskSize(conf.getIntVar(ConfVars.TASK_DEFAULT_SIZE) * 1024 * 1024);
+      subQuery.taskScheduler = TaskSchedulerFactory.get(conf, subQuery.schedulerContext, subQuery);
+      subQuery.taskScheduler.init(conf);
+      LOG.info(subQuery.taskScheduler.getName() + " is chosen for the task scheduling");
     }
 
     /**
@@ -649,28 +657,20 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       }
     }
 
-    private static void createTasks(SubQuery subQuery) throws IOException {
+    private static void schedule(SubQuery subQuery) throws IOException {
       MasterPlan masterPlan = subQuery.getMasterPlan();
       ExecutionBlock execBlock = subQuery.getBlock();
-      QueryUnit [] tasks;
       if (subQuery.getMasterPlan().isLeaf(execBlock.getId()) && execBlock.getScanNodes().length == 1) { // Case 1: Just Scan
-        tasks = createLeafTasks(subQuery);
-
+        scheduleFragmentsForLeafQuery(subQuery);
       } else if (execBlock.getScanNodes().length > 1) { // Case 2: Join
-        tasks = Repartitioner.createJoinTasks(subQuery);
-
+        Repartitioner.scheduleFragmentsForJoinQuery(subQuery.schedulerContext, subQuery);
       } else { // Case 3: Others (Sort or Aggregation)
         int numTasks = getNonLeafTaskNum(subQuery);
         ExecutionBlockId childId = masterPlan.getChilds(subQuery.getBlock()).get(0).getId();
         SubQuery child = subQuery.context.getSubQuery(childId);
         DataChannel channel = masterPlan.getChannel(child.getId(), subQuery.getId());
-        tasks = Repartitioner.createNonLeafTask(masterPlan, subQuery, child, channel, numTasks);
-      }
-
-      LOG.info("Create " + tasks.length + " Tasks");
-
-      for (QueryUnit task : tasks) {
-        subQuery.addTask(task);
+        Repartitioner.scheduleFragmentsForNonLeafTasks(subQuery.schedulerContext, masterPlan, subQuery, child,
+            channel, numTasks);
       }
     }
 
@@ -716,14 +716,13 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
 
     public static void allocateContainers(SubQuery subQuery) {
       ExecutionBlock execBlock = subQuery.getBlock();
-      QueryUnit [] tasks = subQuery.getQueryUnits();
 
       //TODO consider disk slot
       int requiredMemoryMBPerTask = 512;
 
       int numRequest = subQuery.getContext().getResourceAllocator().calculateNumRequestContainers(
           subQuery.getContext().getQueryMasterContext().getWorkerContext(),
-          tasks.length,
+          subQuery.schedulerContext.getEstimatedTaskNum(),
           requiredMemoryMBPerTask
       );
 
@@ -758,7 +757,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       return fragments;
     }
 
-    private static QueryUnit [] createLeafTasks(SubQuery subQuery) throws IOException {
+    private static void scheduleFragmentsForLeafQuery(SubQuery subQuery) throws IOException {
       ExecutionBlock execBlock = subQuery.getBlock();
       ScanNode[] scans = execBlock.getScanNodes();
       Preconditions.checkArgument(scans.length == 1, "Must be Scan Query");
@@ -779,29 +778,55 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         fragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, table.getSchema(), inputPath);
       }
 
-      QueryUnit queryUnit;
-      List<QueryUnit> queryUnits = new ArrayList<QueryUnit>();
-
-      int i = 0;
-      for (FileFragment fragment : fragments) {
-        queryUnit = newQueryUnit(subQuery, i++, fragment);
-        queryUnits.add(queryUnit);
-      }
+      SubQuery.scheduleFragments(subQuery, fragments);
+      int estimatedTaskNum = (int) Math.ceil((double)table.getStats().getNumBytes() /
+          (double)subQuery.schedulerContext.getTaskSize());
+      subQuery.schedulerContext.setEstimatedTaskNum(estimatedTaskNum);
+    }
+  }
 
-      return queryUnits.toArray(new QueryUnit[queryUnits.size()]);
+  public static void scheduleFragments(SubQuery subQuery, Collection<FileFragment> fragments) {
+    for (FileFragment eachFragment : fragments) {
+      scheduleFragment(subQuery, eachFragment);
     }
+  }
 
-    private static QueryUnit newQueryUnit(SubQuery subQuery, int taskId, FileFragment fragment) {
-      ExecutionBlock execBlock = subQuery.getBlock();
-      QueryUnit unit = new QueryUnit(subQuery.context.getConf(),
-          QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), subQuery.masterPlan.isLeaf(execBlock),
-          subQuery.eventHandler);
-      unit.setLogicalPlan(execBlock.getPlan());
-      unit.setFragment2(fragment);
-      return unit;
+  public static void scheduleFragment(SubQuery subQuery, FileFragment fragment) {
+    subQuery.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
+        subQuery.getId(), fragment));
+  }
+
+  public static void scheduleFragments(SubQuery subQuery, List<FileFragment> leftFragments,
+                                       FileFragment broadcastFragment) {
+    for (FileFragment eachLeafFragment : leftFragments) {
+      scheduleFragment(subQuery, eachLeafFragment, broadcastFragment);
     }
   }
 
+  public static void scheduleFragment(SubQuery subQuery,
+                                      FileFragment leftFragment, FileFragment rightFragment) {
+    subQuery.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
+        subQuery.getId(), leftFragment, rightFragment));
+  }
+
+  public static void scheduleFetches(SubQuery subQuery, Map<String, List<URI>> fetches) {
+    subQuery.taskScheduler.handle(new FetchScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
+        subQuery.getId(), fetches));
+  }
+
+  public static QueryUnit newEmptyQueryUnit(TaskSchedulerContext schedulerContext,
+                                            QueryUnitAttemptScheduleContext queryUnitContext,
+                                            SubQuery subQuery, int taskId) {
+    ExecutionBlock execBlock = subQuery.getBlock();
+    QueryUnit unit = new QueryUnit(schedulerContext.getMasterContext().getConf(),
+        queryUnitContext,
+        QueryIdFactory.newQueryUnitId(schedulerContext.getBlockId(), taskId),
+        schedulerContext.isLeafQuery(), subQuery.eventHandler);
+    unit.setLogicalPlan(execBlock.getPlan());
+    subQuery.addTask(unit);
+    return unit;
+  }
+
   int i = 0;
   private static class ContainerLaunchTransition
       implements SingleArcTransition<SubQuery, SubQueryEvent> {
@@ -840,10 +865,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
                            SubQueryEvent subQueryEvent) {
       // schedule tasks
       try {
-        for (QueryUnitId taskId : subQuery.tasks.keySet()) {
-          subQuery.eventHandler.handle(new TaskEvent(taskId, TaskEventType.T_SCHEDULE));
-        }
-
         return  SubQueryState.RUNNING;
       } catch (Exception e) {
         LOG.warn("SubQuery (" + subQuery.getId() + ") failed", e);
@@ -866,8 +887,9 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_FAILED));
       } else {
         LOG.info(subQuery.getId() + " SubQuery Succeeded " + subQuery.completedTaskCount + "/"
-            + subQuery.tasks.size() + " on " + task.getHost() + ":" + task.getPort());
-        if (subQuery.completedTaskCount == subQuery.tasks.size()) {
+            + subQuery.schedulerContext.getEstimatedTaskNum() + " on " + task.getHost() + ":" + task.getPort());
+        if (subQuery.taskScheduler.remainingScheduledObjectNum() == 0
+            && subQuery.getQueryUnits().length == subQuery.completedTaskCount) {
           subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(),
               SubQueryEventType.SQ_SUBQUERY_COMPLETED));
         }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index fb87f97..a1c383a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -372,14 +372,11 @@ public class Task {
         }
         this.executor.close();
       }
-      context.setState(TaskAttemptState.TA_SUCCEEDED);
     } catch (Exception e) {
       // errorMessage will be sent to master.
       errorMessage = ExceptionUtils.getStackTrace(e);
       LOG.error(errorMessage);
       aborted = true;
-
-      context.setState(TaskAttemptState.TA_FAILED);
     } finally {
       setProgressFlag();
       stopped = true;
@@ -387,6 +384,11 @@ public class Task {
 
       if (killed || aborted) {
         context.setProgress(0.0f);
+        if(killed) {
+          context.setState(TaskAttemptState.TA_KILLED);
+        } else {
+          context.setState(TaskAttemptState.TA_FAILED);
+        }
 
         TaskFatalErrorReport.Builder errorBuilder =
             TaskFatalErrorReport.newBuilder()
@@ -408,6 +410,7 @@ public class Task {
       } else {
         // if successful
         context.setProgress(1.0f);
+        context.setState(TaskAttemptState.TA_SUCCEEDED);
 
         // stopping the status report
         try {
@@ -421,9 +424,6 @@ public class Task {
         succeeded++;
       }
 
-      if(killed) {
-        context.setState(TaskAttemptState.TA_KILLED);
-      }
       finishTime = System.currentTimeMillis();
 
       cleanupTask();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml b/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
index 2fd04ba..c49e8e5 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
+++ b/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
@@ -37,28 +37,14 @@
     <description></description>
   </property>
 
-  <!--- Registered Scheduler Handler -->
   <property>
-    <name>tajo.querymaster.scheduler-handler</name>
-    <value>default</value>
-  </property>
-
-  <!--- Scheduler Configuration -->
-  <property>
-    <name>tajo.querymaster.scheduler-handler.type</name>
-    <value>default</value>
-  </property>
-
-  <!--- Scheduler Handler -->
-  <property>
-    <name>tajo.querymaster.scheduler-handler.default.class</name>
+    <name>tajo.querymaster.task-scheduler</name>
     <value>org.apache.tajo.master.DefaultTaskScheduler</value>
   </property>
 
-  <!-- Scheduler Event handler -->
   <property>
-    <name>tajo.querymaster.task-schedule-event.default.class</name>
-    <value>org.apache.tajo.master.event.DefaultTaskSchedulerEvent</value>
+    <name>tajo.querymaster.lazy-task-scheduler.algorithm</name>
+    <value>org.apache.tajo.master.GreedyFragmentScheduleAlgorithm</value>
   </property>
 
 </configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
index 7082005..e4439f3 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
@@ -29,6 +29,7 @@ import org.apache.tajo.storage.fragment.Fragment;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
@@ -60,6 +61,7 @@ public class MergeScanner implements Scanner {
     for (Fragment f : rawFragmentList) {
       fragments.add((FileFragment) f);
     }
+    Collections.sort(fragments);
 
     this.target = target;
     this.reset();


[2/2] git commit: TAJO-385: Refactoring TaskScheduler to assign multiple fragments. (jihoon)

Posted by ji...@apache.org.
TAJO-385: Refactoring TaskScheduler to assign multiple fragments. (jihoon)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/df5727c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/df5727c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/df5727c4

Branch: refs/heads/master
Commit: df5727c49edd2e726b99dfa49dcc6b3d5b6f252b
Parents: 35b8617
Author: Jihoon Son <ji...@apache.org>
Authored: Thu Jan 2 13:34:36 2014 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Thu Jan 2 13:36:09 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   1 +
 .../tajo/engine/query/QueryUnitRequestImpl.java |   1 -
 .../tajo/master/AbstractTaskScheduler.java      |   1 +
 .../DefaultFragmentScheduleAlgorithm.java       | 247 +++++++++
 .../tajo/master/DefaultTaskScheduler.java       | 148 +++---
 .../apache/tajo/master/FetchScheduleEvent.java  |  40 ++
 .../org/apache/tajo/master/FragmentPair.java    |  73 +++
 .../tajo/master/FragmentScheduleAlgorithm.java  |  38 ++
 .../FragmentScheduleAlgorithmFactory.java       |  68 +++
 .../master/GreedyFragmentScheduleAlgorithm.java | 421 +++++++++++++++
 .../apache/tajo/master/LazyTaskScheduler.java   | 512 +++++++++++++++++++
 .../apache/tajo/master/ScheduledFetches.java    |  47 ++
 .../tajo/master/TaskSchedulerContext.java       |  68 +++
 .../tajo/master/TaskSchedulerFactory.java       |  55 +-
 .../master/event/DefaultTaskSchedulerEvent.java |  91 ----
 .../master/event/FragmentScheduleEvent.java     |  59 +++
 .../event/QueryUnitAttemptScheduleEvent.java    |  87 ++++
 .../tajo/master/event/TaskSchedulerEvent.java   |   4 +-
 .../master/event/TaskSchedulerEventFactory.java |  67 ---
 .../querymaster/QueryMasterManagerService.java  |   4 +-
 .../tajo/master/querymaster/QueryUnit.java      |  97 ++--
 .../master/querymaster/QueryUnitAttempt.java    |  13 +-
 .../tajo/master/querymaster/Repartitioner.java  | 199 +++----
 .../tajo/master/querymaster/SubQuery.java       | 122 +++--
 .../main/java/org/apache/tajo/worker/Task.java  |  12 +-
 .../src/main/resources/tajo-default.xml         |  20 +-
 .../org/apache/tajo/storage/MergeScanner.java   |   2 +
 28 files changed, 1994 insertions(+), 505 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c0df9f7..8cfc53e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -110,6 +110,8 @@ Release 0.8.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-385: Refactoring TaskScheduler to assign multiple fragments. (jihoon)
+
     TAJO-468: Implements task's detail info page in WEB UI.
     (hyoungjunkim via hyunsik)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index b7171c3..4d7254a 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -196,6 +196,7 @@ public class TajoConf extends Configuration {
     // Task Configuration
     TASK_DEFAULT_MEMORY("tajo.task.memory-slot-mb.default", 512),
     TASK_DEFAULT_DISK("tajo.task.disk-slot.default", 1.0f),
+    TASK_DEFAULT_SIZE("tajo.task.size-mb", 64),
     //////////////////////////////////
 
     // Metrics

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
index 3c3c3dd..d4006e0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
@@ -24,7 +24,6 @@ import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.ipc.TajoWorkerProtocol.Fetch;
 import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto;
 import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProtoOrBuilder;
-import org.apache.tajo.storage.fragment.Fragment;
 
 import java.net.URI;
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
index 3f4998a..acb1dcc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
@@ -37,4 +37,5 @@ public abstract class AbstractTaskScheduler extends AbstractService
   }
 
   public abstract void handleTaskRequestEvent(TaskRequestEvent event);
+  public abstract int remainingScheduledObjectNum();
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java
new file mode 100644
index 0000000..e4b98d4
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tajo.util.NetUtils;
+
+import java.util.*;
+import java.util.Map.Entry;
+
+/**
+ * DefaultFragmentScheduleAlgorithm indexes fragment pairs by the host, disk and rack of their left fragment
+ * and returns a fragment for the requested locality level.
+ *
+ * Randomness is limited to {@link #getHostLocalFragment(String)}, which picks one of the host's disks at
+ * random. The other lookups return an arbitrary candidate: whichever element the underlying hash-based
+ * collection yields first.
+ *
+ * The index is built on plain {@link HashMap}s and {@link HashSet}-backed sets without any locking.
+ */
+public class DefaultFragmentScheduleAlgorithm implements FragmentScheduleAlgorithm {
+  private final static Log LOG = LogFactory.getLog(DefaultFragmentScheduleAlgorithm.class);
+  // normalized host -> disk id -> fragment pairs that have a replica on that disk of that host
+  private final Map<String, Map<Integer, FragmentsPerDisk>> fragmentHostMapping =
+      new HashMap<String, Map<Integer, FragmentsPerDisk>>();
+  // rack (network location) -> fragment pairs that have a replica on some host of that rack
+  private final Map<String, Set<FragmentPair>> rackFragmentMapping =
+      new HashMap<String, Set<FragmentPair>>();
+  // incremented by every addFragment(FragmentPair) call, decremented when removeFragment finds the pair
+  private int fragmentNum = 0;
+  private final Random random = new Random(System.currentTimeMillis());
+
+  /**
+   * The fragment pairs that have a replica on one particular disk.
+   */
+  public static class FragmentsPerDisk {
+    private final Integer diskId;
+    private final Set<FragmentPair> fragmentPairSet;
+
+    public FragmentsPerDisk(Integer diskId) {
+      this.diskId = diskId;
+      this.fragmentPairSet = Collections.newSetFromMap(new HashMap<FragmentPair, Boolean>());
+    }
+
+    public Integer getDiskId() {
+      return diskId;
+    }
+
+    public Set<FragmentPair> getFragmentPairSet() {
+      return fragmentPairSet;
+    }
+
+    public void addFragmentPair(FragmentPair fragmentPair) {
+      fragmentPairSet.add(fragmentPair);
+    }
+
+    public boolean removeFragmentPair(FragmentPair fragmentPair) {
+      return fragmentPairSet.remove(fragmentPair);
+    }
+
+    public int size() {
+      return fragmentPairSet.size();
+    }
+
+    public Iterator<FragmentPair> getFragmentPairIterator() {
+      return fragmentPairSet.iterator();
+    }
+
+    public boolean isEmpty() {
+      return fragmentPairSet.isEmpty();
+    }
+  }
+
+  /**
+   * Indexes the pair under every (host, disk) location of its left fragment and under the rack of each host.
+   * NOTE(review): assumes getDiskIds() is index-aligned with getHosts() and at least as long — TODO confirm.
+   *
+   * @param fragmentPair the pair to index
+   */
+  @Override
+  public void addFragment(FragmentPair fragmentPair) {
+    String[] hosts = fragmentPair.getLeftFragment().getHosts();
+    int[] diskIds = fragmentPair.getLeftFragment().getDiskIds();
+    for (int i = 0; i < hosts.length; i++) {
+      addFragment(hosts[i], diskIds[i], fragmentPair);
+    }
+    fragmentNum++;
+  }
+
+  /**
+   * Adds the pair to the host/disk index and to the rack index for a single replica location.
+   */
+  private void addFragment(String host, Integer diskId, FragmentPair fragmentPair) {
+    // update the fragment maps per host
+    String normalizedHost = NetUtils.normalizeHost(host);
+    Map<Integer, FragmentsPerDisk> diskFragmentMap = fragmentHostMapping.get(normalizedHost);
+    if (diskFragmentMap == null) {
+      diskFragmentMap = new HashMap<Integer, FragmentsPerDisk>();
+      fragmentHostMapping.put(normalizedHost, diskFragmentMap);
+    }
+    FragmentsPerDisk fragmentsPerDisk = diskFragmentMap.get(diskId);
+    if (fragmentsPerDisk == null) {
+      fragmentsPerDisk = new FragmentsPerDisk(diskId);
+      diskFragmentMap.put(diskId, fragmentsPerDisk);
+    }
+    fragmentsPerDisk.addFragmentPair(fragmentPair);
+
+    // update the fragment maps per rack
+    String rack = RackResolver.resolve(normalizedHost).getNetworkLocation();
+    Set<FragmentPair> fragmentPairs = rackFragmentMapping.get(rack);
+    if (fragmentPairs == null) {
+      fragmentPairs = Collections.newSetFromMap(new HashMap<FragmentPair, Boolean>());
+      rackFragmentMapping.put(rack, fragmentPairs);
+    }
+    fragmentPairs.add(fragmentPair);
+  }
+
+  /**
+   * Removes the pair from every host, disk and rack it was indexed under. Empty disk, host and rack entries
+   * are dropped so that lookups never see empty containers. The fragment count is decremented once if the
+   * pair was found on at least one host.
+   *
+   * @param fragmentPair the pair to remove
+   */
+  @Override
+  public void removeFragment(FragmentPair fragmentPair) {
+    boolean removed = false;
+    for (String eachHost : fragmentPair.getLeftFragment().getHosts()) {
+      String normalizedHost = NetUtils.normalizeHost(eachHost);
+      Map<Integer, FragmentsPerDisk> diskFragmentMap = fragmentHostMapping.get(normalizedHost);
+      // The host entry may be absent (e.g. the pair was already removed); do not dereference null.
+      if (diskFragmentMap != null) {
+        Iterator<Entry<Integer, FragmentsPerDisk>> diskIterator = diskFragmentMap.entrySet().iterator();
+        while (diskIterator.hasNext()) {
+          FragmentsPerDisk fragmentsPerDisk = diskIterator.next().getValue();
+          if (fragmentsPerDisk.removeFragmentPair(fragmentPair)) {
+            // Accumulate instead of overwriting, so a miss on a later replica host cannot hide
+            // an earlier successful removal and leave fragmentNum too high.
+            removed = true;
+            if (fragmentsPerDisk.isEmpty()) {
+              diskIterator.remove();
+            }
+            break;
+          }
+        }
+        if (diskFragmentMap.isEmpty()) {
+          fragmentHostMapping.remove(normalizedHost);
+        }
+      }
+
+      String rack = RackResolver.resolve(normalizedHost).getNetworkLocation();
+      Set<FragmentPair> fragmentPairs = rackFragmentMapping.get(rack);
+      if (fragmentPairs != null) {
+        fragmentPairs.remove(fragmentPair);
+        if (fragmentPairs.isEmpty()) {
+          rackFragmentMapping.remove(rack);
+        }
+      }
+    }
+    if (removed) {
+      fragmentNum--;
+    }
+  }
+
+  /**
+   * Picks one of the host's disks uniformly at random and returns an arbitrary fragment stored on it.
+   * @param host host name; normalized before the lookup
+   * @return a fragment stored on the host, or null if the host has no indexed fragments
+   */
+  @Override
+  public FragmentPair getHostLocalFragment(String host) {
+    String normalizedHost = NetUtils.normalizeHost(host);
+    Map<Integer, FragmentsPerDisk> diskFragmentMap = fragmentHostMapping.get(normalizedHost);
+    if (diskFragmentMap == null || diskFragmentMap.isEmpty()) {
+      return null;
+    }
+
+    // Advance to the randomIndex-th disk (0-based). The first next() is unconditional so that
+    // randomIndex == 0 selects the first disk instead of leaving the selection null.
+    int randomIndex = random.nextInt(diskFragmentMap.size());
+    Iterator<FragmentsPerDisk> diskIterator = diskFragmentMap.values().iterator();
+    FragmentsPerDisk fragmentsPerDisk = diskIterator.next();
+    for (int i = 0; i < randomIndex; i++) {
+      fragmentsPerDisk = diskIterator.next();
+    }
+
+    Iterator<FragmentPair> fragmentIterator = fragmentsPerDisk.getFragmentPairIterator();
+    return fragmentIterator.hasNext() ? fragmentIterator.next() : null;
+  }
+
+  /**
+   * Returns an arbitrary fragment (the first in hash-set iteration order) stored on the given disk of the host.
+   * @param host host name; normalized before the lookup
+   * @param diskId disk id on that host
+   * @return a fragment stored on that disk, or null if there is none
+   */
+  @Override
+  public FragmentPair getHostLocalFragment(String host, Integer diskId) {
+    String normalizedHost = NetUtils.normalizeHost(host);
+    Map<Integer, FragmentsPerDisk> fragmentsPerDiskMap = fragmentHostMapping.get(normalizedHost);
+    if (fragmentsPerDiskMap != null) {
+      FragmentsPerDisk fragmentsPerDisk = fragmentsPerDiskMap.get(diskId);
+      if (fragmentsPerDisk != null && !fragmentsPerDisk.isEmpty()) {
+        return fragmentsPerDisk.getFragmentPairIterator().next();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Returns an arbitrary fragment (the first in hash-set iteration order) stored on some node in the
+   * same rack as the host.
+   * @param host host name; normalized before rack resolution, consistent with how fragments are indexed
+   * @return a rack-local fragment, or null if there is none
+   */
+  @Override
+  public FragmentPair getRackLocalFragment(String host) {
+    // addFragment resolves racks from the normalized host, so do the same here to hit the same key.
+    String rack = RackResolver.resolve(NetUtils.normalizeHost(host)).getNetworkLocation();
+    Set<FragmentPair> fragmentPairs = rackFragmentMapping.get(rack);
+    if (fragmentPairs != null && !fragmentPairs.isEmpty()) {
+      return fragmentPairs.iterator().next();
+    }
+    return null;
+  }
+
+  /**
+   * Returns an arbitrary fragment: the first pair of the first disk of the first host in hash-map
+   * iteration order.
+   * @return an indexed fragment, or null if nothing is indexed
+   */
+  @Override
+  public FragmentPair getRandomFragment() {
+    if (!fragmentHostMapping.isEmpty()) {
+      return fragmentHostMapping.values().iterator().next().values().iterator().next().getFragmentPairIterator().next();
+    }
+    return null;
+  }
+
+  /**
+   * Returns every indexed pair exactly once, even when it is replicated on several hosts or disks.
+   * @return a new array of all indexed fragment pairs
+   */
+  @Override
+  public FragmentPair[] getAllFragments() {
+    // A pair is indexed once per replica location, so collect into a set to drop duplicates.
+    Set<FragmentPair> fragmentPairs = new LinkedHashSet<FragmentPair>();
+    for (Map<Integer, FragmentsPerDisk> eachDiskFragmentMap : fragmentHostMapping.values()) {
+      for (FragmentsPerDisk fragmentsPerDisk : eachDiskFragmentMap.values()) {
+        fragmentPairs.addAll(fragmentsPerDisk.getFragmentPairSet());
+      }
+    }
+    return fragmentPairs.toArray(new FragmentPair[fragmentPairs.size()]);
+  }
+
+  /**
+   * @return the number of fragment pairs added and not yet removed
+   */
+  @Override
+  public int size() {
+    return fragmentNum;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index b1deb43..860a466 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -34,48 +34,44 @@ import org.apache.tajo.engine.planner.logical.ScanNode;
 import org.apache.tajo.engine.query.QueryUnitRequest;
 import org.apache.tajo.engine.query.QueryUnitRequestImpl;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.event.DefaultTaskSchedulerEvent;
-import org.apache.tajo.master.event.TaskAttemptAssignedEvent;
-import org.apache.tajo.master.event.TaskRequestEvent;
-import org.apache.tajo.master.event.TaskSchedulerEvent;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
 import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
 import org.apache.tajo.master.querymaster.QueryUnit;
+import org.apache.tajo.master.querymaster.QueryUnitAttempt;
 import org.apache.tajo.master.querymaster.SubQuery;
 import org.apache.tajo.storage.DataLocation;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.NetUtils;
 
 import java.net.URI;
 import java.util.*;
-import java.util.concurrent.BlockingQueue;
+import java.util.Map.Entry;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 
 public class DefaultTaskScheduler extends AbstractTaskScheduler {
-  private static final Log LOG = LogFactory.getLog(DefaultTaskSchedulerEvent.class);
+  private static final Log LOG = LogFactory.getLog(DefaultTaskScheduler.class);
 
-  private final QueryMasterTask.QueryMasterTaskContext context;
-  private TajoAsyncDispatcher dispatcher;
+  private final TaskSchedulerContext context;
+  private SubQuery subQuery;
 
-  private Thread eventHandlingThread;
   private Thread schedulingThread;
   private volatile boolean stopEventHandling;
 
-  BlockingQueue<TaskSchedulerEvent> eventQueue
-      = new LinkedBlockingQueue<TaskSchedulerEvent>();
-
   private ScheduledRequests scheduledRequests;
   private TaskRequests taskRequests;
 
   private int hostLocalAssigned = 0;
   private int rackLocalAssigned = 0;
   private int totalAssigned = 0;
+  private int nextTaskId = 0;
 
-  public DefaultTaskScheduler(QueryMasterTask.QueryMasterTaskContext context) {
+  public DefaultTaskScheduler(TaskSchedulerContext context, SubQuery subQuery) {
     super(DefaultTaskScheduler.class.getName());
     this.context = context;
-    this.dispatcher = context.getDispatcher();
+    this.subQuery = subQuery;
   }
 
   @Override
@@ -90,24 +86,6 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
   @Override
   public void start() {
     LOG.info("Start TaskScheduler");
-    this.eventHandlingThread = new Thread() {
-      public void run() {
-
-        TaskSchedulerEvent event;
-        while(!stopEventHandling && !Thread.currentThread().isInterrupted()) {
-          try {
-            event = eventQueue.take();
-            handleEvent(event);
-          } catch (InterruptedException e) {
-            //LOG.error("Returning, iterrupted : " + e);
-            break;
-          }
-        }
-        LOG.info("TaskScheduler eventHandlingThread stopped");
-      }
-    };
-
-    this.eventHandlingThread.start();
 
     this.schedulingThread = new Thread() {
       public void run() {
@@ -148,7 +126,6 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
   @Override
   public void stop() {
     stopEventHandling = true;
-    eventHandlingThread.interrupt();
     schedulingThread.interrupt();
 
     // Return all of request callbacks instantly.
@@ -160,16 +137,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
     super.stop();
   }
 
-  private void handleEvent(TaskSchedulerEvent event) {
-    if (event.getType() == EventType.T_SCHEDULE) {
-      DefaultTaskSchedulerEvent castEvent = (DefaultTaskSchedulerEvent) event;
-      if (castEvent.isLeafQuery()) {
-        scheduledRequests.addLeafTask(castEvent);
-      } else {
-        scheduledRequests.addNonLeafTask(castEvent);
-      }
-    }
-  }
+  private FileFragment[] fragmentsForNonLeafTask;
 
   List<TaskRequestEvent> taskRequestEvents = new ArrayList<TaskRequestEvent>();
   public void schedule() {
@@ -204,20 +172,43 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
 
   @Override
   public void handle(TaskSchedulerEvent event) {
-    int qSize = eventQueue.size();
-    if (qSize != 0 && qSize % 1000 == 0) {
-      LOG.info("Size of event-queue in YarnRMContainerAllocator is " + qSize);
-    }
-    int remCapacity = eventQueue.remainingCapacity();
-    if (remCapacity < 1000) {
-      LOG.warn("Very low remaining capacity in the event-queue "
-          + "of YarnRMContainerAllocator: " + remCapacity);
-    }
-
-    try {
-      eventQueue.put(event);
-    } catch (InterruptedException e) {
-      throw new InternalError(e.getMessage());
+    if (event.getType() == EventType.T_SCHEDULE) {
+      if (event instanceof FragmentScheduleEvent) {
+        FragmentScheduleEvent castEvent = (FragmentScheduleEvent) event;
+        if (context.isLeafQuery()) {
+          QueryUnitAttemptScheduleContext queryUnitContext = new QueryUnitAttemptScheduleContext();
+          QueryUnit task = SubQuery.newEmptyQueryUnit(context, queryUnitContext, subQuery, nextTaskId++);
+          task.setFragment2(castEvent.getLeftFragment());
+          if (castEvent.getRightFragment() != null) {
+            task.setFragment2(castEvent.getRightFragment());
+          }
+          subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
+        } else {
+          fragmentsForNonLeafTask = new FileFragment[2];
+          fragmentsForNonLeafTask[0] = castEvent.getLeftFragment();
+          fragmentsForNonLeafTask[1] = castEvent.getRightFragment();
+        }
+      } else if (event instanceof FetchScheduleEvent) {
+        FetchScheduleEvent castEvent = (FetchScheduleEvent) event;
+        Map<String, List<URI>> fetches = castEvent.getFetches();
+        QueryUnitAttemptScheduleContext queryUnitContext = new QueryUnitAttemptScheduleContext();
+        QueryUnit task = SubQuery.newEmptyQueryUnit(context, queryUnitContext, subQuery, nextTaskId++);
+        for (Entry<String, List<URI>> eachFetch : fetches.entrySet()) {
+          task.addFetches(eachFetch.getKey(), eachFetch.getValue());
+          task.setFragment2(fragmentsForNonLeafTask[0]);
+          if (fragmentsForNonLeafTask[1] != null) {
+            task.setFragment2(fragmentsForNonLeafTask[1]);
+          }
+        }
+        subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
+      } else if (event instanceof QueryUnitAttemptScheduleEvent) {
+        QueryUnitAttemptScheduleEvent castEvent = (QueryUnitAttemptScheduleEvent) event;
+        if (context.isLeafQuery()) {
+          scheduledRequests.addLeafTask(castEvent);
+        } else {
+          scheduledRequests.addNonLeafTask(castEvent);
+        }
+      }
     }
   }
 
@@ -226,6 +217,11 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
     taskRequests.handle(event);
   }
 
+  @Override
+  public int remainingScheduledObjectNum() {
+    return subQuery.getQueryUnits().length - totalAssigned;
+  }
+
   private class TaskRequests implements EventHandler<TaskRequestEvent> {
     private final LinkedBlockingQueue<TaskRequestEvent> taskRequestQueue =
         new LinkedBlockingQueue<TaskRequestEvent>();
@@ -347,8 +343,9 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
     private final Map<String, LinkedList<QueryUnitAttemptId>> leafTasksRackMapping =
         new HashMap<String, LinkedList<QueryUnitAttemptId>>();
 
-    public void addLeafTask(DefaultTaskSchedulerEvent event) {
-      List<DataLocation> locations = event.getDataLocations();
+    private void addLeafTask(QueryUnitAttemptScheduleEvent event) {
+      QueryUnitAttempt queryUnitAttempt = event.getQueryUnitAttempt();
+      DataLocation[] locations = queryUnitAttempt.getQueryUnit().getDataLocations();
 
       for (DataLocation location : locations) {
         String host = location.getHost();
@@ -358,29 +355,29 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
           taskBlockLocation = new TaskBlockLocation(host);
           leafTaskHostMapping.put(host, taskBlockLocation);
         }
-        taskBlockLocation.addQueryUnitAttemptId(location.getVolumeId(), event.getAttemptId());
+        taskBlockLocation.addQueryUnitAttemptId(location.getVolumeId(), queryUnitAttempt.getId());
 
         if (LOG.isDebugEnabled()) {
           LOG.debug("Added attempt req to host " + host);
         }
-      }
-      for (String rack : event.getRacks()) {
+
+        String rack = RackResolver.resolve(host).getNetworkLocation();
         LinkedList<QueryUnitAttemptId> list = leafTasksRackMapping.get(rack);
         if (list == null) {
           list = new LinkedList<QueryUnitAttemptId>();
           leafTasksRackMapping.put(rack, list);
         }
-        list.add(event.getAttemptId());
+        list.add(queryUnitAttempt.getId());
         if (LOG.isDebugEnabled()) {
           LOG.debug("Added attempt req to rack " + rack);
         }
       }
 
-      leafTasks.add(event.getAttemptId());
+      leafTasks.add(queryUnitAttempt.getId());
     }
 
-    public void addNonLeafTask(DefaultTaskSchedulerEvent event) {
-      nonLeafTasks.add(event.getAttemptId());
+    private void addNonLeafTask(QueryUnitAttemptScheduleEvent event) {
+      nonLeafTasks.add(event.getQueryUnitAttempt().getId());
     }
 
     public int leafTaskNum() {
@@ -402,7 +399,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
         taskRequest = it.next();
         LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
             "containerId=" + taskRequest.getContainerId());
-        ContainerProxy container = context.getResourceAllocator().getContainer(taskRequest.getContainerId());
+        ContainerProxy container = context.getMasterContext().getResourceAllocator()
+            .getContainer(taskRequest.getContainerId());
         if(container == null) {
           continue;
         }
@@ -460,8 +458,6 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
           }
         }
 
-        SubQuery subQuery = context.getQuery().getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId());
-
         if (attemptId != null) {
           QueryUnit task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
           QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
@@ -470,13 +466,13 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
               "",
               false,
               task.getLogicalPlan().toJson(),
-              context.getQueryContext(),
+              context.getMasterContext().getQueryContext(),
               subQuery.getDataChannel(), subQuery.getBlock().getEnforcer());
           if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) {
             taskAssign.setInterQuery();
           }
 
-          context.getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
+          context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
               taskRequest.getContainerId(),
               host, container.getTaskPort()));
           assignedRequest.add(attemptId);
@@ -523,7 +519,6 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
           LOG.debug("Assigned based on * match");
 
           QueryUnit task;
-          SubQuery subQuery = context.getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId());
           task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
           QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
               attemptId,
@@ -531,7 +526,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
               "",
               false,
               task.getLogicalPlan().toJson(),
-              context.getQueryContext(),
+              context.getMasterContext().getQueryContext(),
               subQuery.getDataChannel(),
               subQuery.getBlock().getEnforcer());
           if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) {
@@ -546,11 +541,12 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
             }
           }
 
-          ContainerProxy container = context.getResourceAllocator().getContainer(
+          ContainerProxy container = context.getMasterContext().getResourceAllocator().getContainer(
               taskRequest.getContainerId());
-          context.getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
+          context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
               taskRequest.getContainerId(), container.getTaskHostName(), container.getTaskPort()));
           taskRequest.getCallback().run(taskAssign.getProto());
+          totalAssigned++;
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java
new file mode 100644
index 0000000..561f980
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.master.event.TaskSchedulerEvent;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Task-scheduler event that carries fetch URIs grouped under string keys.
+ * NOTE(review): the keys are presumably input/table names; confirm against the event producer.
+ */
+public class FetchScheduleEvent extends TaskSchedulerEvent {
+  // Held by reference exactly as passed to the constructor; no defensive copy is made.
+  private final Map<String, List<URI>> fetches;
+
+  /**
+   * @param eventType scheduler event type
+   * @param blockId   execution block this event belongs to
+   * @param fetches   fetch URIs grouped by key
+   */
+  public FetchScheduleEvent(final EventType eventType, final ExecutionBlockId blockId,
+                            final Map<String, List<URI>> fetches) {
+    super(eventType, blockId);
+    this.fetches = fetches;
+  }
+
+  /**
+   * @return the same map instance that was given to the constructor
+   */
+  public Map<String, List<URI>> getFetches() {
+    return this.fetches;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FragmentPair.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FragmentPair.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FragmentPair.java
new file mode 100644
index 0000000..598b1c5
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FragmentPair.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.common.base.Objects;
+import org.apache.tajo.storage.fragment.FileFragment;
+
+/**
+ * FragmentPair bundles a left fragment with an optional right fragment.
+ * For join queries both fragments are expected to be set, and the left fragment is assumed to belong to
+ * the larger table. For all other queries only the left fragment is set and the right one is {@code null}.
+ *
+ * Both fields are final. {@link #equals(Object)} and {@link #hashCode()} consider both fragments and
+ * tolerate {@code null} values.
+ */
+public class FragmentPair {
+  private final FileFragment leftFragment;
+  private final FileFragment rightFragment;
+
+  /**
+   * Creates a pair with only a left fragment; the right fragment is {@code null}.
+   */
+  public FragmentPair(FileFragment left) {
+    this(left, null);
+  }
+
+  public FragmentPair(FileFragment left, FileFragment right) {
+    this.leftFragment = left;
+    this.rightFragment = right;
+  }
+
+  public FileFragment getLeftFragment() {
+    return leftFragment;
+  }
+
+  /**
+   * @return the right fragment, or {@code null} if this pair has none
+   */
+  public FileFragment getRightFragment() {
+    return rightFragment;
+  }
+
+  /**
+   * Two pairs are equal when their left fragments are equal and their right fragments are equal,
+   * where two {@code null} fragments count as equal.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof FragmentPair)) {
+      return false;
+    }
+    FragmentPair other = (FragmentPair) o;
+    // Objects.equal is null-safe, unlike calling leftFragment.equals(...) directly.
+    return Objects.equal(leftFragment, other.leftFragment)
+        && Objects.equal(rightFragment, other.rightFragment);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(leftFragment, rightFragment);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java
new file mode 100644
index 0000000..10d993d
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+/**
+ * FragmentScheduleAlgorithm is used by LazyTaskScheduler to pick a fragment for a given locality request.
+ *
+ * Implementations include DefaultFragmentScheduleAlgorithm and GreedyFragmentScheduleAlgorithm.
+ */
+public interface FragmentScheduleAlgorithm {
+
+  /** Adds a fragment pair to the set of candidates. */
+  void addFragment(FragmentPair fragmentPair);
+
+  /** Removes a fragment pair from the set of candidates. */
+  void removeFragment(FragmentPair fragmentPair);
+
+  /** @return a fragment stored on the given host, or null if none (as DefaultFragmentScheduleAlgorithm does) */
+  FragmentPair getHostLocalFragment(String host);
+
+  /** @return a fragment stored on the given disk of the host, or null if none */
+  FragmentPair getHostLocalFragment(String host, Integer diskId);
+
+  /** @return a fragment stored in the same rack as the host, or null if none */
+  FragmentPair getRackLocalFragment(String host);
+
+  /** @return any remaining fragment, or null if none */
+  FragmentPair getRandomFragment();
+
+  /** @return all fragment pairs currently held */
+  FragmentPair[] getAllFragments();
+
+  /** @return the number of fragment pairs currently held */
+  int size();
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java
new file mode 100644
index 0000000..820a0fb
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+/**
+ * Creates FragmentScheduleAlgorithm instances via reflection.
+ * Both the configured algorithm class and each class's no-arg constructor are cached statically.
+ */
+public class FragmentScheduleAlgorithmFactory {
+
+  private static Class<? extends FragmentScheduleAlgorithm> CACHED_ALGORITHM_CLASS;
+  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap();
+  private static final Class<?>[] DEFAULT_PARAMS = {};
+
+  /**
+   * Resolves the class named by "tajo.querymaster.lazy-task-scheduler.algorithm".
+   * Once a class has been resolved it is cached, and later calls return it without consulting conf.
+   *
+   * @throws IOException if the property is not set
+   */
+  public static Class<? extends FragmentScheduleAlgorithm> getScheduleAlgorithmClass(Configuration conf)
+      throws IOException {
+    if (CACHED_ALGORITHM_CLASS == null) {
+      CACHED_ALGORITHM_CLASS = conf.getClass("tajo.querymaster.lazy-task-scheduler.algorithm", null,
+          FragmentScheduleAlgorithm.class);
+      if (CACHED_ALGORITHM_CLASS == null) {
+        throw new IOException("Scheduler algorithm is null");
+      }
+    }
+    return CACHED_ALGORITHM_CLASS;
+  }
+
+  /**
+   * Instantiates the given algorithm class through its (possibly non-public) no-arg constructor.
+   * Any reflection failure is rethrown wrapped in a RuntimeException.
+   */
+  @SuppressWarnings("unchecked")
+  public static <T extends FragmentScheduleAlgorithm> T get(Class<T> clazz) {
+    try {
+      Constructor<T> ctor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz);
+      if (ctor == null) {
+        ctor = clazz.getDeclaredConstructor(DEFAULT_PARAMS);
+        ctor.setAccessible(true);
+        CONSTRUCTOR_CACHE.put(clazz, ctor);
+      }
+      return ctor.newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Instantiates the algorithm class configured in conf (see getScheduleAlgorithmClass). */
+  public static FragmentScheduleAlgorithm get(Configuration conf) throws IOException {
+    Class<? extends FragmentScheduleAlgorithm> algorithmClass = getScheduleAlgorithmClass(conf);
+    return get(algorithmClass);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java
new file mode 100644
index 0000000..39448bd
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+import com.sun.org.apache.commons.logging.Log;
+import com.sun.org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tajo.master.DefaultFragmentScheduleAlgorithm.FragmentsPerDisk;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.TUtil;
+
+import java.util.*;
+
+/**
+ * GreedyFragmentScheduleAlgorithm selects a fragment considering the number of fragments that are not scheduled yet.
+ * Disks of hosts have the priorities which are represented by the remaining number of fragments.
+ * This algorithm selects a fragment with trying minimizing the maximum priority.
+ */
+public class GreedyFragmentScheduleAlgorithm implements FragmentScheduleAlgorithm {
+  private final static Log LOG = LogFactory.getLog(GreedyFragmentScheduleAlgorithm.class);
+  private final HostPriorityComparator hostComparator = new HostPriorityComparator();
+  private Map<String, Map<Integer, FragmentsPerDisk>> fragmentHostMapping =
+      new HashMap<String, Map<Integer, FragmentsPerDisk>>();
+  private Map<HostAndDisk, PrioritizedHost> totalHostPriority = new HashMap<HostAndDisk, PrioritizedHost>();
+  private Map<String, Set<PrioritizedHost>> hostPriorityPerRack = new HashMap<String, Set<PrioritizedHost>>();
+  private TopologyCache topologyCache = new TopologyCache();
+  private int totalFragmentNum = 0;
+
+  private FragmentsPerDisk getHostFragmentSet(String host, Integer diskId) {
+    Map<Integer, FragmentsPerDisk> fragmentsPerDiskMap;
+    FragmentsPerDisk fragmentsPerDisk;
+    if (fragmentHostMapping.containsKey(host)) {
+      fragmentsPerDiskMap = fragmentHostMapping.get(host);
+    } else {
+      fragmentsPerDiskMap = new HashMap<Integer, FragmentsPerDisk>();
+      fragmentHostMapping.put(host, fragmentsPerDiskMap);
+    }
+    if (fragmentsPerDiskMap.containsKey(diskId)) {
+      fragmentsPerDisk = fragmentsPerDiskMap.get(diskId);
+    } else {
+      fragmentsPerDisk = new FragmentsPerDisk(diskId);
+      fragmentsPerDiskMap.put(diskId, fragmentsPerDisk);
+    }
+    return fragmentsPerDisk;
+  }
+
+  private void updateHostPriority(HostAndDisk hostAndDisk, int priority) {
+    if (priority > 0) {
+      // update the priority among the total hosts
+      PrioritizedHost prioritizedHost;
+      if (totalHostPriority.containsKey(hostAndDisk)) {
+        prioritizedHost = totalHostPriority.get(hostAndDisk);
+        prioritizedHost.priority = priority;
+      } else {
+        prioritizedHost = new PrioritizedHost(hostAndDisk, priority);
+        totalHostPriority.put(hostAndDisk, prioritizedHost);
+      }
+
+      // update the priority among the hosts in a rack
+      String rack = topologyCache.resolve(hostAndDisk.host);
+      Set<PrioritizedHost> hostsOfRack;
+      if (!hostPriorityPerRack.containsKey(rack)) {
+        hostsOfRack = new HashSet<PrioritizedHost>();
+        hostsOfRack.add(prioritizedHost);
+        hostPriorityPerRack.put(rack, hostsOfRack);
+      }
+    } else {
+      if (totalHostPriority.containsKey(hostAndDisk)) {
+        PrioritizedHost prioritizedHost = totalHostPriority.remove(hostAndDisk);
+
+        String rack = topologyCache.resolve(hostAndDisk.host);
+        if (hostPriorityPerRack.containsKey(rack)) {
+          Set<PrioritizedHost> hostsOfRack = hostPriorityPerRack.get(rack);
+          hostsOfRack.remove(prioritizedHost);
+          if (hostsOfRack.size() == 0){
+            hostPriorityPerRack.remove(rack);
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public void addFragment(FragmentPair fragmentPair) {
+    String[] hosts = fragmentPair.getLeftFragment().getHosts();
+    int[] diskIds = fragmentPair.getLeftFragment().getDiskIds();
+    for (int i = 0; i < hosts.length; i++) {
+      addFragment(hosts[i], diskIds[i], fragmentPair);
+    }
+    totalFragmentNum++;
+  }
+
+  private void addFragment(String host, Integer diskId, FragmentPair fragmentPair) {
+    host = topologyCache.normalize(host);
+    FragmentsPerDisk fragmentsPerDisk = getHostFragmentSet(host, diskId);
+    fragmentsPerDisk.addFragmentPair(fragmentPair);
+
+    int priority;
+    HostAndDisk hostAndDisk = new HostAndDisk(host, diskId);
+    if (totalHostPriority.containsKey(hostAndDisk)) {
+      priority = totalHostPriority.get(hostAndDisk).priority;
+    } else {
+      priority = 0;
+    }
+    updateHostPriority(hostAndDisk, priority+1);
+  }
+
+  public int size() {
+    return totalFragmentNum;
+  }
+
+  /**
+   * Selects a fragment that is stored in the given host, and replicated at the disk of the maximum
+   * priority.
+   * @param host
+   * @return If there are fragments stored in the host, returns a fragment. Otherwise, return null.
+   */
+  @Override
+  public FragmentPair getHostLocalFragment(String host) {
+    String normalizedHost = topologyCache.normalize(host);
+    if (!fragmentHostMapping.containsKey(normalizedHost)) {
+      return null;
+    }
+
+    Map<Integer, FragmentsPerDisk> fragmentsPerDiskMap = fragmentHostMapping.get(normalizedHost);
+    List<Integer> disks = Lists.newArrayList(fragmentsPerDiskMap.keySet());
+    Collections.shuffle(disks);
+    FragmentsPerDisk fragmentsPerDisk = null;
+    FragmentPair fragmentPair = null;
+
+    for (Integer diskId : disks) {
+      fragmentsPerDisk = fragmentsPerDiskMap.get(diskId);
+      if (fragmentsPerDisk != null && !fragmentsPerDisk.isEmpty()) {
+        fragmentPair = getBestFragment(fragmentsPerDisk);
+      }
+      if (fragmentPair != null) {
+        return fragmentPair;
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Selects a fragment that is stored at the given disk of the given host, and replicated at the disk of the maximum
+   * priority.
+   * @param host
+   * @param diskId
+   * @return If there are fragments stored at the disk of the host, returns a fragment. Otherwise, return null.
+   */
+  @Override
+  public FragmentPair getHostLocalFragment(String host, Integer diskId) {
+    String normalizedHost = NetUtils.normalizeHost(host);
+    if (fragmentHostMapping.containsKey(normalizedHost)) {
+      Map<Integer, FragmentsPerDisk> fragmentsPerDiskMap = fragmentHostMapping.get(normalizedHost);
+      if (fragmentsPerDiskMap.containsKey(diskId)) {
+        FragmentsPerDisk fragmentsPerDisk = fragmentsPerDiskMap.get(diskId);
+        if (!fragmentsPerDisk.isEmpty()) {
+          return getBestFragment(fragmentsPerDisk);
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * In the descending order of priority, find a fragment that is shared by the given fragment set and the fragment set
+   * of the maximal priority.
+   * @param fragmentsPerDisk a fragment set
+   * @return a fragment that is shared by the given fragment set and the fragment set of the maximal priority
+   */
+  private FragmentPair getBestFragment(FragmentsPerDisk fragmentsPerDisk) {
+    // Select a fragment that is shared by host and another hostAndDisk that has the most fragments
+    Collection<PrioritizedHost> prioritizedHosts = totalHostPriority.values();
+    PrioritizedHost[] sortedHosts = prioritizedHosts.toArray(new PrioritizedHost[prioritizedHosts.size()]);
+    Arrays.sort(sortedHosts, hostComparator);
+
+    for (PrioritizedHost nextHost : sortedHosts) {
+      if (fragmentHostMapping.containsKey(nextHost.hostAndDisk.host)) {
+        Map<Integer, FragmentsPerDisk> diskFragmentsMap = fragmentHostMapping.get(nextHost.hostAndDisk.host);
+        if (diskFragmentsMap.containsKey(nextHost.hostAndDisk.diskId)) {
+          Set<FragmentPair> largeFragmentPairSet = diskFragmentsMap.get(nextHost.hostAndDisk.diskId).getFragmentPairSet();
+          Iterator<FragmentPair> smallFragmentSetIterator = fragmentsPerDisk.getFragmentPairIterator();
+          while (smallFragmentSetIterator.hasNext()) {
+            FragmentPair eachFragmentOfSmallSet = smallFragmentSetIterator.next();
+            if (largeFragmentPairSet.contains(eachFragmentOfSmallSet)) {
+              return eachFragmentOfSmallSet;
+            }
+          }
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Selects a fragment that is stored at the same rack of the given host, and replicated at the disk of the maximum
+   * priority.
+   * @param host
+   * @return If there are fragments stored at the same rack of the given host, returns a fragment. Otherwise, return null.
+   */
+  public FragmentPair getRackLocalFragment(String host) {
+    host = topologyCache.normalize(host);
+    // Select a fragment from a host that has the most fragments in the rack
+    String rack = topologyCache.resolve(host);
+    Set<PrioritizedHost> hostsOfRack = hostPriorityPerRack.get(rack);
+    if (hostsOfRack != null && hostsOfRack.size() > 0) {
+      PrioritizedHost[] sortedHosts = hostsOfRack.toArray(new PrioritizedHost[hostsOfRack.size()]);
+      Arrays.sort(sortedHosts, hostComparator);
+      for (PrioritizedHost nextHost : sortedHosts) {
+        if (fragmentHostMapping.containsKey(nextHost.hostAndDisk.host)) {
+          List<FragmentsPerDisk> disks = Lists.newArrayList(fragmentHostMapping.get(nextHost.hostAndDisk.host).values());
+          Collections.shuffle(disks);
+
+          for (FragmentsPerDisk fragmentsPerDisk : disks) {
+            if (!fragmentsPerDisk.isEmpty()) {
+              return fragmentsPerDisk.getFragmentPairIterator().next();
+            }
+          }
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Selects a fragment from the disk of the maximum priority.
+   * @return If there are remaining fragments, it returns a fragment. Otherwise, it returns null.
+   */
+  public FragmentPair getRandomFragment() {
+    // Select a fragment from a host that has the most fragments
+    Collection<PrioritizedHost> prioritizedHosts = totalHostPriority.values();
+    PrioritizedHost[] sortedHosts = prioritizedHosts.toArray(new PrioritizedHost[prioritizedHosts.size()]);
+    Arrays.sort(sortedHosts, hostComparator);
+    PrioritizedHost randomHost = sortedHosts[0];
+    if (fragmentHostMapping.containsKey(randomHost.hostAndDisk.host)) {
+      Iterator<FragmentsPerDisk> fragmentsPerDiskIterator = fragmentHostMapping.get(randomHost.hostAndDisk.host).values().iterator();
+      if (fragmentsPerDiskIterator.hasNext()) {
+        Iterator<FragmentPair> fragmentPairIterator = fragmentsPerDiskIterator.next().getFragmentPairIterator();
+        if (fragmentPairIterator.hasNext()) {
+          return fragmentPairIterator.next();
+        }
+      }
+    }
+    return null;
+  }
+
+  public FragmentPair[] getAllFragments() {
+    List<FragmentPair> fragmentPairs = new ArrayList<FragmentPair>();
+    for (Map<Integer, FragmentsPerDisk> eachValue : fragmentHostMapping.values()) {
+      for (FragmentsPerDisk fragmentsPerDisk : eachValue.values()) {
+        Set<FragmentPair> pairSet = fragmentsPerDisk.getFragmentPairSet();
+        fragmentPairs.addAll(pairSet);
+      }
+    }
+    return fragmentPairs.toArray(new FragmentPair[fragmentPairs.size()]);
+  }
+
+  public void removeFragment(FragmentPair fragmentPair) {
+    String [] hosts = fragmentPair.getLeftFragment().getHosts();
+    int[] diskIds = fragmentPair.getLeftFragment().getDiskIds();
+    for (int i = 0; i < hosts.length; i++) {
+      String normalizedHost = NetUtils.normalizeHost(hosts[i]);
+      Map<Integer, FragmentsPerDisk> diskFragmentMap = fragmentHostMapping.get(normalizedHost);
+
+      if (diskFragmentMap != null) {
+        FragmentsPerDisk fragmentsPerDisk = diskFragmentMap.get(diskIds[i]);
+        if (fragmentsPerDisk != null) {
+          boolean isRemoved = fragmentsPerDisk.removeFragmentPair(fragmentPair);
+          if (isRemoved) {
+            if (fragmentsPerDisk.size() == 0) {
+              diskFragmentMap.remove(diskIds[i]);
+              if (diskFragmentMap.size() == 0) {
+                fragmentHostMapping.remove(normalizedHost);
+              }
+            }
+            HostAndDisk hostAndDisk = new HostAndDisk(normalizedHost, diskIds[i]);
+            if (totalHostPriority.containsKey(hostAndDisk)) {
+              PrioritizedHost prioritizedHost = totalHostPriority.get(hostAndDisk);
+              updateHostPriority(prioritizedHost.hostAndDisk, prioritizedHost.priority-1);
+            }
+          }
+        }
+      }
+    }
+
+    totalFragmentNum--;
+  }
+
+  private static class HostAndDisk {
+    private String host;
+    private Integer diskId;
+
+    public HostAndDisk(String host, Integer diskId) {
+      this.host = host;
+      this.diskId = diskId;
+    }
+
+    public String getHost() {
+      return host;
+    }
+
+    public int getDiskId() {
+      return diskId;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(host, diskId);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof HostAndDisk) {
+        HostAndDisk other = (HostAndDisk) o;
+        return this.host.equals(other.host) &&
+            TUtil.checkEquals(this.diskId, other.diskId);
+      }
+      return false;
+    }
+  }
+
+  public static class PrioritizedHost {
+    private HostAndDisk hostAndDisk;
+    private int priority;
+
+    public PrioritizedHost(HostAndDisk hostAndDisk, int priority) {
+      this.hostAndDisk = hostAndDisk;
+      this.priority = priority;
+    }
+
+    public PrioritizedHost(String host, Integer diskId, int priority) {
+      this.hostAndDisk = new HostAndDisk(host, diskId);
+      this.priority = priority;
+    }
+
+    public String getHost() {
+      return hostAndDisk.host;
+    }
+
+    public Integer getDiskId() {
+      return hostAndDisk.diskId;
+    }
+
+    public Integer getPriority() {
+      return priority;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof PrioritizedHost) {
+        PrioritizedHost other = (PrioritizedHost) o;
+        return this.hostAndDisk.equals(other.hostAndDisk);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return hostAndDisk.hashCode();
+    }
+
+    @Override
+    public String toString() {
+      return "host: " + hostAndDisk.host + " disk: " + hostAndDisk.diskId + " priority: " + priority;
+    }
+  }
+
+
+  public static class HostPriorityComparator implements Comparator<PrioritizedHost> {
+
+    @Override
+    public int compare(PrioritizedHost prioritizedHost, PrioritizedHost prioritizedHost2) {
+      return prioritizedHost2.priority - prioritizedHost.priority;
+    }
+  }
+
+
+  public static class TopologyCache {
+    private Map<String, String> hostRackMap = new HashMap<String, String>();
+    private Map<String, String> normalizedHostMap = new HashMap<String, String>();
+
+    public String normalize(String host) {
+      if (normalizedHostMap.containsKey(host)) {
+        return normalizedHostMap.get(host);
+      } else {
+        String normalized = NetUtils.normalizeHost(host);
+        normalizedHostMap.put(host, normalized);
+        return normalized;
+      }
+    }
+
+    public String resolve(String host) {
+      if (hostRackMap.containsKey(host)) {
+        return hostRackMap.get(host);
+      } else {
+        String rack = RackResolver.resolve(host).getNetworkLocation();
+        hostRackMap.put(host, rack);
+        return rack;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
new file mode 100644
index 0000000..08d080d
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
@@ -0,0 +1,512 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.query.QueryUnitRequest;
+import org.apache.tajo.engine.query.QueryUnitRequestImpl;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
+import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
+import org.apache.tajo.master.querymaster.QueryUnit;
+import org.apache.tajo.master.querymaster.QueryUnitAttempt;
+import org.apache.tajo.master.querymaster.SubQuery;
+import org.apache.tajo.util.NetUtils;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
+public class LazyTaskScheduler extends AbstractTaskScheduler {
+  private static final Log LOG = LogFactory.getLog(LazyTaskScheduler.class);
+
+  private final TaskSchedulerContext context;
+  private final SubQuery subQuery;
+
+  // Background thread created in start() that repeatedly calls schedule(); stopped via stop().
+  private Thread schedulingThread;
+  private volatile boolean stopEventHandling;
+
+  BlockingQueue<TaskSchedulerEvent> eventQueue = new LinkedBlockingQueue<TaskSchedulerEvent>();
+
+  private TaskRequests taskRequests;
+  private FragmentScheduleAlgorithm scheduledFragments;
+  private ScheduledFetches scheduledFetches;
+
+  private int diskLocalAssigned = 0;
+  private int hostLocalAssigned = 0;
+  private int rackLocalAssigned = 0;
+  private int totalAssigned = 0;
+
+  private int nextTaskId = 0;
+  private int containerNum;
+
+  /**
+   * Creates a scheduler for one sub-query. Initialization of the request queue and the
+   * fragment-selection algorithm is deferred to init().
+   */
+  public LazyTaskScheduler(TaskSchedulerContext context, SubQuery subQuery) {
+    super(LazyTaskScheduler.class.getName());
+    this.subQuery = subQuery;
+    this.context = context;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    taskRequests  = new TaskRequests();
+    try {
+      scheduledFragments = FragmentScheduleAlgorithmFactory.get(conf);
+      LOG.info(scheduledFragments.getClass().getSimpleName() + " is selected for the scheduling algorithm.");
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    if (!context.isLeafQuery()) {
+      scheduledFetches = new ScheduledFetches();
+    }
+
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    containerNum = subQuery.getContext().getResourceAllocator().calculateNumRequestContainers(
+        subQuery.getContext().getQueryMasterContext().getWorkerContext(),
+        context.getEstimatedTaskNum(), 512);
+
+    LOG.info("Start TaskScheduler");
+    this.schedulingThread = new Thread() {
+      public void run() {
+
+        while(!stopEventHandling && !Thread.currentThread().isInterrupted()) {
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException e) {
+            break;
+          }
+
+          schedule();
+        }
+        LOG.info("TaskScheduler schedulingThread stopped");
+      }
+    };
+
+    this.schedulingThread.start();
+    super.start();
+  }
+
+  private static final QueryUnitAttemptId NULL_ATTEMPT_ID;
+  /** Reply sent to a task runner to tell it to shut down (shouldDie = true, null attempt id). */
+  public static final TajoWorkerProtocol.QueryUnitRequestProto stopTaskRunnerReq;
+  static {
+    ExecutionBlockId nullSubQuery = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
+    NULL_ATTEMPT_ID = QueryIdFactory.newQueryUnitAttemptId(QueryIdFactory.newQueryUnitId(nullSubQuery, 0), 0);
+
+    stopTaskRunnerReq = TajoWorkerProtocol.QueryUnitRequestProto.newBuilder()
+        .setId(NULL_ATTEMPT_ID.getProto())
+        .setShouldDie(true)
+        .setOutputTable("")
+        .setSerializedData("")
+        .setClusteredOutput(false)
+        .build();
+  }
+
+  /**
+   * Stops the scheduling thread and answers every pending task request with stopTaskRunnerReq.
+   * Safe to call even if start()/init() never ran, and safe to call more than once.
+   */
+  @Override
+  public void stop() {
+    stopEventHandling = true;
+    if (schedulingThread != null) {
+      schedulingThread.interrupt();
+    }
+
+    // Return all of request callbacks instantly. Drain (rather than iterate) the queue so each pending
+    // request is answered exactly once and cannot also be picked up by a concurrent schedule() call or a
+    // second stop() invocation.
+    if (taskRequests != null) {
+      List<TaskRequestEvent> pending = new ArrayList<TaskRequestEvent>();
+      taskRequests.taskRequestQueue.drainTo(pending);
+      for (TaskRequestEvent req : pending) {
+        req.getCallback().run(stopTaskRunnerReq);
+      }
+    }
+
+    LOG.info("Task Scheduler stopped");
+    super.stop();
+  }
+
+  List<TaskRequestEvent> taskRequestEvents = new ArrayList<TaskRequestEvent>();
+  public void schedule() {
+    if (taskRequests.size() > 0) {
+      if (context.isLeafQuery()) {
+        LOG.debug("Try to schedule tasks with taskRequestEvents: " +
+            taskRequests.size() + ", Fragment Schedule Request: " +
+            scheduledFragments.size());
+        taskRequests.getTaskRequests(taskRequestEvents,
+            scheduledFragments.size());
+        LOG.debug("Get " + taskRequestEvents.size() + " taskRequestEvents ");
+        if (taskRequestEvents.size() > 0) {
+          assignLeafTasks(taskRequestEvents);
+        }
+        taskRequestEvents.clear();
+      } else {
+        LOG.debug("Try to schedule tasks with taskRequestEvents: " +
+            taskRequests.size() + ", Fetch Schedule Request: " +
+            scheduledFetches.size());
+        taskRequests.getTaskRequests(taskRequestEvents,
+            scheduledFetches.size());
+        LOG.debug("Get " + taskRequestEvents.size() + " taskRequestEvents ");
+        if (taskRequestEvents.size() > 0) {
+          assignNonLeafTasks(taskRequestEvents);
+        }
+        taskRequestEvents.clear();
+      }
+    }
+  }
+
+  /**
+   * Dispatches a T_SCHEDULE event: registers fragments (and their disks), registers fetches, or
+   * assigns a task attempt. Events of other types are ignored.
+   */
+  @Override
+  public void handle(TaskSchedulerEvent event) {
+    final int pendingEvents = eventQueue.size();
+    if (pendingEvents > 0 && pendingEvents % 1000 == 0) {
+      LOG.info("Size of event-queue in YarnRMContainerAllocator is " + pendingEvents);
+    }
+    final int freeSlots = eventQueue.remainingCapacity();
+    if (freeSlots < 1000) {
+      LOG.warn("Very low remaining capacity in the event-queue "
+          + "of YarnRMContainerAllocator: " + freeSlots);
+    }
+
+    if (event.getType() != EventType.T_SCHEDULE) {
+      return;
+    }
+
+    if (event instanceof FragmentScheduleEvent) {
+      FragmentScheduleEvent fragmentEvent = (FragmentScheduleEvent) event;
+      scheduledFragments.addFragment(
+          new FragmentPair(fragmentEvent.getLeftFragment(), fragmentEvent.getRightFragment()));
+      initDiskBalancer(fragmentEvent.getLeftFragment().getHosts(), fragmentEvent.getLeftFragment().getDiskIds());
+    } else if (event instanceof FetchScheduleEvent) {
+      scheduledFetches.addFetch(((FetchScheduleEvent) event).getFetches());
+    } else if (event instanceof QueryUnitAttemptScheduleEvent) {
+      QueryUnitAttemptScheduleEvent attemptEvent = (QueryUnitAttemptScheduleEvent) event;
+      assignTask(attemptEvent.getContext(), attemptEvent.getQueryUnitAttempt());
+    }
+  }
+
+  /** Hands a worker's task request to TaskRequests, which queues it or answers it with stopTaskRunnerReq when stopping. */
+  public void handleTaskRequestEvent(TaskRequestEvent event) {
+    this.taskRequests.handle(event);
+  }
+
+  /** Number of fragments (leaf query) or fetches (non-leaf query) still waiting to be scheduled. */
+  @Override
+  public int remainingScheduledObjectNum() {
+    return context.isLeafQuery() ? scheduledFragments.size() : scheduledFetches.size();
+  }
+
+  private Map<String, DiskBalancer> hostDiskBalancerMap = new HashMap<String, DiskBalancer>();
+
+  /**
+   * Makes sure a DiskBalancer exists for every (normalized) host and that it knows each listed disk id.
+   * hosts[i] is paired with diskIds[i].
+   */
+  private void initDiskBalancer(String[] hosts, int[] diskIds) {
+    for (int idx = 0; idx < hosts.length; idx++) {
+      String host = NetUtils.normalizeHost(hosts[idx]);
+      DiskBalancer balancer = hostDiskBalancerMap.get(host);
+      if (balancer == null) {
+        balancer = new DiskBalancer(host);
+        hostDiskBalancerMap.put(host, balancer);
+      }
+      balancer.addDiskId(diskIds[idx]);
+    }
+  }
+
+  private static class DiskBalancer {
+    private HashMap<ContainerId, Integer> containerDiskMap = new HashMap<ContainerId, Integer>();
+    private HashMap<Integer, Integer> diskReferMap = new HashMap<Integer, Integer>();
+    private String host;
+
+    public DiskBalancer(String host){
+      this.host = host;
+    }
+
+    public void addDiskId(Integer diskId) {
+      if (!diskReferMap.containsKey(diskId)) {
+        diskReferMap.put(diskId, 0);
+      }
+    }
+
+    public Integer getDiskId(ContainerId containerId) {
+      if (!containerDiskMap.containsKey(containerId)) {
+        assignVolumeId(containerId);
+      }
+
+      return containerDiskMap.get(containerId);
+    }
+
+    public void assignVolumeId(ContainerId containerId){
+      Map.Entry<Integer, Integer> volumeEntry = null;
+
+      for (Map.Entry<Integer, Integer> entry : diskReferMap.entrySet()) {
+        if(volumeEntry == null) volumeEntry = entry;
+
+        if (volumeEntry.getValue() >= entry.getValue()) {
+          volumeEntry = entry;
+        }
+      }
+
+      if(volumeEntry != null){
+        diskReferMap.put(volumeEntry.getKey(), volumeEntry.getValue() + 1);
+        LOG.info("Assigned host : " + host + " Volume : " + volumeEntry.getKey() + ", Concurrency : "
+            + diskReferMap.get(volumeEntry.getKey()));
+        containerDiskMap.put(containerId, volumeEntry.getKey());
+      }
+    }
+
+    public String getHost() {
+      return host;
+    }
+  }
+
+  /**
+   * Unbounded FIFO of task requests from workers. Once the scheduler is stopping, incoming requests
+   * are answered immediately with stopTaskRunnerReq instead of being queued.
+   */
+  private class TaskRequests implements EventHandler<TaskRequestEvent> {
+    private final LinkedBlockingQueue<TaskRequestEvent> taskRequestQueue =
+        new LinkedBlockingQueue<TaskRequestEvent>();
+
+    @Override
+    public void handle(TaskRequestEvent event) {
+      LOG.info("TaskRequest: " + event.getContainerId() + "," + event.getExecutionBlockId());
+      if (stopEventHandling) {
+        event.getCallback().run(stopTaskRunnerReq);
+        return;
+      }
+      reportQueueHealth();
+      taskRequestQueue.add(event);
+    }
+
+    /** Logs the queue size every 1000 entries and warns when remaining capacity gets low. */
+    private void reportQueueHealth() {
+      final int queued = taskRequestQueue.size();
+      if (queued > 0 && queued % 1000 == 0) {
+        LOG.info("Size of event-queue in YarnRMContainerAllocator is " + queued);
+      }
+      final int freeSlots = taskRequestQueue.remainingCapacity();
+      if (freeSlots < 1000) {
+        LOG.warn("Very low remaining capacity in the event-queue "
+            + "of YarnRMContainerAllocator: " + freeSlots);
+      }
+    }
+
+    /** Moves up to num queued requests into the given collection. */
+    public void getTaskRequests(final Collection<TaskRequestEvent> taskRequests,
+                                int num) {
+      taskRequestQueue.drainTo(taskRequests, num);
+    }
+
+    public int size() {
+      return taskRequestQueue.size();
+    }
+  }
+
+  /**
+   * Computes the input size, in bytes, to pack into the next leaf task.
+   * If the configured task size, spread over all containers, would cover more than the remaining
+   * fragments, it is used as is. Otherwise the size is scaled up so the remaining fragments are split
+   * roughly evenly across containerNum containers, in units of the default task size.
+   *
+   * @return the adjusted task size; context.getTaskSize() when there is nothing sensible to scale by
+   */
+  private long adjustTaskSize() {
+    // TASK_DEFAULT_SIZE is treated as megabytes. Widen to long BEFORE multiplying: int arithmetic
+    // overflows for values of 2048 or more.
+    long originTaskSize = (long) context.getMasterContext().getConf().getIntVar(ConfVars.TASK_DEFAULT_SIZE)
+        * 1024 * 1024;
+    if (originTaskSize <= 0 || containerNum <= 0) {
+      // Avoid division by zero below and nonsense sizes; fall back to the configured task size.
+      return context.getTaskSize();
+    }
+
+    int remaining = remainingScheduledObjectNum();
+    long fragNumPerTask = context.getTaskSize() / originTaskSize;
+    if (fragNumPerTask * containerNum > remaining) {
+      return context.getTaskSize();
+    }
+    fragNumPerTask = (long) Math.ceil((double) remaining / (double) containerNum);
+    return originTaskSize * fragNumPerTask;
+  }
+
+  /**
+   * Creates one leaf task per pending request, packing locally stored fragments into each task.
+   *
+   * <p>Requests are shuffled first. For each request whose container is still known, fragments
+   * are taken in this order of preference:
+   * <ol>
+   *   <li>fragments on the disk the container is mapped to on its host;</li>
+   *   <li>other fragments on the same host;</li>
+   *   <li>a single rack-local fragment;</li>
+   *   <li>a single random fragment.</li>
+   * </ol>
+   * Local fragments are accumulated until the size returned by {@link #adjustTaskSize()} is
+   * reached. The first local fragment is always accepted, even when it alone exceeds that size,
+   * so that it is not passed over in favour of a remote one.
+   *
+   * @param taskRequests pending requests; shuffled in place
+   * @throws IllegalStateException if fragments remain but none could be assigned to a request
+   */
+  private void assignLeafTasks(List<TaskRequestEvent> taskRequests) {
+    Collections.shuffle(taskRequests);
+    Iterator<TaskRequestEvent> it = taskRequests.iterator();
+
+    while (it.hasNext() && scheduledFragments.size() > 0) {
+      TaskRequestEvent taskRequest = it.next();
+      LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
+          "containerId=" + taskRequest.getContainerId());
+      ContainerProxy container = context.getMasterContext().getResourceAllocator().
+          getContainer(taskRequest.getContainerId());
+      // Check before the first dereference; the container may no longer be known.
+      if (container == null) {
+        continue;
+      }
+      String host = container.getTaskHostName();
+      QueryUnitAttemptScheduleContext queryUnitContext = new QueryUnitAttemptScheduleContext(container.containerID,
+          host, taskRequest.getCallback());
+      QueryUnit task = SubQuery.newEmptyQueryUnit(context, queryUnitContext, subQuery, nextTaskId++);
+
+      List<FragmentPair> fragmentPairs = new ArrayList<FragmentPair>();
+      long assignedFragmentSize = 0;
+      long taskSize = adjustTaskSize();
+      LOG.info("Adjusted task size: " + taskSize);
+
+      // host local, disk local
+      String normalized = NetUtils.normalizeHost(host);
+      // A host without a balancer entry simply has no disk-local preference.
+      Integer diskId = hostDiskBalancerMap.containsKey(normalized) ?
+          hostDiskBalancerMap.get(normalized).getDiskId(container.containerID) : null;
+      if (diskId != null && diskId != -1) {
+        assignedFragmentSize =
+            pollHostLocalFragments(host, diskId, fragmentPairs, assignedFragmentSize, taskSize);
+      }
+      int diskLocalNum = fragmentPairs.size();
+
+      // host local
+      assignedFragmentSize =
+          pollHostLocalFragments(host, null, fragmentPairs, assignedFragmentSize, taskSize);
+      // Counted as disk local only if the host-local pass added nothing.
+      boolean diskLocal = diskLocalNum > 0 && fragmentPairs.size() == diskLocalNum;
+
+      if (fragmentPairs.isEmpty()) {
+        // rack local, then random
+        FragmentPair fragmentPair = scheduledFragments.getRackLocalFragment(host);
+        if (fragmentPair == null) {
+          fragmentPair = scheduledFragments.getRandomFragment();
+        } else {
+          rackLocalAssigned++;
+        }
+        if (fragmentPair != null) {
+          fragmentPairs.add(fragmentPair);
+          scheduledFragments.removeFragment(fragmentPair);
+        }
+      } else if (diskLocal) {
+        diskLocalAssigned++;
+      } else {
+        hostLocalAssigned++;
+      }
+
+      if (fragmentPairs.isEmpty()) {
+        throw new IllegalStateException("No fragment could be assigned to container "
+            + container.containerID + " on host " + host + " although "
+            + scheduledFragments.size() + " fragments remain");
+      }
+
+      LOG.info("host: " + host + " disk id: " + diskId + " fragment num: " + fragmentPairs.size());
+
+      task.setFragment(fragmentPairs.toArray(new FragmentPair[fragmentPairs.size()]));
+      subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
+    }
+  }
+
+  /**
+   * Moves host-local fragment pairs from {@code scheduledFragments} into {@code assigned} until
+   * the accumulated size reaches {@code taskSize} or no suitable fragment remains.
+   *
+   * <p>Only the left fragment's size is checked against the budget, while both sides are added to
+   * the accumulated size. A fragment is always accepted when {@code assigned} is still empty.
+   *
+   * @param host host the task will run on
+   * @param diskId disk on {@code host} to restrict the search to, or {@code null} for any disk
+   * @param assigned fragment pairs already chosen for the task; appended to in place
+   * @param assignedSize accumulated size of {@code assigned}
+   * @param taskSize target task size
+   * @return the accumulated size after polling
+   */
+  private long pollHostLocalFragments(String host, Integer diskId, List<FragmentPair> assigned,
+                                      long assignedSize, long taskSize) {
+    while (scheduledFragments.size() > 0 && (assigned.isEmpty() || assignedSize < taskSize)) {
+      FragmentPair fragmentPair = diskId == null ?
+          scheduledFragments.getHostLocalFragment(host) :
+          scheduledFragments.getHostLocalFragment(host, diskId);
+      if (fragmentPair == null || fragmentPair.getLeftFragment() == null) {
+        break;
+      }
+      long leftSize = fragmentPair.getLeftFragment().getEndKey();
+      if (!assigned.isEmpty() && assignedSize + leftSize > taskSize) {
+        break;
+      }
+      assigned.add(fragmentPair);
+      assignedSize += leftSize;
+      if (fragmentPair.getRightFragment() != null) {
+        assignedSize += fragmentPair.getRightFragment().getEndKey();
+      }
+      scheduledFragments.removeFragment(fragmentPair);
+    }
+    return assignedSize;
+  }
+
+  /**
+   * Creates one non-leaf task per pending request while scheduled fetches remain.
+   *
+   * <p>Each task receives every scheduled fragment. Requests whose container can no longer be
+   * found are skipped, as in the leaf path.
+   *
+   * @param taskRequests pending requests to serve
+   */
+  private void assignNonLeafTasks(List<TaskRequestEvent> taskRequests) {
+    Iterator<TaskRequestEvent> it = taskRequests.iterator();
+
+    TaskRequestEvent taskRequest;
+    while (it.hasNext()) {
+      taskRequest = it.next();
+      LOG.debug("assignToNonLeafTasks: " + taskRequest.getExecutionBlockId());
+
+      // random allocation
+      if (scheduledFetches.size() > 0) {
+        LOG.debug("Assigned based on * match");
+        ContainerProxy container = context.getMasterContext().getResourceAllocator().getContainer(
+            taskRequest.getContainerId());
+        if (container == null) {
+          // Presumably the container was already released; there is nothing to run the task on.
+          continue;
+        }
+        QueryUnitAttemptScheduleContext queryUnitContext = new QueryUnitAttemptScheduleContext(container.containerID,
+            container.getTaskHostName(), taskRequest.getCallback());
+        QueryUnit task = SubQuery.newEmptyQueryUnit(context, queryUnitContext, subQuery, nextTaskId++);
+        task.setFragment(scheduledFragments.getAllFragments());
+        subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
+      }
+    }
+  }
+
+  /**
+   * Sends a scheduled task attempt to the container chosen for it.
+   *
+   * <p>Builds a {@link QueryUnitRequest} from the attempt's fragments and logical plan, attaches
+   * the next scheduled fetch URIs when this is not a leaf query, publishes a
+   * {@link TaskAttemptAssignedEvent}, and finally answers the worker's callback with the request.
+   *
+   * @param attemptContext container, host and callback the attempt was scheduled with
+   * @param taskAttempt the attempt to run
+   * @throws IllegalStateException if a non-leaf attempt is assigned but no scheduled fetch remains
+   */
+  private void assignTask(QueryUnitAttemptScheduleContext attemptContext, QueryUnitAttempt taskAttempt) {
+    QueryUnitAttemptId attemptId = taskAttempt.getId();
+    ContainerProxy containerProxy = context.getMasterContext().getResourceAllocator().
+        getContainer(attemptContext.getContainerId());
+    QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
+        attemptId,
+        new ArrayList<FragmentProto>(taskAttempt.getQueryUnit().getAllFragments()),
+        "",
+        false,
+        taskAttempt.getQueryUnit().getLogicalPlan().toJson(),
+        context.getMasterContext().getQueryContext(),
+        subQuery.getDataChannel(), subQuery.getBlock().getEnforcer());
+    if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) {
+      taskAssign.setInterQuery();
+    }
+
+    if (!context.isLeafQuery()) {
+      // popNextFetch() returns and removes the head in a single call.
+      Map<String, List<URI>> fetch = scheduledFetches.popNextFetch();
+      if (fetch == null) {
+        // Fail with a clear message rather than a NullPointerException on fetch.entrySet().
+        throw new IllegalStateException("No scheduled fetch available for " + attemptId);
+      }
+      for (Entry<String, List<URI>> fetchEntry : fetch.entrySet()) {
+        for (URI eachValue : fetchEntry.getValue()) {
+          taskAssign.addFetch(fetchEntry.getKey(), eachValue);
+        }
+      }
+    }
+
+    context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
+        attemptContext.getContainerId(), attemptContext.getHost(), containerProxy.getTaskPort()));
+
+    totalAssigned++;
+    attemptContext.getCallback().run(taskAssign.getProto());
+
+    // Guarded so the statistic strings are only built when they will be logged.
+    if (context.isLeafQuery() && LOG.isDebugEnabled()) {
+      LOG.debug("DiskLocalAssigned / Total: " + diskLocalAssigned + " / " + totalAssigned);
+      LOG.debug("HostLocalAssigned / Total: " + hostLocalAssigned + " / " + totalAssigned);
+      LOG.debug("RackLocalAssigned: " + rackLocalAssigned + " / " + totalAssigned);
+    }
+  }
+
+  /**
+   * Decides whether a task request for {@code block} is flagged as an inter query.
+   *
+   * @return {@code false} for the root block, and for a block whose parent is the root block and
+   *     has a union; {@code true} otherwise
+   */
+  private boolean checkIfInterQuery(MasterPlan masterPlan, ExecutionBlock block) {
+    boolean isRootBlock = masterPlan.isRoot(block);
+    if (isRootBlock) {
+      return false;
+    }
+    ExecutionBlock parentBlock = masterPlan.getParent(block);
+    boolean parentIsRootUnion = masterPlan.isRoot(parentBlock) && parentBlock.hasUnion();
+    return !parentIsRootUnion;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ScheduledFetches.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ScheduledFetches.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ScheduledFetches.java
new file mode 100644
index 0000000..8823bc8
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ScheduledFetches.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import java.net.URI;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * FIFO queue of per-task fetch descriptors.
+ *
+ * <p>Each element maps a key to the URIs one task must fetch. Presumably the key identifies the
+ * fetch source; confirm with callers. Elements are consumed from the head. This class performs no
+ * synchronization of its own.
+ */
+public class ScheduledFetches {
+  // A deque removes from the head in O(1); ArrayList.remove(0) shifted every remaining element.
+  // LinkedList (rather than ArrayDeque) keeps accepting null elements, as the list did.
+  private final Deque<Map<String, List<URI>>> fetches = new LinkedList<Map<String, List<URI>>>();
+
+  /** Appends a fetch descriptor to the tail of the queue. */
+  public void addFetch(Map<String, List<URI>> fetch) {
+    fetches.addLast(fetch);
+  }
+
+  /** @return {@code true} if at least one fetch descriptor is queued */
+  public boolean hasNextFetch() {
+    return !fetches.isEmpty();
+  }
+
+  /** @return the head fetch descriptor without removing it, or {@code null} if none is queued */
+  public Map<String, List<URI>> getNextFetch() {
+    return fetches.peekFirst();
+  }
+
+  /** Removes and returns the head fetch descriptor, or {@code null} if none is queued. */
+  public Map<String, List<URI>> popNextFetch() {
+    return fetches.pollFirst();
+  }
+
+  /** @return the number of queued fetch descriptors */
+  public int size() {
+    return fetches.size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerContext.java
new file mode 100644
index 0000000..3335d11
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerContext.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Settings a task scheduler needs for one execution block.
+ *
+ * <p>The master context, leaf-query flag and block id are fixed when the context is created.
+ * The task size and the estimated number of tasks start at zero and are assigned later through
+ * their setters.
+ */
+public class TaskSchedulerContext {
+  private final QueryMasterTask.QueryMasterTaskContext masterContext;
+  private final boolean isLeafQuery;
+  private final ExecutionBlockId blockId;
+  private int taskSize;
+  private int estimatedTaskNum;
+
+  /**
+   * @param masterContext context of the owning query master task
+   * @param isLeafQuery whether the execution block is a leaf block
+   * @param blockId id of the execution block being scheduled
+   */
+  public TaskSchedulerContext(QueryMasterTask.QueryMasterTaskContext masterContext, boolean isLeafQuery,
+                              ExecutionBlockId blockId) {
+    this.blockId = blockId;
+    this.isLeafQuery = isLeafQuery;
+    this.masterContext = masterContext;
+  }
+
+  public QueryMasterTask.QueryMasterTaskContext getMasterContext() {
+    return this.masterContext;
+  }
+
+  public boolean isLeafQuery() {
+    return this.isLeafQuery;
+  }
+
+  public ExecutionBlockId getBlockId() {
+    return this.blockId;
+  }
+
+  public int getTaskSize() {
+    return this.taskSize;
+  }
+
+  public void setTaskSize(int taskSize) {
+    this.taskSize = taskSize;
+  }
+
+  public int getEstimatedTaskNum() {
+    return this.estimatedTaskNum;
+  }
+
+  public void setEstimatedTaskNum(int estimatedTaskNum) {
+    this.estimatedTaskNum = estimatedTaskNum;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java
index 72de4ec..520ecd3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java
@@ -20,55 +20,50 @@ package org.apache.tajo.master;
 
 import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.master.querymaster.QueryMasterTask.QueryMasterTaskContext;
+import org.apache.tajo.master.querymaster.SubQuery;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.util.Map;
 
 public class TaskSchedulerFactory {
-
-  private static final Map<String, Class<? extends AbstractTaskScheduler>> CACHED_SCHEDULER_CLASSES = Maps.newConcurrentMap();
-
+  // Scheduler class read from "tajo.querymaster.task-scheduler" on the first successful lookup and
+  // returned for every later call, whichever Configuration is passed.
+  // NOTE(review): neither volatile nor lock-guarded; confirm first use is single-threaded.
+  private static Class<? extends AbstractTaskScheduler> CACHED_ALGORITHM_CLASS;
   private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap();
+  private static final Class<?>[] DEFAULT_PARAMS = { TaskSchedulerContext.class, SubQuery.class };
 
-  private static final Class<?>[] DEFAULT_SCHEDULER_PARAMS = { QueryMasterTaskContext.class };
+  /**
+   * Returns the task scheduler class configured under {@code tajo.querymaster.task-scheduler}.
+   *
+   * <p>The first successful lookup is cached; later calls return the cached class without
+   * consulting {@code conf}.
+   *
+   * @param conf configuration to read the scheduler class from
+   * @return the configured subclass of {@link AbstractTaskScheduler}
+   * @throws IOException if no scheduler class is configured
+   */
+  public static Class<? extends AbstractTaskScheduler> getTaskSchedulerClass(Configuration conf)
+      throws IOException {
+    if (CACHED_ALGORITHM_CLASS != null) {
+      return CACHED_ALGORITHM_CLASS;
+    } else {
+      CACHED_ALGORITHM_CLASS = conf.getClass("tajo.querymaster.task-scheduler", null, AbstractTaskScheduler.class);
+    }
 
-  public static <T extends AbstractTaskScheduler> T getTaskSCheduler(Configuration conf, QueryMasterTaskContext context) {
-    T result;
+    if (CACHED_ALGORITHM_CLASS == null) {
+      throw new IOException("Task scheduler is null");
+    }
+    return CACHED_ALGORITHM_CLASS;
+  }
 
+  /**
+   * Instantiates {@code clazz} reflectively through its (possibly non-public)
+   * {@code (TaskSchedulerContext, SubQuery)} constructor.
+   *
+   * <p>The constructor is looked up once per class and cached in {@code CONSTRUCTOR_CACHE}.
+   *
+   * @throws RuntimeException wrapping any reflection or constructor failure
+   */
+  public static <T extends AbstractTaskScheduler> T get(Class<T> clazz, TaskSchedulerContext context,
+                                                        SubQuery subQuery) {
+    T result;
     try {
-      Class<T> schedulerClass = (Class<T>) getTaskSchedulerClass(conf);
-      Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(schedulerClass);
+      Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz);
       if (constructor == null) {
-        constructor = schedulerClass.getDeclaredConstructor(DEFAULT_SCHEDULER_PARAMS);
+        constructor = clazz.getDeclaredConstructor(DEFAULT_PARAMS);
         constructor.setAccessible(true);
-        CONSTRUCTOR_CACHE.put(schedulerClass, constructor);
+        CONSTRUCTOR_CACHE.put(clazz, constructor);
       }
-      result = constructor.newInstance(new Object[]{context});
+      result = constructor.newInstance(new Object[]{context, subQuery});
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
-
     return result;
   }
 
-  public static Class<? extends AbstractTaskScheduler> getTaskSchedulerClass(Configuration conf) throws IOException {
-    String handlerName = getSchedulerType(conf);
-    Class<? extends AbstractTaskScheduler> schedulerClass = CACHED_SCHEDULER_CLASSES.get(handlerName);
-    if (schedulerClass == null) {
-      schedulerClass = conf.getClass(String.format("tajo.querymaster.scheduler-handler.%s.class", handlerName), null, AbstractTaskScheduler.class);
-      CACHED_SCHEDULER_CLASSES.put(handlerName, schedulerClass);
-    }
-
-    if (schedulerClass == null) {
-      throw new IOException("Unknown Scheduler Type: " + handlerName);
-    }
-
-    return schedulerClass;
-  }
-
-  public static String getSchedulerType(Configuration conf) {
-    return conf.get("tajo.querymaster.scheduler-handler.type", "default");
+  /**
+   * Creates the task scheduler configured in {@code conf} for the given context and sub-query.
+   *
+   * @throws IOException if no scheduler class is configured
+   */
+  public static AbstractTaskScheduler get(Configuration conf, TaskSchedulerContext context, SubQuery subQuery)
+      throws IOException {
+    return get(getTaskSchedulerClass(conf), context, subQuery);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/DefaultTaskSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/DefaultTaskSchedulerEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/DefaultTaskSchedulerEvent.java
deleted file mode 100644
index 00bce5b..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/DefaultTaskSchedulerEvent.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.event;
-
-import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.master.querymaster.QueryUnitAttempt;
-import org.apache.tajo.storage.DataLocation;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-public class DefaultTaskSchedulerEvent extends TaskSchedulerEvent {
-  private final QueryUnitAttemptId attemptId;
-  private final boolean isLeafQuery;
-  private final List<DataLocation> dataLocations;
-  private final String[] racks;
-
-  public DefaultTaskSchedulerEvent(final EventType eventType,
-                                   final QueryUnitAttempt attempt) {
-    super(eventType, attempt.getId().getQueryUnitId().getExecutionBlockId());
-    this.attemptId = attempt.getId();
-    this.isLeafQuery = attempt.isLeafTask();
-    if (this.isLeafQuery) {
-      this.dataLocations = attempt.getQueryUnit().getDataLocations();
-      Set<String> racks = new HashSet<String>();
-      for (DataLocation location : attempt.getQueryUnit().getDataLocations()) {
-        racks.add(RackResolver.resolve(location.getHost()).getNetworkLocation());
-      }
-      this.racks = racks.toArray(new String[racks.size()]);
-    } else {
-      this.dataLocations = null;
-      this.racks = null;
-    }
-  }
-
-  public DefaultTaskSchedulerEvent(final QueryUnitAttemptId attemptId,
-                                   final EventType eventType, boolean isLeafQuery,
-                                   final List<DataLocation> dataLocations,
-                                   final String[] racks) {
-    super(eventType, attemptId.getQueryUnitId().getExecutionBlockId());
-    this.attemptId = attemptId;
-    this.isLeafQuery = isLeafQuery;
-    this.dataLocations = dataLocations;
-    this.racks = racks;
-  }
-
-  public QueryUnitAttemptId getAttemptId() {
-    return this.attemptId;
-  }
-
-  public boolean isLeafQuery() {
-    return this.isLeafQuery;
-  }
-
-  public List<DataLocation> getDataLocations() {
-    return this.dataLocations;
-  }
-
-  public String[] getRacks() {
-    return this.racks;
-  }
-
-  @Override
-  public String toString() {
-    return "DefaultTaskSchedulerEvent{" +
-        "attemptId=" + attemptId +
-        ", isLeafQuery=" + isLeafQuery +
-        ", hosts=" + (dataLocations == null ? null : dataLocations) +
-        ", racks=" + (racks == null ? null : Arrays.asList(racks)) +
-        '}';
-  }
-}