You are viewing a plain-text version of this content. The canonical link for it was attached to the word "here" in the original page and is not preserved in this text.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/09/05 03:58:14 UTC

git commit: TAJO-158: Can't allocate workers when a single SubQuery requests more than the cluster capacity. (hyoungjunkim via hyunsik)

Updated Branches:
  refs/heads/master 68d72cf52 -> 0b513ca07


TAJO-158: Can't allocate workers when a single SubQuery requests more than the cluster capacity. (hyoungjunkim via hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/0b513ca0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/0b513ca0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/0b513ca0

Branch: refs/heads/master
Commit: 0b513ca07988ae1f008c9a47db993f3bc6ef43cc
Parents: 68d72cf
Author: Hyunsik Choi <hy...@apache.org>
Authored: Thu Sep 5 10:56:39 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Thu Sep 5 10:57:46 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 ++
 .../master/event/ContainerAllocationEvent.java  |  4 ++
 .../master/rm/TajoWorkerResourceManager.java    | 51 --------------------
 .../tajo/worker/TajoResourceAllocator.java      | 29 +++++++----
 .../java/org/apache/tajo/worker/TaskRunner.java |  4 +-
 5 files changed, 28 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b513ca0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6acfa57..9e9c2b8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -126,6 +126,9 @@ Release 0.2.0 - unreleased
 
   BUG FIXES
 
+    TAJO-158: Can't allocate worker when single SubQuery requests more than
+    cluster capacity. (hyoungjunkim via hyunsik)
+
     TAJO-157: The CSVScanner.isSplittable() function does not work properly.
     (jinho)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b513ca0/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java
index ee594a3..c3a9a59 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java
@@ -70,4 +70,8 @@ public class ContainerAllocationEvent extends AbstractEvent<ContainerAllocatorEv
   public float getProgress() {
     return progress;
   }
+
+  public Resource getResource() {
+    return resource;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b513ca0/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index 93450e1..3701d58 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -55,8 +55,6 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
 
   private final BlockingQueue<WorkerResourceRequest> requestQueue;
 
-  private ReAllocationThread reAllocator;
-
   private final List<WorkerResourceRequest> reAllocationList;
 
   private AtomicBoolean stopped = new AtomicBoolean(false);
@@ -76,9 +74,6 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
 
     workerResourceAllocator = new WorkerResourceAllocationThread();
     workerResourceAllocator.start();
-
-    reAllocator = new ReAllocationThread();
-    reAllocator.start();
   }
 
   public Map<String, WorkerResource> getWorkers() {
@@ -105,10 +100,6 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
     if(workerResourceAllocator != null) {
       workerResourceAllocator.interrupt();
     }
-
-    if(reAllocator != null) {
-      reAllocator.interrupt();
-    }
   }
 
   @Override
@@ -258,54 +249,12 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
               LOG.debug("=========================================");
             }
           }
-          if(workerResources.size() < resourceRequest.request.getNumWorks()) {
-            reAllocationList.add(new WorkerResourceRequest(
-                resourceRequest.queryId,
-                resourceRequest.queryMasterRequest,
-                TajoMasterProtocol.WorkerResourceAllocationRequest.newBuilder()
-                  .setMemoryMBSlots(resourceRequest.request.getMemoryMBSlots())
-                  .setDiskSlots(resourceRequest.request.getDiskSlots())
-                  .setExecutionBlockId(resourceRequest.request.getExecutionBlockId())
-                  .setNumWorks(resourceRequest.request.getNumWorks() - workerResources.size())
-                  .build(),
-                resourceRequest.callBack));
-          }
         } catch(InterruptedException ie) {
         }
       }
     }
   }
 
-  class ReAllocationThread extends Thread {
-    public void run() {
-      List<WorkerResourceRequest> copiedList = new ArrayList<WorkerResourceRequest>();
-      while(!stopped.get()) {
-        copiedList.clear();
-        synchronized(reAllocationList) {
-          try {
-            reAllocationList.wait(3 * 1000);
-          } catch (InterruptedException e) {
-            if(stopped.get()) {
-              break;
-            }
-          }
-          copiedList.addAll(reAllocationList);
-        }
-
-        for(WorkerResourceRequest eachRequest: copiedList) {
-          try {
-            requestQueue.put(eachRequest);
-          } catch (InterruptedException e) {
-            break;
-          }
-        }
-        synchronized(reAllocationList) {
-          reAllocationList.clear();
-        }
-      }
-    }
-  }
-
   private List<WorkerResource> chooseWorkers(boolean queryMaster,
                                              int requiredMemoryMBSlots, int requiredDiskSlots,
                                              int numWorkerSlots) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b513ca0/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 33f9b0f..c1afb80 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -25,9 +25,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -267,20 +265,23 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
       queryContext.getQueryMasterContext().getWorkerContext().
           getTajoMasterRpcClient().allocateWorkerResources(null, request, callBack);
 
-      int numAllocatedWorkers = 0;
+      TajoMasterProtocol.WorkerResourceAllocationResponse response = null;
       while(!stopped.get()) {
-        TajoMasterProtocol.WorkerResourceAllocationResponse response = null;
         try {
           response = callBack.get(3, TimeUnit.SECONDS);
+          break;
         } catch (InterruptedException e) {
           if(stopped.get()) {
-            break;
+            return;
           }
         } catch (TimeoutException e) {
           LOG.info("No available worker resource for " + event.getExecutionBlockId());
           continue;
         }
+      }
+      int numAllocatedWorkers = 0;
 
+      if(response != null) {
         List<TajoMasterProtocol.WorkerAllocatedResource> workerHosts = response.getWorkerAllocatedResourceList();
         ExecutionBlockId executionBlockId = event.getExecutionBlockId();
 
@@ -335,11 +336,19 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
           queryContext.getEventHandler().handle(new SubQueryContainerAllocationEvent(executionBlockId, containers));
         }
         numAllocatedWorkers += workerHosts.size();
-        if(numAllocatedWorkers >= event.getRequiredNum()) {
-          break;
-        }
+
+      }
+      if(event.getRequiredNum() > numAllocatedWorkers) {
+        ContainerAllocationEvent shortRequestEvent = new ContainerAllocationEvent(
+            event.getType(), event.getExecutionBlockId(), event.getPriority(),
+            event.getResource(),
+            event.getRequiredNum() - numAllocatedWorkers,
+            event.isLeafQuery(), event.getProgress()
+        );
+        queryContext.getEventHandler().handle(shortRequestEvent);
+
       }
-      LOG.info("======> Stop TajoWorkerAllocationThread");
+      LOG.info("Stop TajoWorkerAllocationThread");
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b513ca0/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 5802ade..6128bb3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -299,7 +299,7 @@ public class TaskRunner extends AbstractService {
             try {
               if (callFuture == null) {
                 callFuture = new CallFuture2<QueryUnitRequestProto>();
-                LOG.info("====>Request GetTask:" + getId());
+                LOG.info("Request GetTask: " + getId());
                 GetTaskRequestProto request = GetTaskRequestProto.newBuilder()
                     .setExecutionBlockId(executionBlockId.getProto())
                     .setContainerId(((ContainerIdPBImpl) containerId).getProto())
@@ -319,7 +319,7 @@ public class TaskRunner extends AbstractService {
                 }
                 // if there has been no assigning task for a given period,
                 // TaskRunner will retry to request an assigning task.
-                LOG.warn("Timeout getResource:" + getId() + ", but retry", te);
+                LOG.warn("Timeout GetTask:" + getId() + ", but retry", te);
                 continue;
               }