You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/05/08 09:47:40 UTC

git commit: TAJO-54: SubQuery::allocateContainers() may ask 0 containers. (hyunsik)

Updated Branches:
  refs/heads/master 6628017a5 -> fc4743686


TAJO-54: SubQuery::allocateContainers() may ask 0 containers. (hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/fc474368
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/fc474368
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/fc474368

Branch: refs/heads/master
Commit: fc4743686553a5bccfb0195c93d9e1078f4f22ff
Parents: 6628017
Author: hyunsik <hy...@gmail.com>
Authored: Wed May 8 16:40:48 2013 +0900
Committer: hyunsik <hy...@gmail.com>
Committed: Wed May 8 16:40:48 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                        |    4 ++-
 .../src/main/java/tajo/master/QueryMaster.java     |    7 ++++-
 .../src/main/java/tajo/master/SubQuery.java        |   11 +++-----
 .../java/tajo/master/rm/RMContainerAllocator.java  |   19 ++++++++++++++-
 4 files changed, 31 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc474368/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1a6a6db..cba4f8f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,7 +9,7 @@ Release 0.2.0 - unreleased
   IMPROVEMENTS
 
     TAJO-37: Remove obsolete classes WorkerEventDispatcher, WorkerEvent and 
-	WorkerEventType. (sunny.1324 via hyunsik)
+    WorkerEventType. (sunny.1324 via hyunsik)
 
     TAJO-50: Cleanup SubQuery. (hyunsik)
 
@@ -37,6 +37,8 @@ Release 0.2.0 - unreleased
 
   BUG FIXES
 
+    TAJO-54: SubQuery::allocateContainers() may ask 0 containers. (hyunsik)
+
     TAJO-47: RowFile has the duplicated initialization problem and unflipped 
     ByteBuffer problem. (jihoon)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc474368/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
index 8891861..3f3db05 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
@@ -227,6 +227,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
     public Map<ContainerId, Container> containers = new ConcurrentHashMap<ContainerId, Container>();
     int minCapability;
     int maxCapability;
+    int numCluster;
 
     public QueryContext(QueryConf conf) {
       this.conf = conf;
@@ -297,7 +298,11 @@ public class QueryMaster extends CompositeService implements EventHandler {
     }
 
     public int getNumClusterNode() {
-      return rmAllocator.getClusterNodeCount();
+      return numCluster;
+    }
+
+    public void setNumClusterNodes(int num) {
+      numCluster = num;
     }
 
     public CatalogService getCatalog() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc474368/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
index 398e1ae..986ecaf 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.state.*;
 import org.apache.hadoop.yarn.util.Records;
 import tajo.QueryIdFactory;
@@ -614,13 +613,11 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       ExecutionBlock execBlock = subQuery.getBlock();
       QueryUnit [] tasks = subQuery.getQueryUnits();
 
-      int numRequest = Math.min(tasks.length,
-          subQuery.context.getNumClusterNode() * 4);
+      int numClusterNodes = subQuery.getContext().getNumClusterNode();
+      int numRequest = Math.min(tasks.length, numClusterNodes * 4);
 
-      final Resource resource =
-          RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
-              Resource.class);
-      if (tasks.length <= subQuery.context.getNumClusterNode()) {
+      final Resource resource = Records.newRecord(Resource.class);
+      if (tasks.length <= numClusterNodes) {
         resource.setMemory(subQuery.context.getMaxContainerCapability());
       } else {
         resource.setMemory(2000);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc474368/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java
index 0a1ad42..9d5ae6c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.client.AMRMClientImpl;
@@ -62,6 +63,7 @@ public class RMContainerAllocator extends AMRMClientImpl
     super.init(conf);
   }
 
+  private static final int WAIT_INTERVAL_AVAILABLE_NODES = 500; // 0.5 second
   public void start() {
     super.start();
 
@@ -70,6 +72,19 @@ public class RMContainerAllocator extends AMRMClientImpl
       response = registerApplicationMaster("locahost", 10080, "http://localhost:1234");
       context.setMaxContainerCapability(response.getMaximumResourceCapability().getMemory());
       context.setMinContainerCapability(response.getMinimumResourceCapability().getMemory());
+
+      // If the number of cluster nodes is ZERO, it waits for available nodes.
+      AllocateResponse allocateResponse = allocate(0.0f);
+      while(allocateResponse.getNumClusterNodes() < 1) {
+        try {
+          Thread.sleep(WAIT_INTERVAL_AVAILABLE_NODES);
+          LOG.info("Waiting for Available Cluster Nodes");
+          allocateResponse = allocate(0);
+        } catch (InterruptedException e) {
+          LOG.error(e);
+        }
+      }
+      context.setNumClusterNodes(allocateResponse.getNumClusterNodes());
     } catch (YarnRemoteException e) {
       LOG.error(e);
     }
@@ -135,9 +150,11 @@ public class RMContainerAllocator extends AMRMClientImpl
       new HashMap<Priority, SubQueryId>();
 
   public void heartbeat() throws Exception {
-    AMResponse response = allocate(context.getProgress()).getAMResponse();
+    AllocateResponse allocateResponse = allocate(context.getProgress());
+    AMResponse response = allocateResponse.getAMResponse();
     List<Container> allocatedContainers = response.getAllocatedContainers();
 
+    LOG.info("Available Cluster Nodes: " + allocateResponse.getNumClusterNodes());
     LOG.info("Available Resource: " + response.getAvailableResources());
     LOG.info("Num of Allocated Containers: " + response.getAllocatedContainers().size());
     if (response.getAllocatedContainers().size() > 0) {