Posted to commits@tajo.apache.org by ji...@apache.org on 2014/01/02 05:36:57 UTC
[2/2] git commit: TAJO-385: Refactoring TaskScheduler to assign multiple fragments. (jihoon)
TAJO-385: Refactoring TaskScheduler to assign multiple fragments. (jihoon)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/df5727c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/df5727c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/df5727c4
Branch: refs/heads/master
Commit: df5727c49edd2e726b99dfa49dcc6b3d5b6f252b
Parents: 35b8617
Author: Jihoon Son <ji...@apache.org>
Authored: Thu Jan 2 13:34:36 2014 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Thu Jan 2 13:36:09 2014 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../java/org/apache/tajo/conf/TajoConf.java | 1 +
.../tajo/engine/query/QueryUnitRequestImpl.java | 1 -
.../tajo/master/AbstractTaskScheduler.java | 1 +
.../DefaultFragmentScheduleAlgorithm.java | 247 +++++++++
.../tajo/master/DefaultTaskScheduler.java | 148 +++---
.../apache/tajo/master/FetchScheduleEvent.java | 40 ++
.../org/apache/tajo/master/FragmentPair.java | 73 +++
.../tajo/master/FragmentScheduleAlgorithm.java | 38 ++
.../FragmentScheduleAlgorithmFactory.java | 68 +++
.../master/GreedyFragmentScheduleAlgorithm.java | 421 +++++++++++++++
.../apache/tajo/master/LazyTaskScheduler.java | 512 +++++++++++++++++++
.../apache/tajo/master/ScheduledFetches.java | 47 ++
.../tajo/master/TaskSchedulerContext.java | 68 +++
.../tajo/master/TaskSchedulerFactory.java | 55 +-
.../master/event/DefaultTaskSchedulerEvent.java | 91 ----
.../master/event/FragmentScheduleEvent.java | 59 +++
.../event/QueryUnitAttemptScheduleEvent.java | 87 ++++
.../tajo/master/event/TaskSchedulerEvent.java | 4 +-
.../master/event/TaskSchedulerEventFactory.java | 67 ---
.../querymaster/QueryMasterManagerService.java | 4 +-
.../tajo/master/querymaster/QueryUnit.java | 97 ++--
.../master/querymaster/QueryUnitAttempt.java | 13 +-
.../tajo/master/querymaster/Repartitioner.java | 199 +++----
.../tajo/master/querymaster/SubQuery.java | 122 +++--
.../main/java/org/apache/tajo/worker/Task.java | 12 +-
.../src/main/resources/tajo-default.xml | 20 +-
.../org/apache/tajo/storage/MergeScanner.java | 2 +
28 files changed, 1994 insertions(+), 505 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c0df9f7..8cfc53e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -110,6 +110,8 @@ Release 0.8.0 - unreleased
IMPROVEMENTS
+ TAJO-385: Refactoring TaskScheduler to assign multiple fragments. (jihoon)
+
TAJO-468: Implements task's detail info page in WEB UI.
(hyoungjunkim via hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index b7171c3..4d7254a 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -196,6 +196,7 @@ public class TajoConf extends Configuration {
// Task Configuration
TASK_DEFAULT_MEMORY("tajo.task.memory-slot-mb.default", 512),
TASK_DEFAULT_DISK("tajo.task.disk-slot.default", 1.0f),
+ TASK_DEFAULT_SIZE("tajo.task.size-mb", 64),
//////////////////////////////////
// Metrics
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
index 3c3c3dd..d4006e0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
@@ -24,7 +24,6 @@ import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.ipc.TajoWorkerProtocol.Fetch;
import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto;
import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProtoOrBuilder;
-import org.apache.tajo.storage.fragment.Fragment;
import java.net.URI;
import java.util.ArrayList;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
index 3f4998a..acb1dcc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
@@ -37,4 +37,5 @@ public abstract class AbstractTaskScheduler extends AbstractService
}
public abstract void handleTaskRequestEvent(TaskRequestEvent event);
+ public abstract int remainingScheduledObjectNum();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java
new file mode 100644
index 0000000..e4b98d4
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tajo.util.NetUtils;
+
+import java.util.*;
+import java.util.Map.Entry;
+
+/**
+ * DefaultFragmentScheduleAlgorithm selects a fragment at random for the given argument.
+ * For example, when getHostLocalFragment(host, disk) is called, this algorithm randomly selects a fragment among
+ * the fragments that are stored on the given disk of the given host.
+ */
+public class DefaultFragmentScheduleAlgorithm implements FragmentScheduleAlgorithm {
+ private final static Log LOG = LogFactory.getLog(DefaultFragmentScheduleAlgorithm.class);
+ private Map<String, Map<Integer, FragmentsPerDisk>> fragmentHostMapping =
+ new HashMap<String, Map<Integer, FragmentsPerDisk>>();
+ private Map<String, Set<FragmentPair>> rackFragmentMapping =
+ new HashMap<String, Set<FragmentPair>>();
+ private int fragmentNum = 0;
+ private Random random = new Random(System.currentTimeMillis());
+
+ public static class FragmentsPerDisk {
+ private Integer diskId;
+ private Set<FragmentPair> fragmentPairSet;
+
+ public FragmentsPerDisk(Integer diskId) {
+ this.diskId = diskId;
+ this.fragmentPairSet = Collections.newSetFromMap(new HashMap<FragmentPair, Boolean>());
+ }
+
+ public Integer getDiskId() {
+ return diskId;
+ }
+
+ public Set<FragmentPair> getFragmentPairSet() {
+ return fragmentPairSet;
+ }
+
+ public void addFragmentPair(FragmentPair fragmentPair) {
+ fragmentPairSet.add(fragmentPair);
+ }
+
+ public boolean removeFragmentPair(FragmentPair fragmentPair) {
+ return fragmentPairSet.remove(fragmentPair);
+ }
+
+ public int size() {
+ return fragmentPairSet.size();
+ }
+
+ public Iterator<FragmentPair> getFragmentPairIterator() {
+ return fragmentPairSet.iterator();
+ }
+
+ public boolean isEmpty() {
+ return fragmentPairSet.isEmpty();
+ }
+ }
+
+ @Override
+ public void addFragment(FragmentPair fragmentPair) {
+ String[] hosts = fragmentPair.getLeftFragment().getHosts();
+ int[] diskIds = fragmentPair.getLeftFragment().getDiskIds();
+ for (int i = 0; i < hosts.length; i++) {
+ addFragment(hosts[i], diskIds[i], fragmentPair);
+ }
+ fragmentNum++;
+ }
+
+ private void addFragment(String host, Integer diskId, FragmentPair fragmentPair) {
+ // update the fragment maps per host
+ String normalizeHost = NetUtils.normalizeHost(host);
+ Map<Integer, FragmentsPerDisk> diskFragmentMap;
+ if (fragmentHostMapping.containsKey(normalizeHost)) {
+ diskFragmentMap = fragmentHostMapping.get(normalizeHost);
+ } else {
+ diskFragmentMap = new HashMap<Integer, FragmentsPerDisk>();
+ fragmentHostMapping.put(normalizeHost, diskFragmentMap);
+ }
+ FragmentsPerDisk fragmentsPerDisk;
+ if (diskFragmentMap.containsKey(diskId)) {
+ fragmentsPerDisk = diskFragmentMap.get(diskId);
+ } else {
+ fragmentsPerDisk = new FragmentsPerDisk(diskId);
+ diskFragmentMap.put(diskId, fragmentsPerDisk);
+ }
+ fragmentsPerDisk.addFragmentPair(fragmentPair);
+
+ // update the fragment maps per rack
+ String rack = RackResolver.resolve(normalizeHost).getNetworkLocation();
+ Set<FragmentPair> fragmentPairList;
+ if (rackFragmentMapping.containsKey(rack)) {
+ fragmentPairList = rackFragmentMapping.get(rack);
+ } else {
+ fragmentPairList = Collections.newSetFromMap(new HashMap<FragmentPair, Boolean>());
+ rackFragmentMapping.put(rack, fragmentPairList);
+ }
+ fragmentPairList.add(fragmentPair);
+ }
+
+ @Override
+ public void removeFragment(FragmentPair fragmentPair) {
+ boolean removed = false;
+ for (String eachHost : fragmentPair.getLeftFragment().getHosts()) {
+ String normalizedHost = NetUtils.normalizeHost(eachHost);
+ Map<Integer, FragmentsPerDisk> diskFragmentMap = fragmentHostMapping.get(normalizedHost);
+ for (Entry<Integer, FragmentsPerDisk> entry : diskFragmentMap.entrySet()) {
+ FragmentsPerDisk fragmentsPerDisk = entry.getValue();
+ removed = fragmentsPerDisk.removeFragmentPair(fragmentPair);
+ if (removed) {
+ if (fragmentsPerDisk.size() == 0) {
+ diskFragmentMap.remove(entry.getKey());
+ }
+ if (diskFragmentMap.size() == 0) {
+ fragmentHostMapping.remove(normalizedHost);
+ }
+ break;
+ }
+ }
+ String rack = RackResolver.resolve(normalizedHost).getNetworkLocation();
+ if (rackFragmentMapping.containsKey(rack)) {
+ Set<FragmentPair> fragmentPairs = rackFragmentMapping.get(rack);
+ fragmentPairs.remove(fragmentPair);
+ if (fragmentPairs.size() == 0) {
+ rackFragmentMapping.remove(rack);
+ }
+ }
+ }
+ if (removed) {
+ fragmentNum--;
+ }
+ }
+
+ /**
+ * Randomly select a fragment among the fragments stored on the host.
+ * @param host
+ * @return a randomly selected fragment
+ */
+ @Override
+ public FragmentPair getHostLocalFragment(String host) {
+ String normalizedHost = NetUtils.normalizeHost(host);
+ if (fragmentHostMapping.containsKey(normalizedHost)) {
+ Collection<FragmentsPerDisk> disks = fragmentHostMapping.get(normalizedHost).values();
+ Iterator<FragmentsPerDisk> diskIterator = disks.iterator();
+ int randomIndex = random.nextInt(disks.size());
+ FragmentsPerDisk fragmentsPerDisk = null;
+ for (int i = 0; i <= randomIndex; i++) {
+ fragmentsPerDisk = diskIterator.next();
+ }
+
+ if (fragmentsPerDisk != null) {
+ Iterator<FragmentPair> fragmentIterator = fragmentsPerDisk.getFragmentPairIterator();
+ if (fragmentIterator.hasNext()) {
+ return fragmentIterator.next();
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Randomly select a fragment among the fragments stored at the disk of the host.
+ * @param host
+ * @param diskId
+ * @return a randomly selected fragment
+ */
+ @Override
+ public FragmentPair getHostLocalFragment(String host, Integer diskId) {
+ String normalizedHost = NetUtils.normalizeHost(host);
+ if (fragmentHostMapping.containsKey(normalizedHost)) {
+ Map<Integer, FragmentsPerDisk> fragmentsPerDiskMap = fragmentHostMapping.get(normalizedHost);
+ if (fragmentsPerDiskMap.containsKey(diskId)) {
+ FragmentsPerDisk fragmentsPerDisk = fragmentsPerDiskMap.get(diskId);
+ if (!fragmentsPerDisk.isEmpty()) {
+ return fragmentsPerDisk.getFragmentPairIterator().next();
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Randomly select a fragment among the fragments stored on nodes in the same rack as the host.
+ * @param host
+ * @return a randomly selected fragment
+ */
+ @Override
+ public FragmentPair getRackLocalFragment(String host) {
+ String rack = RackResolver.resolve(host).getNetworkLocation();
+ if (rackFragmentMapping.containsKey(rack)) {
+ Set<FragmentPair> fragmentPairs = rackFragmentMapping.get(rack);
+ if (!fragmentPairs.isEmpty()) {
+ return fragmentPairs.iterator().next();
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Randomly select a fragment among the total fragments.
+ * @return a randomly selected fragment
+ */
+ @Override
+ public FragmentPair getRandomFragment() {
+ if (!fragmentHostMapping.isEmpty()) {
+ return fragmentHostMapping.values().iterator().next().values().iterator().next().getFragmentPairIterator().next();
+ }
+ return null;
+ }
+
+ @Override
+ public FragmentPair[] getAllFragments() {
+ List<FragmentPair> fragmentPairs = new ArrayList<FragmentPair>();
+ for (Map<Integer, FragmentsPerDisk> eachDiskFragmentMap : fragmentHostMapping.values()) {
+ for (FragmentsPerDisk fragmentsPerDisk : eachDiskFragmentMap.values()) {
+ fragmentPairs.addAll(fragmentsPerDisk.fragmentPairSet);
+ }
+ }
+ return fragmentPairs.toArray(new FragmentPair[fragmentPairs.size()]);
+ }
+
+ @Override
+ public int size() {
+ return fragmentNum;
+ }
+}
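
As a rough illustration of the selection strategy in DefaultFragmentScheduleAlgorithm above (a host-to-disk-to-fragment index from which a host-local request picks a random disk and then any fragment on it), here is a minimal standalone sketch. The class and method names (RandomFragmentIndex, hostLocal) are hypothetical and plain Strings stand in for Tajo's FragmentPair; this is not the project's API.

import java.util.*;

// Minimal sketch (hypothetical names): models the host -> disk -> fragment
// index kept by DefaultFragmentScheduleAlgorithm, with Strings in place of
// FragmentPair. A host-local lookup picks a random disk of the host and then
// returns any fragment stored on it.
public class RandomFragmentIndex {
  private final Map<String, Map<Integer, Set<String>>> byHost =
      new HashMap<String, Map<Integer, Set<String>>>();
  private final Random random = new Random();

  public void add(String host, int diskId, String fragment) {
    Map<Integer, Set<String>> disks = byHost.get(host);
    if (disks == null) {
      disks = new HashMap<Integer, Set<String>>();
      byHost.put(host, disks);
    }
    Set<String> onDisk = disks.get(diskId);
    if (onDisk == null) {
      onDisk = new HashSet<String>();
      disks.put(diskId, onDisk);
    }
    onDisk.add(fragment);
  }

  public String hostLocal(String host) {
    Map<Integer, Set<String>> disks = byHost.get(host);
    if (disks == null || disks.isEmpty()) {
      return null;
    }
    List<Integer> diskIds = new ArrayList<Integer>(disks.keySet());
    Set<String> onDisk = disks.get(diskIds.get(random.nextInt(diskIds.size())));
    return onDisk.isEmpty() ? null : onDisk.iterator().next();
  }

  public static void main(String[] args) {
    RandomFragmentIndex index = new RandomFragmentIndex();
    index.add("host1", 0, "split-0");
    index.add("host1", 1, "split-1");
    index.add("host2", 0, "split-2");
    System.out.println(index.hostLocal("host1")); // prints split-0 or split-1
  }
}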
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index b1deb43..860a466 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -34,48 +34,44 @@ import org.apache.tajo.engine.planner.logical.ScanNode;
import org.apache.tajo.engine.query.QueryUnitRequest;
import org.apache.tajo.engine.query.QueryUnitRequestImpl;
import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.event.DefaultTaskSchedulerEvent;
-import org.apache.tajo.master.event.TaskAttemptAssignedEvent;
-import org.apache.tajo.master.event.TaskRequestEvent;
-import org.apache.tajo.master.event.TaskSchedulerEvent;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
import org.apache.tajo.master.querymaster.QueryUnit;
+import org.apache.tajo.master.querymaster.QueryUnitAttempt;
import org.apache.tajo.master.querymaster.SubQuery;
import org.apache.tajo.storage.DataLocation;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.NetUtils;
import java.net.URI;
import java.util.*;
-import java.util.concurrent.BlockingQueue;
+import java.util.Map.Entry;
import java.util.concurrent.LinkedBlockingQueue;
import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
public class DefaultTaskScheduler extends AbstractTaskScheduler {
- private static final Log LOG = LogFactory.getLog(DefaultTaskSchedulerEvent.class);
+ private static final Log LOG = LogFactory.getLog(DefaultTaskScheduler.class);
- private final QueryMasterTask.QueryMasterTaskContext context;
- private TajoAsyncDispatcher dispatcher;
+ private final TaskSchedulerContext context;
+ private SubQuery subQuery;
- private Thread eventHandlingThread;
private Thread schedulingThread;
private volatile boolean stopEventHandling;
- BlockingQueue<TaskSchedulerEvent> eventQueue
- = new LinkedBlockingQueue<TaskSchedulerEvent>();
-
private ScheduledRequests scheduledRequests;
private TaskRequests taskRequests;
private int hostLocalAssigned = 0;
private int rackLocalAssigned = 0;
private int totalAssigned = 0;
+ private int nextTaskId = 0;
- public DefaultTaskScheduler(QueryMasterTask.QueryMasterTaskContext context) {
+ public DefaultTaskScheduler(TaskSchedulerContext context, SubQuery subQuery) {
super(DefaultTaskScheduler.class.getName());
this.context = context;
- this.dispatcher = context.getDispatcher();
+ this.subQuery = subQuery;
}
@Override
@@ -90,24 +86,6 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
@Override
public void start() {
LOG.info("Start TaskScheduler");
- this.eventHandlingThread = new Thread() {
- public void run() {
-
- TaskSchedulerEvent event;
- while(!stopEventHandling && !Thread.currentThread().isInterrupted()) {
- try {
- event = eventQueue.take();
- handleEvent(event);
- } catch (InterruptedException e) {
- //LOG.error("Returning, iterrupted : " + e);
- break;
- }
- }
- LOG.info("TaskScheduler eventHandlingThread stopped");
- }
- };
-
- this.eventHandlingThread.start();
this.schedulingThread = new Thread() {
public void run() {
@@ -148,7 +126,6 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
@Override
public void stop() {
stopEventHandling = true;
- eventHandlingThread.interrupt();
schedulingThread.interrupt();
// Return all of request callbacks instantly.
@@ -160,16 +137,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
super.stop();
}
- private void handleEvent(TaskSchedulerEvent event) {
- if (event.getType() == EventType.T_SCHEDULE) {
- DefaultTaskSchedulerEvent castEvent = (DefaultTaskSchedulerEvent) event;
- if (castEvent.isLeafQuery()) {
- scheduledRequests.addLeafTask(castEvent);
- } else {
- scheduledRequests.addNonLeafTask(castEvent);
- }
- }
- }
+ private FileFragment[] fragmentsForNonLeafTask;
List<TaskRequestEvent> taskRequestEvents = new ArrayList<TaskRequestEvent>();
public void schedule() {
@@ -204,20 +172,43 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
@Override
public void handle(TaskSchedulerEvent event) {
- int qSize = eventQueue.size();
- if (qSize != 0 && qSize % 1000 == 0) {
- LOG.info("Size of event-queue in YarnRMContainerAllocator is " + qSize);
- }
- int remCapacity = eventQueue.remainingCapacity();
- if (remCapacity < 1000) {
- LOG.warn("Very low remaining capacity in the event-queue "
- + "of YarnRMContainerAllocator: " + remCapacity);
- }
-
- try {
- eventQueue.put(event);
- } catch (InterruptedException e) {
- throw new InternalError(e.getMessage());
+ if (event.getType() == EventType.T_SCHEDULE) {
+ if (event instanceof FragmentScheduleEvent) {
+ FragmentScheduleEvent castEvent = (FragmentScheduleEvent) event;
+ if (context.isLeafQuery()) {
+ QueryUnitAttemptScheduleContext queryUnitContext = new QueryUnitAttemptScheduleContext();
+ QueryUnit task = SubQuery.newEmptyQueryUnit(context, queryUnitContext, subQuery, nextTaskId++);
+ task.setFragment2(castEvent.getLeftFragment());
+ if (castEvent.getRightFragment() != null) {
+ task.setFragment2(castEvent.getRightFragment());
+ }
+ subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
+ } else {
+ fragmentsForNonLeafTask = new FileFragment[2];
+ fragmentsForNonLeafTask[0] = castEvent.getLeftFragment();
+ fragmentsForNonLeafTask[1] = castEvent.getRightFragment();
+ }
+ } else if (event instanceof FetchScheduleEvent) {
+ FetchScheduleEvent castEvent = (FetchScheduleEvent) event;
+ Map<String, List<URI>> fetches = castEvent.getFetches();
+ QueryUnitAttemptScheduleContext queryUnitContext = new QueryUnitAttemptScheduleContext();
+ QueryUnit task = SubQuery.newEmptyQueryUnit(context, queryUnitContext, subQuery, nextTaskId++);
+ for (Entry<String, List<URI>> eachFetch : fetches.entrySet()) {
+ task.addFetches(eachFetch.getKey(), eachFetch.getValue());
+ task.setFragment2(fragmentsForNonLeafTask[0]);
+ if (fragmentsForNonLeafTask[1] != null) {
+ task.setFragment2(fragmentsForNonLeafTask[1]);
+ }
+ }
+ subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
+ } else if (event instanceof QueryUnitAttemptScheduleEvent) {
+ QueryUnitAttemptScheduleEvent castEvent = (QueryUnitAttemptScheduleEvent) event;
+ if (context.isLeafQuery()) {
+ scheduledRequests.addLeafTask(castEvent);
+ } else {
+ scheduledRequests.addNonLeafTask(castEvent);
+ }
+ }
}
}
@@ -226,6 +217,11 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
taskRequests.handle(event);
}
+ @Override
+ public int remainingScheduledObjectNum() {
+ return subQuery.getQueryUnits().length - totalAssigned;
+ }
+
private class TaskRequests implements EventHandler<TaskRequestEvent> {
private final LinkedBlockingQueue<TaskRequestEvent> taskRequestQueue =
new LinkedBlockingQueue<TaskRequestEvent>();
@@ -347,8 +343,9 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
private final Map<String, LinkedList<QueryUnitAttemptId>> leafTasksRackMapping =
new HashMap<String, LinkedList<QueryUnitAttemptId>>();
- public void addLeafTask(DefaultTaskSchedulerEvent event) {
- List<DataLocation> locations = event.getDataLocations();
+ private void addLeafTask(QueryUnitAttemptScheduleEvent event) {
+ QueryUnitAttempt queryUnitAttempt = event.getQueryUnitAttempt();
+ DataLocation[] locations = queryUnitAttempt.getQueryUnit().getDataLocations();
for (DataLocation location : locations) {
String host = location.getHost();
@@ -358,29 +355,29 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
taskBlockLocation = new TaskBlockLocation(host);
leafTaskHostMapping.put(host, taskBlockLocation);
}
- taskBlockLocation.addQueryUnitAttemptId(location.getVolumeId(), event.getAttemptId());
+ taskBlockLocation.addQueryUnitAttemptId(location.getVolumeId(), queryUnitAttempt.getId());
if (LOG.isDebugEnabled()) {
LOG.debug("Added attempt req to host " + host);
}
- }
- for (String rack : event.getRacks()) {
+
+ String rack = RackResolver.resolve(host).getNetworkLocation();
LinkedList<QueryUnitAttemptId> list = leafTasksRackMapping.get(rack);
if (list == null) {
list = new LinkedList<QueryUnitAttemptId>();
leafTasksRackMapping.put(rack, list);
}
- list.add(event.getAttemptId());
+ list.add(queryUnitAttempt.getId());
if (LOG.isDebugEnabled()) {
LOG.debug("Added attempt req to rack " + rack);
}
}
- leafTasks.add(event.getAttemptId());
+ leafTasks.add(queryUnitAttempt.getId());
}
- public void addNonLeafTask(DefaultTaskSchedulerEvent event) {
- nonLeafTasks.add(event.getAttemptId());
+ private void addNonLeafTask(QueryUnitAttemptScheduleEvent event) {
+ nonLeafTasks.add(event.getQueryUnitAttempt().getId());
}
public int leafTaskNum() {
@@ -402,7 +399,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
taskRequest = it.next();
LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
"containerId=" + taskRequest.getContainerId());
- ContainerProxy container = context.getResourceAllocator().getContainer(taskRequest.getContainerId());
+ ContainerProxy container = context.getMasterContext().getResourceAllocator()
+ .getContainer(taskRequest.getContainerId());
if(container == null) {
continue;
}
@@ -460,8 +458,6 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
}
}
- SubQuery subQuery = context.getQuery().getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId());
-
if (attemptId != null) {
QueryUnit task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
@@ -470,13 +466,13 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
"",
false,
task.getLogicalPlan().toJson(),
- context.getQueryContext(),
+ context.getMasterContext().getQueryContext(),
subQuery.getDataChannel(), subQuery.getBlock().getEnforcer());
if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) {
taskAssign.setInterQuery();
}
- context.getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
+ context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
taskRequest.getContainerId(),
host, container.getTaskPort()));
assignedRequest.add(attemptId);
@@ -523,7 +519,6 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
LOG.debug("Assigned based on * match");
QueryUnit task;
- SubQuery subQuery = context.getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId());
task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
attemptId,
@@ -531,7 +526,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
"",
false,
task.getLogicalPlan().toJson(),
- context.getQueryContext(),
+ context.getMasterContext().getQueryContext(),
subQuery.getDataChannel(),
subQuery.getBlock().getEnforcer());
if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) {
@@ -546,11 +541,12 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
}
}
- ContainerProxy container = context.getResourceAllocator().getContainer(
+ ContainerProxy container = context.getMasterContext().getResourceAllocator().getContainer(
taskRequest.getContainerId());
- context.getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
+ context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
taskRequest.getContainerId(), container.getTaskHostName(), container.getTaskPort()));
taskRequest.getCallback().run(taskAssign.getProto());
+ totalAssigned++;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java
new file mode 100644
index 0000000..561f980
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.master.event.TaskSchedulerEvent;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+
+public class FetchScheduleEvent extends TaskSchedulerEvent {
+ private final Map<String, List<URI>> fetches;
+
+ public FetchScheduleEvent(final EventType eventType, final ExecutionBlockId blockId,
+ final Map<String, List<URI>> fetches) {
+ super(eventType, blockId);
+ this.fetches = fetches;
+ }
+
+ public Map<String, List<URI>> getFetches() {
+ return fetches;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FragmentPair.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FragmentPair.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FragmentPair.java
new file mode 100644
index 0000000..598b1c5
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FragmentPair.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.common.base.Objects;
+import org.apache.tajo.storage.fragment.FileFragment;
+
+/**
+ * FragmentPair consists of two fragments, a left fragment and a right fragment.
+ * Depending on the query, its contents differ.
+ * For join queries, it is assumed to hold both fragments,
+ * and the left fragment is assumed to come from the larger table.
+ * For other queries, it is assumed to hold only a left fragment.
+ */
+public class FragmentPair {
+ private FileFragment leftFragment;
+ private FileFragment rightFragment;
+
+ public FragmentPair(FileFragment left) {
+ this.leftFragment = left;
+ }
+
+ public FragmentPair(FileFragment left, FileFragment right) {
+ this.leftFragment = left;
+ this.rightFragment = right;
+ }
+
+ public FileFragment getLeftFragment() {
+ return leftFragment;
+ }
+
+ public FileFragment getRightFragment() {
+ return rightFragment;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof FragmentPair) {
+ FragmentPair other = (FragmentPair) o;
+ boolean eq = this.leftFragment.equals(other.leftFragment);
+ if (this.rightFragment != null && other.rightFragment != null) {
+ eq &= this.rightFragment.equals(other.rightFragment);
+ } else if (this.rightFragment == null && other.rightFragment == null) {
+ eq &= true;
+ } else {
+ return false;
+ }
+ return eq;
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(leftFragment, rightFragment);
+ }
+}
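
A brief note on the equality contract above: FragmentPair is used as a key in the hash-based sets kept by the schedule algorithms, and two pairs compare equal only when the left fragments match and the right fragments are either both absent or both equal. A small sketch of that null-symmetric contract, using a hypothetical String-based pair rather than Tajo's classes:

import java.util.Objects;

// Hypothetical stand-in for FragmentPair, illustrating the null-symmetric
// equals/hashCode used above: equal lefts, and rights that are either both
// null or both equal.
public class PairSketch {
  private final String left;
  private final String right; // null for non-join queries

  public PairSketch(String left, String right) {
    this.left = left;
    this.right = right;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof PairSketch)) {
      return false;
    }
    PairSketch other = (PairSketch) o;
    return left.equals(other.left)
        && (right == null ? other.right == null : right.equals(other.right));
  }

  @Override
  public int hashCode() {
    return Objects.hash(left, right);
  }

  public static void main(String[] args) {
    System.out.println(new PairSketch("a", null).equals(new PairSketch("a", null))); // true
    System.out.println(new PairSketch("a", "b").equals(new PairSketch("a", null)));  // false
  }
}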
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java
new file mode 100644
index 0000000..10d993d
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+/**
+ * FragmentScheduleAlgorithm is used by LazyTaskScheduler.
+ * FragmentScheduleAlgorithm selects a fragment for the given argument.
+ *
+ * There are two implementations: DefaultFragmentScheduleAlgorithm and GreedyFragmentScheduleAlgorithm.
+ */
+public interface FragmentScheduleAlgorithm {
+ void addFragment(FragmentPair fragmentPair);
+ void removeFragment(FragmentPair fragmentPair);
+
+ FragmentPair getHostLocalFragment(String host);
+ FragmentPair getHostLocalFragment(String host, Integer diskId);
+ FragmentPair getRackLocalFragment(String host);
+ FragmentPair getRandomFragment();
+ FragmentPair[] getAllFragments();
+
+ int size();
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java
new file mode 100644
index 0000000..820a0fb
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+public class FragmentScheduleAlgorithmFactory {
+
+ private static Class<? extends FragmentScheduleAlgorithm> CACHED_ALGORITHM_CLASS;
+ private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap();
+ private static final Class<?>[] DEFAULT_PARAMS = {};
+
+ public static Class<? extends FragmentScheduleAlgorithm> getScheduleAlgorithmClass(Configuration conf)
+ throws IOException {
+ if (CACHED_ALGORITHM_CLASS != null) {
+ return CACHED_ALGORITHM_CLASS;
+ } else {
+ CACHED_ALGORITHM_CLASS = conf.getClass("tajo.querymaster.lazy-task-scheduler.algorithm", null,
+ FragmentScheduleAlgorithm.class);
+ }
+
+ if (CACHED_ALGORITHM_CLASS == null) {
+ throw new IOException("Scheduler algorithm is null");
+ }
+ return CACHED_ALGORITHM_CLASS;
+ }
+
+ public static <T extends FragmentScheduleAlgorithm> T get(Class<T> clazz) {
+ T result;
+ try {
+ Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz);
+ if (constructor == null) {
+ constructor = clazz.getDeclaredConstructor(DEFAULT_PARAMS);
+ constructor.setAccessible(true);
+ CONSTRUCTOR_CACHE.put(clazz, constructor);
+ }
+ result = constructor.newInstance(new Object[]{});
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return result;
+ }
+
+ public static FragmentScheduleAlgorithm get(Configuration conf) throws IOException {
+ return get(getScheduleAlgorithmClass(conf));
+ }
+}
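
The factory above resolves the implementation class from the configuration key tajo.querymaster.lazy-task-scheduler.algorithm and caches its no-arg constructor. A hedged usage sketch, assuming the Tajo classes in this patch are on the classpath; in practice the key would normally come from tajo-default.xml rather than being set programmatically:

import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.master.DefaultFragmentScheduleAlgorithm;
import org.apache.tajo.master.FragmentScheduleAlgorithm;
import org.apache.tajo.master.FragmentScheduleAlgorithmFactory;

// Sketch: choose the fragment schedule algorithm through the factory.
// The configuration key matches the one read in getScheduleAlgorithmClass().
public class ScheduleAlgorithmSelection {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setClass("tajo.querymaster.lazy-task-scheduler.algorithm",
        DefaultFragmentScheduleAlgorithm.class, FragmentScheduleAlgorithm.class);

    FragmentScheduleAlgorithm algorithm = FragmentScheduleAlgorithmFactory.get(conf);
    System.out.println("Selected algorithm: " + algorithm.getClass().getSimpleName());
  }
}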
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java
new file mode 100644
index 0000000..39448bd
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tajo.master.DefaultFragmentScheduleAlgorithm.FragmentsPerDisk;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.TUtil;
+
+import java.util.*;
+
+/**
+ * GreedyFragmentScheduleAlgorithm selects a fragment considering the number of fragments that are not scheduled yet.
+ * Each disk of each host has a priority, represented by the number of its remaining fragments.
+ * This algorithm selects a fragment while trying to minimize the maximum priority.
+ */
+public class GreedyFragmentScheduleAlgorithm implements FragmentScheduleAlgorithm {
+ private final static Log LOG = LogFactory.getLog(GreedyFragmentScheduleAlgorithm.class);
+ private final HostPriorityComparator hostComparator = new HostPriorityComparator();
+ private Map<String, Map<Integer, FragmentsPerDisk>> fragmentHostMapping =
+ new HashMap<String, Map<Integer, FragmentsPerDisk>>();
+ private Map<HostAndDisk, PrioritizedHost> totalHostPriority = new HashMap<HostAndDisk, PrioritizedHost>();
+ private Map<String, Set<PrioritizedHost>> hostPriorityPerRack = new HashMap<String, Set<PrioritizedHost>>();
+ private TopologyCache topologyCache = new TopologyCache();
+ private int totalFragmentNum = 0;
+
+ private FragmentsPerDisk getHostFragmentSet(String host, Integer diskId) {
+ Map<Integer, FragmentsPerDisk> fragmentsPerDiskMap;
+ FragmentsPerDisk fragmentsPerDisk;
+ if (fragmentHostMapping.containsKey(host)) {
+ fragmentsPerDiskMap = fragmentHostMapping.get(host);
+ } else {
+ fragmentsPerDiskMap = new HashMap<Integer, FragmentsPerDisk>();
+ fragmentHostMapping.put(host, fragmentsPerDiskMap);
+ }
+ if (fragmentsPerDiskMap.containsKey(diskId)) {
+ fragmentsPerDisk = fragmentsPerDiskMap.get(diskId);
+ } else {
+ fragmentsPerDisk = new FragmentsPerDisk(diskId);
+ fragmentsPerDiskMap.put(diskId, fragmentsPerDisk);
+ }
+ return fragmentsPerDisk;
+ }
+
+ private void updateHostPriority(HostAndDisk hostAndDisk, int priority) {
+ if (priority > 0) {
+ // update the priority among the total hosts
+ PrioritizedHost prioritizedHost;
+ if (totalHostPriority.containsKey(hostAndDisk)) {
+ prioritizedHost = totalHostPriority.get(hostAndDisk);
+ prioritizedHost.priority = priority;
+ } else {
+ prioritizedHost = new PrioritizedHost(hostAndDisk, priority);
+ totalHostPriority.put(hostAndDisk, prioritizedHost);
+ }
+
+ // update the priority among the hosts in a rack
+ String rack = topologyCache.resolve(hostAndDisk.host);
+ Set<PrioritizedHost> hostsOfRack;
+ if (!hostPriorityPerRack.containsKey(rack)) {
+ hostsOfRack = new HashSet<PrioritizedHost>();
+ hostsOfRack.add(prioritizedHost);
+ hostPriorityPerRack.put(rack, hostsOfRack);
+ }
+ } else {
+ if (totalHostPriority.containsKey(hostAndDisk)) {
+ PrioritizedHost prioritizedHost = totalHostPriority.remove(hostAndDisk);
+
+ String rack = topologyCache.resolve(hostAndDisk.host);
+ if (hostPriorityPerRack.containsKey(rack)) {
+ Set<PrioritizedHost> hostsOfRack = hostPriorityPerRack.get(rack);
+ hostsOfRack.remove(prioritizedHost);
+ if (hostsOfRack.size() == 0){
+ hostPriorityPerRack.remove(rack);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void addFragment(FragmentPair fragmentPair) {
+ String[] hosts = fragmentPair.getLeftFragment().getHosts();
+ int[] diskIds = fragmentPair.getLeftFragment().getDiskIds();
+ for (int i = 0; i < hosts.length; i++) {
+ addFragment(hosts[i], diskIds[i], fragmentPair);
+ }
+ totalFragmentNum++;
+ }
+
+ private void addFragment(String host, Integer diskId, FragmentPair fragmentPair) {
+ host = topologyCache.normalize(host);
+ FragmentsPerDisk fragmentsPerDisk = getHostFragmentSet(host, diskId);
+ fragmentsPerDisk.addFragmentPair(fragmentPair);
+
+ int priority;
+ HostAndDisk hostAndDisk = new HostAndDisk(host, diskId);
+ if (totalHostPriority.containsKey(hostAndDisk)) {
+ priority = totalHostPriority.get(hostAndDisk).priority;
+ } else {
+ priority = 0;
+ }
+ updateHostPriority(hostAndDisk, priority+1);
+ }
+
+ public int size() {
+ return totalFragmentNum;
+ }
+
+ /**
+ * Selects a fragment that is stored on the given host and replicated on the disk of the maximum
+ * priority.
+ * @param host
+ * @return If there are fragments stored in the host, returns a fragment. Otherwise, return null.
+ */
+ @Override
+ public FragmentPair getHostLocalFragment(String host) {
+ String normalizedHost = topologyCache.normalize(host);
+ if (!fragmentHostMapping.containsKey(normalizedHost)) {
+ return null;
+ }
+
+ Map<Integer, FragmentsPerDisk> fragmentsPerDiskMap = fragmentHostMapping.get(normalizedHost);
+ List<Integer> disks = Lists.newArrayList(fragmentsPerDiskMap.keySet());
+ Collections.shuffle(disks);
+ FragmentsPerDisk fragmentsPerDisk = null;
+ FragmentPair fragmentPair = null;
+
+ for (Integer diskId : disks) {
+ fragmentsPerDisk = fragmentsPerDiskMap.get(diskId);
+ if (fragmentsPerDisk != null && !fragmentsPerDisk.isEmpty()) {
+ fragmentPair = getBestFragment(fragmentsPerDisk);
+ }
+ if (fragmentPair != null) {
+ return fragmentPair;
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Selects a fragment that is stored on the given disk of the given host and replicated on the disk of the maximum
+ * priority.
+ * @param host
+ * @param diskId
+ * @return If there are fragments stored at the disk of the host, returns a fragment. Otherwise, return null.
+ */
+ @Override
+ public FragmentPair getHostLocalFragment(String host, Integer diskId) {
+ String normalizedHost = NetUtils.normalizeHost(host);
+ if (fragmentHostMapping.containsKey(normalizedHost)) {
+ Map<Integer, FragmentsPerDisk> fragmentsPerDiskMap = fragmentHostMapping.get(normalizedHost);
+ if (fragmentsPerDiskMap.containsKey(diskId)) {
+ FragmentsPerDisk fragmentsPerDisk = fragmentsPerDiskMap.get(diskId);
+ if (!fragmentsPerDisk.isEmpty()) {
+ return getBestFragment(fragmentsPerDisk);
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * In descending order of priority, finds a fragment that is shared by the given fragment set and the fragment set
+ * of the maximal priority.
+ * @param fragmentsPerDisk a fragment set
+ * @return a fragment that is shared by the given fragment set and the fragment set of the maximal priority
+ */
+ private FragmentPair getBestFragment(FragmentsPerDisk fragmentsPerDisk) {
+ // Select a fragment that is shared by host and another hostAndDisk that has the most fragments
+ Collection<PrioritizedHost> prioritizedHosts = totalHostPriority.values();
+ PrioritizedHost[] sortedHosts = prioritizedHosts.toArray(new PrioritizedHost[prioritizedHosts.size()]);
+ Arrays.sort(sortedHosts, hostComparator);
+
+ for (PrioritizedHost nextHost : sortedHosts) {
+ if (fragmentHostMapping.containsKey(nextHost.hostAndDisk.host)) {
+ Map<Integer, FragmentsPerDisk> diskFragmentsMap = fragmentHostMapping.get(nextHost.hostAndDisk.host);
+ if (diskFragmentsMap.containsKey(nextHost.hostAndDisk.diskId)) {
+ Set<FragmentPair> largeFragmentPairSet = diskFragmentsMap.get(nextHost.hostAndDisk.diskId).getFragmentPairSet();
+ Iterator<FragmentPair> smallFragmentSetIterator = fragmentsPerDisk.getFragmentPairIterator();
+ while (smallFragmentSetIterator.hasNext()) {
+ FragmentPair eachFragmentOfSmallSet = smallFragmentSetIterator.next();
+ if (largeFragmentPairSet.contains(eachFragmentOfSmallSet)) {
+ return eachFragmentOfSmallSet;
+ }
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Selects a fragment that is stored in the same rack as the given host and replicated on the disk of the maximum
+ * priority.
+ * @param host
+ * @return If there are fragments stored at the same rack of the given host, returns a fragment. Otherwise, return null.
+ */
+ public FragmentPair getRackLocalFragment(String host) {
+ host = topologyCache.normalize(host);
+ // Select a fragment from a host that has the most fragments in the rack
+ String rack = topologyCache.resolve(host);
+ Set<PrioritizedHost> hostsOfRack = hostPriorityPerRack.get(rack);
+ if (hostsOfRack != null && hostsOfRack.size() > 0) {
+ PrioritizedHost[] sortedHosts = hostsOfRack.toArray(new PrioritizedHost[hostsOfRack.size()]);
+ Arrays.sort(sortedHosts, hostComparator);
+ for (PrioritizedHost nextHost : sortedHosts) {
+ if (fragmentHostMapping.containsKey(nextHost.hostAndDisk.host)) {
+ List<FragmentsPerDisk> disks = Lists.newArrayList(fragmentHostMapping.get(nextHost.hostAndDisk.host).values());
+ Collections.shuffle(disks);
+
+ for (FragmentsPerDisk fragmentsPerDisk : disks) {
+ if (!fragmentsPerDisk.isEmpty()) {
+ return fragmentsPerDisk.getFragmentPairIterator().next();
+ }
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Selects a fragment from the disk of the maximum priority.
+ * @return If there are remaining fragments, it returns a fragment. Otherwise, it returns null.
+ */
+ public FragmentPair getRandomFragment() {
+ // Select a fragment from a host that has the most fragments
+ Collection<PrioritizedHost> prioritizedHosts = totalHostPriority.values();
+ PrioritizedHost[] sortedHosts = prioritizedHosts.toArray(new PrioritizedHost[prioritizedHosts.size()]);
+ Arrays.sort(sortedHosts, hostComparator);
+ PrioritizedHost randomHost = sortedHosts[0];
+ if (fragmentHostMapping.containsKey(randomHost.hostAndDisk.host)) {
+ Iterator<FragmentsPerDisk> fragmentsPerDiskIterator = fragmentHostMapping.get(randomHost.hostAndDisk.host).values().iterator();
+ if (fragmentsPerDiskIterator.hasNext()) {
+ Iterator<FragmentPair> fragmentPairIterator = fragmentsPerDiskIterator.next().getFragmentPairIterator();
+ if (fragmentPairIterator.hasNext()) {
+ return fragmentPairIterator.next();
+ }
+ }
+ }
+ return null;
+ }
+
+ public FragmentPair[] getAllFragments() {
+ List<FragmentPair> fragmentPairs = new ArrayList<FragmentPair>();
+ for (Map<Integer, FragmentsPerDisk> eachValue : fragmentHostMapping.values()) {
+ for (FragmentsPerDisk fragmentsPerDisk : eachValue.values()) {
+ Set<FragmentPair> pairSet = fragmentsPerDisk.getFragmentPairSet();
+ fragmentPairs.addAll(pairSet);
+ }
+ }
+ return fragmentPairs.toArray(new FragmentPair[fragmentPairs.size()]);
+ }
+
+ public void removeFragment(FragmentPair fragmentPair) {
+ String [] hosts = fragmentPair.getLeftFragment().getHosts();
+ int[] diskIds = fragmentPair.getLeftFragment().getDiskIds();
+ for (int i = 0; i < hosts.length; i++) {
+ String normalizedHost = NetUtils.normalizeHost(hosts[i]);
+ Map<Integer, FragmentsPerDisk> diskFragmentMap = fragmentHostMapping.get(normalizedHost);
+
+ if (diskFragmentMap != null) {
+ FragmentsPerDisk fragmentsPerDisk = diskFragmentMap.get(diskIds[i]);
+ if (fragmentsPerDisk != null) {
+ boolean isRemoved = fragmentsPerDisk.removeFragmentPair(fragmentPair);
+ if (isRemoved) {
+ if (fragmentsPerDisk.size() == 0) {
+ diskFragmentMap.remove(diskIds[i]);
+ if (diskFragmentMap.size() == 0) {
+ fragmentHostMapping.remove(normalizedHost);
+ }
+ }
+ HostAndDisk hostAndDisk = new HostAndDisk(normalizedHost, diskIds[i]);
+ if (totalHostPriority.containsKey(hostAndDisk)) {
+ PrioritizedHost prioritizedHost = totalHostPriority.get(hostAndDisk);
+ updateHostPriority(prioritizedHost.hostAndDisk, prioritizedHost.priority-1);
+ }
+ }
+ }
+ }
+ }
+
+ totalFragmentNum--;
+ }
+
+ private static class HostAndDisk {
+ private String host;
+ private Integer diskId;
+
+ public HostAndDisk(String host, Integer diskId) {
+ this.host = host;
+ this.diskId = diskId;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public int getDiskId() {
+ return diskId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(host, diskId);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof HostAndDisk) {
+ HostAndDisk other = (HostAndDisk) o;
+ return this.host.equals(other.host) &&
+ TUtil.checkEquals(this.diskId, other.diskId);
+ }
+ return false;
+ }
+ }
+
+ public static class PrioritizedHost {
+ private HostAndDisk hostAndDisk;
+ private int priority;
+
+ public PrioritizedHost(HostAndDisk hostAndDisk, int priority) {
+ this.hostAndDisk = hostAndDisk;
+ this.priority = priority;
+ }
+
+ public PrioritizedHost(String host, Integer diskId, int priority) {
+ this.hostAndDisk = new HostAndDisk(host, diskId);
+ this.priority = priority;
+ }
+
+ public String getHost() {
+ return hostAndDisk.host;
+ }
+
+ public Integer getDiskId() {
+ return hostAndDisk.diskId;
+ }
+
+ public Integer getPriority() {
+ return priority;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof PrioritizedHost) {
+ PrioritizedHost other = (PrioritizedHost) o;
+ return this.hostAndDisk.equals(other.hostAndDisk);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return hostAndDisk.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "host: " + hostAndDisk.host + " disk: " + hostAndDisk.diskId + " priority: " + priority;
+ }
+ }
+
+
+ public static class HostPriorityComparator implements Comparator<PrioritizedHost> {
+
+ @Override
+ public int compare(PrioritizedHost prioritizedHost, PrioritizedHost prioritizedHost2) {
+ return prioritizedHost2.priority - prioritizedHost.priority;
+ }
+ }
+
+
+ public static class TopologyCache {
+ private Map<String, String> hostRackMap = new HashMap<String, String>();
+ private Map<String, String> normalizedHostMap = new HashMap<String, String>();
+
+ public String normalize(String host) {
+ if (normalizedHostMap.containsKey(host)) {
+ return normalizedHostMap.get(host);
+ } else {
+ String normalized = NetUtils.normalizeHost(host);
+ normalizedHostMap.put(host, normalized);
+ return normalized;
+ }
+ }
+
+ public String resolve(String host) {
+ if (hostRackMap.containsKey(host)) {
+ return hostRackMap.get(host);
+ } else {
+ String rack = RackResolver.resolve(host).getNetworkLocation();
+ hostRackMap.put(host, rack);
+ return rack;
+ }
+ }
+ }
+}
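
To make the greedy bookkeeping above concrete: each (host, disk) slot carries a priority equal to its count of unscheduled fragments, and HostPriorityComparator sorts slots in descending priority so the fullest slot is drained first, which tends to keep the maximum remaining count low. A minimal standalone sketch of that ordering, with a hypothetical DiskSlot class:

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Illustrative sketch (hypothetical names): priorities are remaining fragment
// counts per (host, disk), and the highest-priority slot is examined first,
// mirroring HostPriorityComparator above.
public class GreedyPrioritySketch {
  static class DiskSlot {
    final String host;
    final int diskId;
    int priority; // number of unscheduled fragments on this disk

    DiskSlot(String host, int diskId, int priority) {
      this.host = host;
      this.diskId = diskId;
      this.priority = priority;
    }
  }

  public static void main(String[] args) {
    List<DiskSlot> slots = new ArrayList<DiskSlot>();
    slots.add(new DiskSlot("host1", 0, 3));
    slots.add(new DiskSlot("host2", 1, 7));
    slots.add(new DiskSlot("host1", 1, 5));

    // Descending by priority, the same ordering as HostPriorityComparator.
    Collections.sort(slots, new Comparator<DiskSlot>() {
      public int compare(DiskSlot a, DiskSlot b) {
        return b.priority - a.priority;
      }
    });

    DiskSlot next = slots.get(0);
    System.out.println(next.host + ":" + next.diskId + " priority=" + next.priority);
  }
}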
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
new file mode 100644
index 0000000..08d080d
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
@@ -0,0 +1,512 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.query.QueryUnitRequest;
+import org.apache.tajo.engine.query.QueryUnitRequestImpl;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
+import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
+import org.apache.tajo.master.querymaster.QueryUnit;
+import org.apache.tajo.master.querymaster.QueryUnitAttempt;
+import org.apache.tajo.master.querymaster.SubQuery;
+import org.apache.tajo.util.NetUtils;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
+public class LazyTaskScheduler extends AbstractTaskScheduler {
+ private static final Log LOG = LogFactory.getLog(LazyTaskScheduler.class);
+
+ private final TaskSchedulerContext context;
+ private final SubQuery subQuery;
+
+ private Thread schedulingThread;
+ private volatile boolean stopEventHandling;
+
+ BlockingQueue<TaskSchedulerEvent> eventQueue
+ = new LinkedBlockingQueue<TaskSchedulerEvent>();
+
+ private TaskRequests taskRequests;
+ private FragmentScheduleAlgorithm scheduledFragments;
+ private ScheduledFetches scheduledFetches;
+
+ private int diskLocalAssigned = 0;
+ private int hostLocalAssigned = 0;
+ private int rackLocalAssigned = 0;
+ private int totalAssigned = 0;
+
+ private int nextTaskId = 0;
+ private int containerNum;
+
+ public LazyTaskScheduler(TaskSchedulerContext context, SubQuery subQuery) {
+ super(LazyTaskScheduler.class.getName());
+ this.context = context;
+ this.subQuery = subQuery;
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ taskRequests = new TaskRequests();
+ try {
+ scheduledFragments = FragmentScheduleAlgorithmFactory.get(conf);
+ LOG.info(scheduledFragments.getClass().getSimpleName() + " is selected for the scheduling algorithm.");
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ if (!context.isLeafQuery()) {
+ scheduledFetches = new ScheduledFetches();
+ }
+
+ super.init(conf);
+ }
+
+ @Override
+ public void start() {
+ containerNum = subQuery.getContext().getResourceAllocator().calculateNumRequestContainers(
+ subQuery.getContext().getQueryMasterContext().getWorkerContext(),
+ context.getEstimatedTaskNum(), 512);
+
+ LOG.info("Start TaskScheduler");
+ this.schedulingThread = new Thread() {
+ public void run() {
+
+ while(!stopEventHandling && !Thread.currentThread().isInterrupted()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ break;
+ }
+
+ schedule();
+ }
+ LOG.info("TaskScheduler schedulingThread stopped");
+ }
+ };
+
+ this.schedulingThread.start();
+ super.start();
+ }
+
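+ // Sentinel request with shouldDie=true; it is returned to every pending callback when the
+ // scheduler stops so that idle TaskRunners shut down.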
+ private static final QueryUnitAttemptId NULL_ATTEMPT_ID;
+ public static final TajoWorkerProtocol.QueryUnitRequestProto stopTaskRunnerReq;
+ static {
+ ExecutionBlockId nullSubQuery = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
+ NULL_ATTEMPT_ID = QueryIdFactory.newQueryUnitAttemptId(QueryIdFactory.newQueryUnitId(nullSubQuery, 0), 0);
+
+ TajoWorkerProtocol.QueryUnitRequestProto.Builder builder =
+ TajoWorkerProtocol.QueryUnitRequestProto.newBuilder();
+ builder.setId(NULL_ATTEMPT_ID.getProto());
+ builder.setShouldDie(true);
+ builder.setOutputTable("");
+ builder.setSerializedData("");
+ builder.setClusteredOutput(false);
+ stopTaskRunnerReq = builder.build();
+ }
+
+ @Override
+ public void stop() {
+ stopEventHandling = true;
+ schedulingThread.interrupt();
+
+ // Return all of request callbacks instantly.
+ for (TaskRequestEvent req : taskRequests.taskRequestQueue) {
+ req.getCallback().run(stopTaskRunnerReq);
+ }
+
+ LOG.info("Task Scheduler stopped");
+ super.stop();
+ }
+
+ List<TaskRequestEvent> taskRequestEvents = new ArrayList<TaskRequestEvent>();
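+ // Matches pending task requests against scheduled fragments (leaf blocks) or scheduled fetches (non-leaf blocks).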
+ public void schedule() {
+ if (taskRequests.size() > 0) {
+ if (context.isLeafQuery()) {
+ LOG.debug("Try to schedule tasks with taskRequestEvents: " +
+ taskRequests.size() + ", Fragment Schedule Request: " +
+ scheduledFragments.size());
+ taskRequests.getTaskRequests(taskRequestEvents,
+ scheduledFragments.size());
+ LOG.debug("Get " + taskRequestEvents.size() + " taskRequestEvents ");
+ if (taskRequestEvents.size() > 0) {
+ assignLeafTasks(taskRequestEvents);
+ }
+ taskRequestEvents.clear();
+ } else {
+ LOG.debug("Try to schedule tasks with taskRequestEvents: " +
+ taskRequests.size() + ", Fetch Schedule Request: " +
+ scheduledFetches.size());
+ taskRequests.getTaskRequests(taskRequestEvents,
+ scheduledFetches.size());
+ LOG.debug("Get " + taskRequestEvents.size() + " taskRequestEvents ");
+ if (taskRequestEvents.size() > 0) {
+ assignNonLeafTasks(taskRequestEvents);
+ }
+ taskRequestEvents.clear();
+ }
+ }
+ }
+
+ @Override
+ public void handle(TaskSchedulerEvent event) {
+ int qSize = eventQueue.size();
+ if (qSize != 0 && qSize % 1000 == 0) {
+ LOG.info("Size of event-queue in YarnRMContainerAllocator is " + qSize);
+ }
+ int remCapacity = eventQueue.remainingCapacity();
+ if (remCapacity < 1000) {
+ LOG.warn("Very low remaining capacity in the event-queue "
+ + "of YarnRMContainerAllocator: " + remCapacity);
+ }
+
+ if (event.getType() == EventType.T_SCHEDULE) {
+ if (event instanceof FragmentScheduleEvent) {
+ FragmentScheduleEvent castEvent = (FragmentScheduleEvent) event;
+ scheduledFragments.addFragment(new FragmentPair(castEvent.getLeftFragment(), castEvent.getRightFragment()));
+ initDiskBalancer(castEvent.getLeftFragment().getHosts(), castEvent.getLeftFragment().getDiskIds());
+ } else if (event instanceof FetchScheduleEvent) {
+ FetchScheduleEvent castEvent = (FetchScheduleEvent) event;
+ scheduledFetches.addFetch(castEvent.getFetches());
+ } else if (event instanceof QueryUnitAttemptScheduleEvent) {
+ QueryUnitAttemptScheduleEvent castEvent = (QueryUnitAttemptScheduleEvent) event;
+ assignTask(castEvent.getContext(), castEvent.getQueryUnitAttempt());
+ }
+ }
+ }
+
+ public void handleTaskRequestEvent(TaskRequestEvent event) {
+ taskRequests.handle(event);
+ }
+
+ @Override
+ public int remainingScheduledObjectNum() {
+ if (context.isLeafQuery()) {
+ return scheduledFragments.size();
+ } else {
+ return scheduledFetches.size();
+ }
+ }
+
+ private Map<String, DiskBalancer> hostDiskBalancerMap = new HashMap<String, DiskBalancer>();
+
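+ // Registers each fragment's host/disk id with that host's DiskBalancer so leaf tasks can later be spread over disks.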
+ private void initDiskBalancer(String[] hosts, int[] diskIds) {
+ for (int i = 0; i < hosts.length; i++) {
+ DiskBalancer diskBalancer;
+ String normalized = NetUtils.normalizeHost(hosts[i]);
+ if (hostDiskBalancerMap.containsKey(normalized)) {
+ diskBalancer = hostDiskBalancerMap.get(normalized);
+ } else {
+ diskBalancer = new DiskBalancer(normalized);
+ hostDiskBalancerMap.put(normalized, diskBalancer);
+ }
+ diskBalancer.addDiskId(diskIds[i]);
+ }
+ }
+
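+ // Balances containers across the disks of a single host by counting how many containers are mapped to each disk id.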
+ private static class DiskBalancer {
+ private HashMap<ContainerId, Integer> containerDiskMap = new HashMap<ContainerId, Integer>();
+ private HashMap<Integer, Integer> diskReferMap = new HashMap<Integer, Integer>();
+ private String host;
+
+ public DiskBalancer(String host){
+ this.host = host;
+ }
+
+ public void addDiskId(Integer diskId) {
+ if (!diskReferMap.containsKey(diskId)) {
+ diskReferMap.put(diskId, 0);
+ }
+ }
+
+ public Integer getDiskId(ContainerId containerId) {
+ if (!containerDiskMap.containsKey(containerId)) {
+ assignVolumeId(containerId);
+ }
+
+ return containerDiskMap.get(containerId);
+ }
+
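+ // Picks the disk with the lowest reference count so containers are spread evenly across the host's disks.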
+ public void assignVolumeId(ContainerId containerId){
+ Map.Entry<Integer, Integer> volumeEntry = null;
+
+ for (Map.Entry<Integer, Integer> entry : diskReferMap.entrySet()) {
+ if(volumeEntry == null) volumeEntry = entry;
+
+ if (volumeEntry.getValue() >= entry.getValue()) {
+ volumeEntry = entry;
+ }
+ }
+
+ if(volumeEntry != null){
+ diskReferMap.put(volumeEntry.getKey(), volumeEntry.getValue() + 1);
+ LOG.info("Assigned host : " + host + " Volume : " + volumeEntry.getKey() + ", Concurrency : "
+ + diskReferMap.get(volumeEntry.getKey()));
+ containerDiskMap.put(containerId, volumeEntry.getKey());
+ }
+ }
+
+ public String getHost() {
+ return host;
+ }
+ }
+
+ private class TaskRequests implements EventHandler<TaskRequestEvent> {
+ private final LinkedBlockingQueue<TaskRequestEvent> taskRequestQueue =
+ new LinkedBlockingQueue<TaskRequestEvent>();
+
+ @Override
+ public void handle(TaskRequestEvent event) {
+ LOG.info("TaskRequest: " + event.getContainerId() + "," + event.getExecutionBlockId());
+ if(stopEventHandling) {
+ event.getCallback().run(stopTaskRunnerReq);
+ return;
+ }
+ int qSize = taskRequestQueue.size();
+ if (qSize != 0 && qSize % 1000 == 0) {
+ LOG.info("Size of event-queue in YarnRMContainerAllocator is " + qSize);
+ }
+ int remCapacity = taskRequestQueue.remainingCapacity();
+ if (remCapacity < 1000) {
+ LOG.warn("Very low remaining capacity in the event-queue "
+ + "of YarnRMContainerAllocator: " + remCapacity);
+ }
+
+ taskRequestQueue.add(event);
+ }
+
+ public void getTaskRequests(final Collection<TaskRequestEvent> taskRequests,
+ int num) {
+ taskRequestQueue.drainTo(taskRequests, num);
+ }
+
+ public int size() {
+ return taskRequestQueue.size();
+ }
+ }
+
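+ // Adjusts the per-task data size: if the remaining fragments exceed what the containers can take
+ // at the current size, the size grows so the fragments are divided roughly evenly across containers.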
+ private long adjustTaskSize() {
+ long originTaskSize = context.getMasterContext().getConf().getIntVar(ConfVars.TASK_DEFAULT_SIZE) * 1024 * 1024;
+ long fragNumPerTask = context.getTaskSize() / originTaskSize;
+ if (fragNumPerTask * containerNum > remainingScheduledObjectNum()) {
+ return context.getTaskSize();
+ } else {
+ fragNumPerTask = (long) Math.ceil((double)remainingScheduledObjectNum() / (double)containerNum);
+ return originTaskSize * fragNumPerTask;
+ }
+ }
+
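+ // Assigns fragments to leaf-task requests. Requests are shuffled to avoid favoring a particular
+ // container; fragments are packed up to the adjusted task size, preferring disk-local, then
+ // host-local, then rack-local, and finally random fragments.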
+ private void assignLeafTasks(List<TaskRequestEvent> taskRequests) {
+ Collections.shuffle(taskRequests);
+ Iterator<TaskRequestEvent> it = taskRequests.iterator();
+
+ TaskRequestEvent taskRequest;
+ while (it.hasNext() && scheduledFragments.size() > 0) {
+ taskRequest = it.next();
+ LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
+ "containerId=" + taskRequest.getContainerId());
+ ContainerProxy container = context.getMasterContext().getResourceAllocator().
+ getContainer(taskRequest.getContainerId());
+ if (container == null) {
+ continue;
+ }
+ String host = container.getTaskHostName();
+ QueryUnitAttemptScheduleContext queryUnitContext = new QueryUnitAttemptScheduleContext(container.containerID,
+ host, taskRequest.getCallback());
+ QueryUnit task = SubQuery.newEmptyQueryUnit(context, queryUnitContext, subQuery, nextTaskId++);
+
+ FragmentPair fragmentPair;
+ List<FragmentPair> fragmentPairs = new ArrayList<FragmentPair>();
+ boolean diskLocal = false;
+ long assignedFragmentSize = 0;
+ long taskSize = adjustTaskSize();
+ LOG.info("Adjusted task size: " + taskSize);
+
+ // host local, disk local
+ String normalized = NetUtils.normalizeHost(host);
+ DiskBalancer diskBalancer = hostDiskBalancerMap.get(normalized);
+ Integer diskId = diskBalancer == null ? -1 : diskBalancer.getDiskId(container.containerID);
+ if (diskId != null && diskId != -1) {
+ do {
+ fragmentPair = scheduledFragments.getHostLocalFragment(host, diskId);
+ if (fragmentPair == null || fragmentPair.getLeftFragment() == null) {
+ break;
+ }
+
+ if (assignedFragmentSize + fragmentPair.getLeftFragment().getEndKey() > taskSize) {
+ break;
+ } else {
+ fragmentPairs.add(fragmentPair);
+ assignedFragmentSize += fragmentPair.getLeftFragment().getEndKey();
+ if (fragmentPair.getRightFragment() != null) {
+ assignedFragmentSize += fragmentPair.getRightFragment().getEndKey();
+ }
+ }
+ scheduledFragments.removeFragment(fragmentPair);
+ diskLocal = true;
+ } while (scheduledFragments.size() > 0 && assignedFragmentSize < taskSize);
+ }
+
+ if (assignedFragmentSize < taskSize) {
+ // host local
+ do {
+ fragmentPair = scheduledFragments.getHostLocalFragment(host);
+ if (fragmentPair == null || fragmentPair.getLeftFragment() == null) {
+ break;
+ }
+
+ if (assignedFragmentSize + fragmentPair.getLeftFragment().getEndKey() > taskSize) {
+ break;
+ } else {
+ fragmentPairs.add(fragmentPair);
+ assignedFragmentSize += fragmentPair.getLeftFragment().getEndKey();
+ if (fragmentPair.getRightFragment() != null) {
+ assignedFragmentSize += fragmentPair.getRightFragment().getEndKey();
+ }
+ }
+ scheduledFragments.removeFragment(fragmentPair);
+ diskLocal = false;
+ } while (scheduledFragments.size() > 0 && assignedFragmentSize < taskSize);
+ }
+
+ // rack local
+ if (fragmentPairs.size() == 0) {
+ fragmentPair = scheduledFragments.getRackLocalFragment(host);
+
+ // random
+ if (fragmentPair == null) {
+ fragmentPair = scheduledFragments.getRandomFragment();
+ } else {
+ rackLocalAssigned++;
+ }
+
+ if (fragmentPair != null) {
+ fragmentPairs.add(fragmentPair);
+ scheduledFragments.removeFragment(fragmentPair);
+ }
+ } else {
+ if (diskLocal) {
+ diskLocalAssigned++;
+ } else {
+ hostLocalAssigned++;
+ }
+ }
+
+ if (fragmentPairs.size() == 0) {
+ throw new RuntimeException("Illegal State!!!!!!!!!!!!!!!!!!!!!");
+ }
+
+ LOG.info("host: " + host + " disk id: " + diskId + " fragment num: " + fragmentPairs.size());
+
+ task.setFragment(fragmentPairs.toArray(new FragmentPair[fragmentPairs.size()]));
+ subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
+ }
+ }
+
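+ // Non-leaf tasks read shuffled intermediate data, so they are assigned to any requesting container.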
+ private void assignNonLeafTasks(List<TaskRequestEvent> taskRequests) {
+ Iterator<TaskRequestEvent> it = taskRequests.iterator();
+
+ TaskRequestEvent taskRequest;
+ while (it.hasNext()) {
+ taskRequest = it.next();
+ LOG.debug("assignToNonLeafTasks: " + taskRequest.getExecutionBlockId());
+
+ // random allocation
+ if (scheduledFetches.size() > 0) {
+ LOG.debug("Assigned based on * match");
+ ContainerProxy container = context.getMasterContext().getResourceAllocator().getContainer(
+ taskRequest.getContainerId());
+ QueryUnitAttemptScheduleContext queryUnitContext = new QueryUnitAttemptScheduleContext(container.containerID,
+ container.getTaskHostName(), taskRequest.getCallback());
+ QueryUnit task = SubQuery.newEmptyQueryUnit(context, queryUnitContext, subQuery, nextTaskId++);
+ task.setFragment(scheduledFragments.getAllFragments());
+ subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
+ }
+ }
+ }
+
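+ // Builds the QueryUnitRequest for a scheduled attempt, attaches fetch URIs for non-leaf tasks,
+ // reports the assignment to the master, and ships the request through the container's callback.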
+ private void assignTask(QueryUnitAttemptScheduleContext attemptContext, QueryUnitAttempt taskAttempt) {
+ QueryUnitAttemptId attemptId = taskAttempt.getId();
+ ContainerProxy containerProxy = context.getMasterContext().getResourceAllocator().
+ getContainer(attemptContext.getContainerId());
+ QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
+ attemptId,
+ new ArrayList<FragmentProto>(taskAttempt.getQueryUnit().getAllFragments()),
+ "",
+ false,
+ taskAttempt.getQueryUnit().getLogicalPlan().toJson(),
+ context.getMasterContext().getQueryContext(),
+ subQuery.getDataChannel(), subQuery.getBlock().getEnforcer());
+ if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) {
+ taskAssign.setInterQuery();
+ }
+
+ if (!context.isLeafQuery()) {
+ Map<String, List<URI>> fetch = scheduledFetches.getNextFetch();
+ scheduledFetches.popNextFetch();
+
+ for (Entry<String, List<URI>> fetchEntry : fetch.entrySet()) {
+ for (URI eachValue : fetchEntry.getValue()) {
+ taskAssign.addFetch(fetchEntry.getKey(), eachValue);
+ }
+ }
+ }
+
+ context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
+ attemptContext.getContainerId(), attemptContext.getHost(), containerProxy.getTaskPort()));
+
+ totalAssigned++;
+ attemptContext.getCallback().run(taskAssign.getProto());
+
+ if (context.isLeafQuery()) {
+ LOG.debug("DiskLocalAssigned / Total: " + diskLocalAssigned + " / " + totalAssigned);
+ LOG.debug("HostLocalAssigned / Total: " + hostLocalAssigned + " / " + totalAssigned);
+ LOG.debug("RackLocalAssigned: " + rackLocalAssigned + " / " + totalAssigned);
+ }
+ }
+
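+ // A block produces intermediate (inter-query) output unless it is the root block, or its parent
+ // is a root block that only unions its children.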
+ private boolean checkIfInterQuery(MasterPlan masterPlan, ExecutionBlock block) {
+ if (masterPlan.isRoot(block)) {
+ return false;
+ }
+
+ ExecutionBlock parent = masterPlan.getParent(block);
+ if (masterPlan.isRoot(parent) && parent.hasUnion()) {
+ return false;
+ }
+
+ return true;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ScheduledFetches.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ScheduledFetches.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ScheduledFetches.java
new file mode 100644
index 0000000..8823bc8
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ScheduledFetches.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import java.net.URI;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class ScheduledFetches {
+ private List<Map<String, List<URI>>> fetches = new ArrayList<Map<String, List<URI>>>();
+
+ public void addFetch(Map<String, List<URI>> fetch) {
+ this.fetches.add(fetch);
+ }
+
+ public boolean hasNextFetch() {
+ return fetches.size() > 0;
+ }
+
+ public Map<String, List<URI>> getNextFetch() {
+ return hasNextFetch() ? fetches.get(0) : null;
+ }
+
+ public Map<String, List<URI>> popNextFetch() {
+ return hasNextFetch() ? fetches.remove(0) : null;
+ }
+
+ public int size() {
+ return fetches.size();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerContext.java
new file mode 100644
index 0000000..3335d11
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerContext.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class TaskSchedulerContext {
+ private QueryMasterTask.QueryMasterTaskContext masterContext;
+ private boolean isLeafQuery;
+ private ExecutionBlockId blockId;
+ private int taskSize;
+ private int estimatedTaskNum;
+
+ public TaskSchedulerContext(QueryMasterTask.QueryMasterTaskContext masterContext, boolean isLeafQuery,
+ ExecutionBlockId blockId) {
+ this.masterContext = masterContext;
+ this.isLeafQuery = isLeafQuery;
+ this.blockId = blockId;
+ }
+
+ public QueryMasterTask.QueryMasterTaskContext getMasterContext() {
+ return masterContext;
+ }
+
+ public boolean isLeafQuery() {
+ return isLeafQuery;
+ }
+
+ public ExecutionBlockId getBlockId() {
+ return blockId;
+ }
+
+ public int getTaskSize() {
+ return taskSize;
+ }
+
+ public int getEstimatedTaskNum() {
+ return estimatedTaskNum;
+ }
+
+ public void setTaskSize(int taskSize) {
+ this.taskSize = taskSize;
+ }
+
+ public void setEstimatedTaskNum(int estimatedTaskNum) {
+ this.estimatedTaskNum = estimatedTaskNum;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java
index 72de4ec..520ecd3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java
@@ -20,55 +20,50 @@ package org.apache.tajo.master;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.master.querymaster.QueryMasterTask.QueryMasterTaskContext;
+import org.apache.tajo.master.querymaster.SubQuery;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.Map;
public class TaskSchedulerFactory {
-
- private static final Map<String, Class<? extends AbstractTaskScheduler>> CACHED_SCHEDULER_CLASSES = Maps.newConcurrentMap();
-
+ private static Class<? extends AbstractTaskScheduler> CACHED_ALGORITHM_CLASS;
private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap();
+ private static final Class<?>[] DEFAULT_PARAMS = { TaskSchedulerContext.class, SubQuery.class };
- private static final Class<?>[] DEFAULT_SCHEDULER_PARAMS = { QueryMasterTaskContext.class };
+ public static Class<? extends AbstractTaskScheduler> getTaskSchedulerClass(Configuration conf)
+ throws IOException {
+ if (CACHED_ALGORITHM_CLASS != null) {
+ return CACHED_ALGORITHM_CLASS;
+ } else {
+ CACHED_ALGORITHM_CLASS = conf.getClass("tajo.querymaster.task-scheduler", null, AbstractTaskScheduler.class);
+ }
- public static <T extends AbstractTaskScheduler> T getTaskSCheduler(Configuration conf, QueryMasterTaskContext context) {
- T result;
+ if (CACHED_ALGORITHM_CLASS == null) {
+ throw new IOException("Task scheduler is null");
+ }
+ return CACHED_ALGORITHM_CLASS;
+ }
+ public static <T extends AbstractTaskScheduler> T get(Class<T> clazz, TaskSchedulerContext context,
+ SubQuery subQuery) {
+ T result;
try {
- Class<T> schedulerClass = (Class<T>) getTaskSchedulerClass(conf);
- Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(schedulerClass);
+ Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz);
if (constructor == null) {
- constructor = schedulerClass.getDeclaredConstructor(DEFAULT_SCHEDULER_PARAMS);
+ constructor = clazz.getDeclaredConstructor(DEFAULT_PARAMS);
constructor.setAccessible(true);
- CONSTRUCTOR_CACHE.put(schedulerClass, constructor);
+ CONSTRUCTOR_CACHE.put(clazz, constructor);
}
- result = constructor.newInstance(new Object[]{context});
+ result = constructor.newInstance(new Object[]{context, subQuery});
} catch (Exception e) {
throw new RuntimeException(e);
}
-
return result;
}
- public static Class<? extends AbstractTaskScheduler> getTaskSchedulerClass(Configuration conf) throws IOException {
- String handlerName = getSchedulerType(conf);
- Class<? extends AbstractTaskScheduler> schedulerClass = CACHED_SCHEDULER_CLASSES.get(handlerName);
- if (schedulerClass == null) {
- schedulerClass = conf.getClass(String.format("tajo.querymaster.scheduler-handler.%s.class", handlerName), null, AbstractTaskScheduler.class);
- CACHED_SCHEDULER_CLASSES.put(handlerName, schedulerClass);
- }
-
- if (schedulerClass == null) {
- throw new IOException("Unknown Scheduler Type: " + handlerName);
- }
-
- return schedulerClass;
- }
-
- public static String getSchedulerType(Configuration conf) {
- return conf.get("tajo.querymaster.scheduler-handler.type", "default");
+ public static AbstractTaskScheduler get(Configuration conf, TaskSchedulerContext context, SubQuery subQuery)
+ throws IOException {
+ return get(getTaskSchedulerClass(conf), context, subQuery);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/df5727c4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/DefaultTaskSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/DefaultTaskSchedulerEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/DefaultTaskSchedulerEvent.java
deleted file mode 100644
index 00bce5b..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/DefaultTaskSchedulerEvent.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.event;
-
-import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.master.querymaster.QueryUnitAttempt;
-import org.apache.tajo.storage.DataLocation;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-public class DefaultTaskSchedulerEvent extends TaskSchedulerEvent {
- private final QueryUnitAttemptId attemptId;
- private final boolean isLeafQuery;
- private final List<DataLocation> dataLocations;
- private final String[] racks;
-
- public DefaultTaskSchedulerEvent(final EventType eventType,
- final QueryUnitAttempt attempt) {
- super(eventType, attempt.getId().getQueryUnitId().getExecutionBlockId());
- this.attemptId = attempt.getId();
- this.isLeafQuery = attempt.isLeafTask();
- if (this.isLeafQuery) {
- this.dataLocations = attempt.getQueryUnit().getDataLocations();
- Set<String> racks = new HashSet<String>();
- for (DataLocation location : attempt.getQueryUnit().getDataLocations()) {
- racks.add(RackResolver.resolve(location.getHost()).getNetworkLocation());
- }
- this.racks = racks.toArray(new String[racks.size()]);
- } else {
- this.dataLocations = null;
- this.racks = null;
- }
- }
-
- public DefaultTaskSchedulerEvent(final QueryUnitAttemptId attemptId,
- final EventType eventType, boolean isLeafQuery,
- final List<DataLocation> dataLocations,
- final String[] racks) {
- super(eventType, attemptId.getQueryUnitId().getExecutionBlockId());
- this.attemptId = attemptId;
- this.isLeafQuery = isLeafQuery;
- this.dataLocations = dataLocations;
- this.racks = racks;
- }
-
- public QueryUnitAttemptId getAttemptId() {
- return this.attemptId;
- }
-
- public boolean isLeafQuery() {
- return this.isLeafQuery;
- }
-
- public List<DataLocation> getDataLocations() {
- return this.dataLocations;
- }
-
- public String[] getRacks() {
- return this.racks;
- }
-
- @Override
- public String toString() {
- return "DefaultTaskSchedulerEvent{" +
- "attemptId=" + attemptId +
- ", isLeafQuery=" + isLeafQuery +
- ", hosts=" + (dataLocations == null ? null : dataLocations) +
- ", racks=" + (racks == null ? null : Arrays.asList(racks)) +
- '}';
- }
-}