You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/09/21 20:20:53 UTC
[07/10] git commit: TAJO-1047: DefaultTaskScheduler:allocateRackTask is failed occasionally on JDK 1.7. (jinho)
TAJO-1047: DefaultTaskScheduler:allocateRackTask is failed occasionally on JDK 1.7. (jinho)
Closes #144
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1b3d51e1
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1b3d51e1
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1b3d51e1
Branch: refs/heads/block_iteration
Commit: 1b3d51e149039db86d8c65f1fe3f0c8953b6faf4
Parents: 621d914
Author: jhkim <jh...@apache.org>
Authored: Sat Sep 20 16:58:55 2014 +0900
Committer: jhkim <jh...@apache.org>
Committed: Sat Sep 20 16:58:55 2014 +0900
----------------------------------------------------------------------
CHANGES | 3 ++
.../tajo/master/DefaultTaskScheduler.java | 49 +++++++++++++-------
2 files changed, 35 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/1b3d51e1/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 2d10e9c..995cd27 100644
--- a/CHANGES
+++ b/CHANGES
@@ -143,6 +143,9 @@ Release 0.9.0 - unreleased
BUG FIXES
+ TAJO-1047: DefaultTaskScheduler:allocateRackTask is failed occasionally
+ on JDK 1.7. (jinho)
+
TAJO-1056: Wrong resource release or wrong task scheduling. (jinho)
TAJO-1050: RPC client does not retry during connecting.
http://git-wip-us.apache.org/repos/asf/tajo/blob/1b3d51e1/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index c5cf430..62d4892 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -19,6 +19,7 @@
package org.apache.tajo.master;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -95,10 +96,13 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
synchronized (schedulingThread){
schedulingThread.wait(100);
}
+ schedule();
} catch (InterruptedException e) {
break;
+ } catch (Throwable e) {
+ LOG.fatal(e.getMessage(), e);
+ break;
}
- schedule();
}
LOG.info("TaskScheduler schedulingThread stopped");
}
@@ -452,7 +456,9 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
for (DataLocation location : queryUnitAttempt.getQueryUnit().getDataLocations()) {
if (!this.getHost().equals(location.getHost())) {
HostVolumeMapping volumeMapping = scheduledRequests.leafTaskHostMapping.get(location.getHost());
- volumeMapping.removeQueryUnitAttempt(location.getVolumeId(), queryUnitAttempt);
+ if (volumeMapping != null) {
+ volumeMapping.removeQueryUnitAttempt(location.getVolumeId(), queryUnitAttempt);
+ }
}
}
}
@@ -589,11 +595,10 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
// if the task is not included in leafTasks and nonLeafTasks.
private final Set<QueryUnitAttemptId> leafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>());
private final Set<QueryUnitAttemptId> nonLeafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>());
- private Map<String, HostVolumeMapping> leafTaskHostMapping = new HashMap<String, HostVolumeMapping>();
- private final Map<String, HashSet<QueryUnitAttemptId>> leafTasksRackMapping =
- new HashMap<String, HashSet<QueryUnitAttemptId>>();
+ private Map<String, HostVolumeMapping> leafTaskHostMapping = Maps.newConcurrentMap();
+ private final Map<String, HashSet<QueryUnitAttemptId>> leafTasksRackMapping = Maps.newConcurrentMap();
- private void addLeafTask(QueryUnitAttemptScheduleEvent event) {
+ private synchronized void addLeafTask(QueryUnitAttemptScheduleEvent event) {
QueryUnitAttempt queryUnitAttempt = event.getQueryUnitAttempt();
List<DataLocation> locations = queryUnitAttempt.getQueryUnit().getDataLocations();
@@ -646,8 +651,10 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
if (hostVolumeMapping != null) { //tajo host is located in hadoop datanode
- while (hostVolumeMapping.getRemainingLocalTaskSize() > 0) {
+ for (int i = 0; i < hostVolumeMapping.getRemainingLocalTaskSize(); i++) {
QueryUnitAttemptId attemptId = hostVolumeMapping.getLocalTask(containerId);
+
+ if(attemptId == null) break;
//find remaining local task
if (leafTasks.contains(attemptId)) {
leafTasks.remove(attemptId);
@@ -663,22 +670,30 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
private QueryUnitAttemptId allocateRackTask(String host) {
- List<HostVolumeMapping> remainingTasks = new ArrayList<HostVolumeMapping>(leafTaskHostMapping.values());
+ List<HostVolumeMapping> remainingTasks = Lists.newArrayList(leafTaskHostMapping.values());
String rack = RackResolver.resolve(host).getNetworkLocation();
QueryUnitAttemptId attemptId = null;
if (remainingTasks.size() > 0) {
- //find largest remaining task of other host in rack
- Collections.sort(remainingTasks, new Comparator<HostVolumeMapping>() {
- @Override
- public int compare(HostVolumeMapping v1, HostVolumeMapping v2) {
- // descending remaining tasks
- return Integer.valueOf(v2.remainTasksNum.get()).compareTo(Integer.valueOf(v1.remainTasksNum.get()));
- }
- });
+ synchronized (scheduledRequests) {
+ //find largest remaining task of other host in rack
+ Collections.sort(remainingTasks, new Comparator<HostVolumeMapping>() {
+ @Override
+ public int compare(HostVolumeMapping v1, HostVolumeMapping v2) {
+ // descending remaining tasks
+ if (v2.remainTasksNum.get() > v1.remainTasksNum.get()) {
+ return 1;
+ } else if (v2.remainTasksNum.get() == v1.remainTasksNum.get()) {
+ return 0;
+ } else {
+ return -1;
+ }
+ }
+ });
+ }
for (HostVolumeMapping tasks : remainingTasks) {
- while (tasks.getRemainingLocalTaskSize() > 0){
+ for (int i = 0; i < tasks.getRemainingLocalTaskSize(); i++) {
QueryUnitAttemptId tId = tasks.getQueryUnitAttemptIdByRack(rack);
if (tId == null) break;