You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/09/21 20:20:53 UTC

[07/10] git commit: TAJO-1047: DefaultTaskScheduler:allocateRackTask fails occasionally on JDK 1.7. (jinho)

TAJO-1047: DefaultTaskScheduler:allocateRackTask fails occasionally on JDK 1.7. (jinho)

Closes #144


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1b3d51e1
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1b3d51e1
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1b3d51e1

Branch: refs/heads/block_iteration
Commit: 1b3d51e149039db86d8c65f1fe3f0c8953b6faf4
Parents: 621d914
Author: jhkim <jh...@apache.org>
Authored: Sat Sep 20 16:58:55 2014 +0900
Committer: jhkim <jh...@apache.org>
Committed: Sat Sep 20 16:58:55 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 ++
 .../tajo/master/DefaultTaskScheduler.java       | 49 +++++++++++++-------
 2 files changed, 35 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/1b3d51e1/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 2d10e9c..995cd27 100644
--- a/CHANGES
+++ b/CHANGES
@@ -143,6 +143,9 @@ Release 0.9.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1047: DefaultTaskScheduler:allocateRackTask is failed occasionally
+    on JDK 1.7. (jinho)
+
     TAJO-1056: Wrong resource release or wrong task scheduling. (jinho)
 
     TAJO-1050: RPC client does not retry during connecting. 

http://git-wip-us.apache.org/repos/asf/tajo/blob/1b3d51e1/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index c5cf430..62d4892 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.master;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -95,10 +96,13 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
             synchronized (schedulingThread){
               schedulingThread.wait(100);
             }
+            schedule();
           } catch (InterruptedException e) {
             break;
+          } catch (Throwable e) {
+            LOG.fatal(e.getMessage(), e);
+            break;
           }
-          schedule();
         }
         LOG.info("TaskScheduler schedulingThread stopped");
       }
@@ -452,7 +456,9 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
         for (DataLocation location : queryUnitAttempt.getQueryUnit().getDataLocations()) {
           if (!this.getHost().equals(location.getHost())) {
             HostVolumeMapping volumeMapping = scheduledRequests.leafTaskHostMapping.get(location.getHost());
-            volumeMapping.removeQueryUnitAttempt(location.getVolumeId(), queryUnitAttempt);
+            if (volumeMapping != null) {
+              volumeMapping.removeQueryUnitAttempt(location.getVolumeId(), queryUnitAttempt);
+            }
           }
         }
       }
@@ -589,11 +595,10 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
     // if the task is not included in leafTasks and nonLeafTasks.
     private final Set<QueryUnitAttemptId> leafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>());
     private final Set<QueryUnitAttemptId> nonLeafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>());
-    private Map<String, HostVolumeMapping> leafTaskHostMapping = new HashMap<String, HostVolumeMapping>();
-    private final Map<String, HashSet<QueryUnitAttemptId>> leafTasksRackMapping =
-        new HashMap<String, HashSet<QueryUnitAttemptId>>();
+    private Map<String, HostVolumeMapping> leafTaskHostMapping = Maps.newConcurrentMap();
+    private final Map<String, HashSet<QueryUnitAttemptId>> leafTasksRackMapping = Maps.newConcurrentMap();
 
-    private void addLeafTask(QueryUnitAttemptScheduleEvent event) {
+    private synchronized void addLeafTask(QueryUnitAttemptScheduleEvent event) {
       QueryUnitAttempt queryUnitAttempt = event.getQueryUnitAttempt();
       List<DataLocation> locations = queryUnitAttempt.getQueryUnit().getDataLocations();
 
@@ -646,8 +651,10 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
       HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
 
       if (hostVolumeMapping != null) { //tajo host is located in hadoop datanode
-        while (hostVolumeMapping.getRemainingLocalTaskSize() > 0) {
+        for (int i = 0; i < hostVolumeMapping.getRemainingLocalTaskSize(); i++) {
           QueryUnitAttemptId attemptId = hostVolumeMapping.getLocalTask(containerId);
+
+          if(attemptId == null) break;
           //find remaining local task
           if (leafTasks.contains(attemptId)) {
             leafTasks.remove(attemptId);
@@ -663,22 +670,30 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
 
     private QueryUnitAttemptId allocateRackTask(String host) {
 
-      List<HostVolumeMapping> remainingTasks = new ArrayList<HostVolumeMapping>(leafTaskHostMapping.values());
+      List<HostVolumeMapping> remainingTasks = Lists.newArrayList(leafTaskHostMapping.values());
       String rack = RackResolver.resolve(host).getNetworkLocation();
       QueryUnitAttemptId attemptId = null;
 
       if (remainingTasks.size() > 0) {
-        //find largest remaining task of other host in rack
-        Collections.sort(remainingTasks, new Comparator<HostVolumeMapping>() {
-          @Override
-          public int compare(HostVolumeMapping v1, HostVolumeMapping v2) {
-            // descending remaining tasks
-            return Integer.valueOf(v2.remainTasksNum.get()).compareTo(Integer.valueOf(v1.remainTasksNum.get()));
-          }
-        });
+        synchronized (scheduledRequests) {
+          //find largest remaining task of other host in rack
+          Collections.sort(remainingTasks, new Comparator<HostVolumeMapping>() {
+            @Override
+            public int compare(HostVolumeMapping v1, HostVolumeMapping v2) {
+              // descending remaining tasks
+              if (v2.remainTasksNum.get() > v1.remainTasksNum.get()) {
+                return 1;
+              } else if (v2.remainTasksNum.get() == v1.remainTasksNum.get()) {
+                return 0;
+              } else {
+                return -1;
+              }
+            }
+          });
+        }
 
         for (HostVolumeMapping tasks : remainingTasks) {
-          while (tasks.getRemainingLocalTaskSize() > 0){
+          for (int i = 0; i < tasks.getRemainingLocalTaskSize(); i++) {
             QueryUnitAttemptId tId = tasks.getQueryUnitAttemptIdByRack(rack);
 
             if (tId == null) break;