You are viewing a plain-text version of this content. The canonical link for it ("here") was not preserved in this copy.
Posted to commits@tajo.apache.org by jh...@apache.org on 2013/09/12 19:00:43 UTC

git commit: TAJO-168: infinite loop occurs when QueryMaster is stopping. (jinho)

Updated Branches:
  refs/heads/master 3b595ddd3 -> 5d3966a8c


TAJO-168: infinite loop occurs when QueryMaster is stopping. (jinho)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/5d3966a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/5d3966a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/5d3966a8

Branch: refs/heads/master
Commit: 5d3966a8cd4c3256abf47d4a9084adc7778e8da3
Parents: 3b595dd
Author: jinossy <ji...@gmail.com>
Authored: Fri Sep 13 02:00:02 2013 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Fri Sep 13 02:00:02 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/tajo/master/TaskSchedulerImpl.java   |  3 +
 .../tajo/master/querymaster/SubQuery.java       |  3 +-
 .../master/rm/TajoWorkerResourceManager.java    |  6 +-
 .../apache/tajo/master/rm/WorkerResource.java   | 61 ++++++++++++++++----
 .../tajo/master/rm/YarnTajoResourceManager.java |  2 +-
 6 files changed, 60 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5d3966a8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 11ea618..22e2518 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -133,6 +133,8 @@ Release 0.2.0 - unreleased
 
   BUG FIXES
 
+    TAJO-168: infinite loop occurs when QueryMaster is stopping. (jinho)
+
     TAJO-180: Better error messages for 
     StorageManager.listStatus$InvalidInputException. (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5d3966a8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
index 8bb17b1..71b0114 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
@@ -398,6 +398,9 @@ public class TaskSchedulerImpl extends AbstractService
         LOG.info("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
             "containerId=" + taskRequest.getContainerId());
         ContainerProxy container = context.getResourceAllocator().getContainer(taskRequest.getContainerId());
+
+        if(container == null) continue;
+
         String host = container.getTaskHostName();
 
         QueryUnitAttemptId attemptId = null;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5d3966a8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 43895d8..ac92386 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -22,7 +22,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -625,7 +624,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
 
       final Resource resource = Records.newRecord(Resource.class);
 
-      resource.setMemory(2048);
+      resource.setMemory(2000);
 
       LOG.info("Request Container for " + subQuery.getId() + " containers=" + numRequest);
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5d3966a8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index eb710fa..7c4bf59 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -209,7 +209,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
               resourceRequest.request.getDiskSlots(),
               resourceRequest.request.getNumWorks());
 
-          LOG.info("====> allocateWorkerResources: allocated:" + workerResources.size());
+          LOG.debug("====> allocateWorkerResources: allocated:" + workerResources.size());
 
           if(workerResources.size() > 0) {
             if(resourceRequest.queryMasterRequest) {
@@ -239,6 +239,8 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
               }
               LOG.debug("=========================================");
             }
+            requestQueue.add(resourceRequest);
+            Thread.sleep(100);
           }
         } catch(InterruptedException ie) {
         }
@@ -332,13 +334,13 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
         queryMasterWorkerResource = queryMasterMap.remove(queryId);
       }
     }
-    LOG.info("release QueryMaster resource:" + queryId + "," + queryMasterWorkerResource);
     WorkerResource workerResource = new WorkerResource();
     workerResource.copyId(queryMasterWorkerResource);
     workerResource.setMemoryMBSlots(queryMasterMemoryMB);
     workerResource.setDiskSlots(queryMasterDiskSlot);
     workerResource.setCpuCoreSlots(0);
     releaseWorkerResource(queryId, workerResource);
+    LOG.info("released QueryMaster resource:" + queryId + "," + queryMasterWorkerResource);
   }
 
   public void workerHeartbeat(TajoMasterProtocol.TajoHeartbeat request) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5d3966a8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
index b21570c..b958761 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
@@ -21,6 +21,9 @@ package org.apache.tajo.master.rm;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 public class WorkerResource {
   private static final Log LOG = LogFactory.getLog(WorkerResource.class);
 
@@ -39,6 +42,10 @@ public class WorkerResource {
 
   private boolean queryMasterAllocated;
 
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  private final Lock rlock = lock.readLock();
+  private final Lock wlock = lock.writeLock();
+
   private WorkerStatus workerStatus;
 
   private long lastHeartbeat;
@@ -65,7 +72,12 @@ public class WorkerResource {
   }
 
   public void addUsedMemoryMBSlots(int memoryMBSlots) {
-    usedMemoryMBSlots += memoryMBSlots;
+    try {
+      wlock.lock();
+      usedMemoryMBSlots += memoryMBSlots;
+    } finally {
+      wlock.unlock();
+    }
   }
 
   public void addUsedCpuCoreSlots(int cpuCoreSlots) {
@@ -89,11 +101,21 @@ public class WorkerResource {
   }
 
   public int getMemoryMBSlots() {
-    return memoryMBSlots;
+    try {
+      rlock.lock();
+      return memoryMBSlots;
+    } finally {
+      rlock.unlock();
+    }
   }
 
   public void setMemoryMBSlots(int memoryMBSlots) {
-    this.memoryMBSlots = memoryMBSlots;
+    try {
+      wlock.lock();
+      this.memoryMBSlots = memoryMBSlots;
+    } finally {
+      wlock.unlock();
+    }
   }
 
   public int getAvailableDiskSlots() {
@@ -101,13 +123,13 @@ public class WorkerResource {
   }
 
   public int getAvailableMemoryMBSlots() {
-    return memoryMBSlots - usedMemoryMBSlots;
+    return getMemoryMBSlots() - getUsedMemoryMBSlots();
   }
 
   @Override
   public String toString() {
     return "host:" + allocatedHost + ", port=" + portsToStr() + ", slots=" + memoryMBSlots + ":" + cpuCoreSlots + ":" + diskSlots +
-        ", used=" + usedMemoryMBSlots + ":" + usedCpuCoreSlots + ":" + usedDiskSlots;
+        ", used=" + getUsedMemoryMBSlots() + ":" + usedCpuCoreSlots + ":" + usedDiskSlots;
   }
 
   public String portsToStr() {
@@ -119,11 +141,22 @@ public class WorkerResource {
   }
 
   public int getUsedMemoryMBSlots() {
-    return usedMemoryMBSlots;
+    try {
+      rlock.lock();
+      return usedMemoryMBSlots;
+    } finally {
+      rlock.unlock();
+    }
   }
 
   public void setUsedMemoryMBSlots(int usedMemoryMBSlots) {
-    this.usedMemoryMBSlots = usedMemoryMBSlots;
+    try {
+      wlock.lock();
+      this.usedMemoryMBSlots = usedMemoryMBSlots;
+    } finally {
+      wlock.unlock();
+    }
+
   }
 
   public int getUsedCpuCoreSlots() {
@@ -167,10 +200,14 @@ public class WorkerResource {
         queryMasterAllocated = false;
     }
 
-    usedMemoryMBSlots = usedMemoryMBSlots - workerResource.memoryMBSlots;
-    //usedDiskSlots = usedDiskSlots - workerResource.diskSlots;
+    try {
+      wlock.lock();
+      usedMemoryMBSlots = usedMemoryMBSlots - workerResource.getMemoryMBSlots();
+    } finally {
+      wlock.unlock();
+    }
 
-    if(usedMemoryMBSlots < 0 || usedDiskSlots < 0 || usedCpuCoreSlots < 0) {
+    if(getUsedMemoryMBSlots() < 0 || usedDiskSlots < 0 || usedCpuCoreSlots < 0) {
       LOG.warn("Used resources can't be a minus.");
       LOG.warn(this + " ==> " + workerResource);
     }
@@ -178,7 +215,7 @@ public class WorkerResource {
 
   public int getSlots() {
     //TODO what is slot? 512MB = 1slot?
-    return memoryMBSlots/512;
+    return getMemoryMBSlots()/512;
   }
 
   public int getAvaliableSlots() {
@@ -188,7 +225,7 @@ public class WorkerResource {
 
   public int getUsedSlots() {
     //TODO what is slot? 512MB = 1slot?
-    return usedMemoryMBSlots/512;
+    return getUsedMemoryMBSlots()/512;
   }
 
   public int getManagerPort() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5d3966a8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
index da7daf0..0baa55c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
@@ -267,7 +267,7 @@ public class YarnTajoResourceManager implements WorkerResourceManager {
 
     final Resource resource = Records.newRecord(Resource.class);
     // TODO - get default value from conf
-    resource.setMemory(2048);
+    resource.setMemory(2000);
     resource.setVirtualCores(1);
 
     Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();