You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2013/11/01 03:23:59 UTC

git commit: TAJO-295: ConcurrentModificationException in TaskScheduler. (jinho)

Updated Branches:
  refs/heads/master 2186e63b1 -> 5dbdd26d7


TAJO-295: ConcurrentModificationException in TaskScheduler. (jinho)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/5dbdd26d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/5dbdd26d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/5dbdd26d

Branch: refs/heads/master
Commit: 5dbdd26d76786b22aea38bebfa0b5ad460eebddd
Parents: 2186e63
Author: jinossy <ji...@gmail.com>
Authored: Fri Nov 1 11:23:20 2013 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Fri Nov 1 11:23:20 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../apache/tajo/master/TaskSchedulerImpl.java   | 21 ++++++++++----------
 2 files changed, 13 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5dbdd26d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c0d9084..309ee32 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -230,6 +230,8 @@ Release 0.2.0 - unreleased
 
   BUG FIXES
 
+    TAJO-295: ConcurrentModificationException in TaskScheduler. (jinho)
+
     TAJO-293: querymasters directory not found in single node setup. (hyunsik)
 
     TAJO-281: 'mvn package -Pdist' generates duplicate Tajo jar files.

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5dbdd26d/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
index 1d53c87..05e64ab 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
@@ -29,11 +29,10 @@ import org.apache.hadoop.yarn.util.RackResolver;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.query.QueryUnitRequest;
 import org.apache.tajo.engine.query.QueryUnitRequestImpl;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.engine.query.QueryUnitRequest;
 import org.apache.tajo.master.event.TaskAttemptAssignedEvent;
 import org.apache.tajo.master.event.TaskRequestEvent;
 import org.apache.tajo.master.event.TaskScheduleEvent;
@@ -42,8 +41,6 @@ import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
 import org.apache.tajo.master.querymaster.QueryUnit;
 import org.apache.tajo.master.querymaster.SubQuery;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.util.NetUtils;
 
 import java.net.URI;
@@ -342,8 +339,8 @@ public class TaskSchedulerImpl extends AbstractService
   }
 
   private class ScheduledRequests {
-    private final HashSet<QueryUnitAttemptId> leafTasks = new HashSet<QueryUnitAttemptId>();
-    private final HashSet<QueryUnitAttemptId> nonLeafTasks = new HashSet<QueryUnitAttemptId>();
+    private final Set<QueryUnitAttemptId> leafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>());
+    private final Set<QueryUnitAttemptId> nonLeafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>());
     private Map<String, TaskBlockLocation> leafTaskHostMapping = new HashMap<String, TaskBlockLocation>();
     private final Map<String, LinkedList<QueryUnitAttemptId>> leafTasksRackMapping =
         new HashMap<String, LinkedList<QueryUnitAttemptId>>();
@@ -453,8 +450,10 @@ public class TaskSchedulerImpl extends AbstractService
 
           // random allocation
           if (attemptId == null && leafTaskNum() > 0) {
-            attemptId = leafTasks.iterator().next();
-            leafTasks.remove(attemptId);
+            synchronized (leafTasks){
+              attemptId = leafTasks.iterator().next();
+              leafTasks.remove(attemptId);
+            }
             //LOG.info(attemptId + " Assigned based on * match");
           }
         }
@@ -502,8 +501,10 @@ public class TaskSchedulerImpl extends AbstractService
         QueryUnitAttemptId attemptId;
         // random allocation
         if (nonLeafTasks.size() > 0) {
-          attemptId = nonLeafTasks.iterator().next();
-          nonLeafTasks.remove(attemptId);
+          synchronized (nonLeafTasks){
+            attemptId = nonLeafTasks.iterator().next();
+            nonLeafTasks.remove(attemptId);
+          }
           LOG.debug("Assigned based on * match");
 
           QueryUnit task;