You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this copy.)
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/05/08 09:47:40 UTC
git commit: TAJO-54: SubQuery::allocateContainers() may ask 0 containers. (hyunsik)
Updated Branches:
refs/heads/master 6628017a5 -> fc4743686
TAJO-54: SubQuery::allocateContainers() may ask 0 containers. (hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/fc474368
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/fc474368
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/fc474368
Branch: refs/heads/master
Commit: fc4743686553a5bccfb0195c93d9e1078f4f22ff
Parents: 6628017
Author: hyunsik <hy...@gmail.com>
Authored: Wed May 8 16:40:48 2013 +0900
Committer: hyunsik <hy...@gmail.com>
Committed: Wed May 8 16:40:48 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 4 ++-
.../src/main/java/tajo/master/QueryMaster.java | 7 ++++-
.../src/main/java/tajo/master/SubQuery.java | 11 +++-----
.../java/tajo/master/rm/RMContainerAllocator.java | 19 ++++++++++++++-
4 files changed, 31 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc474368/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1a6a6db..cba4f8f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,7 +9,7 @@ Release 0.2.0 - unreleased
IMPROVEMENTS
TAJO-37: Remove obsolete classes WorkerEventDispatcher, WorkerEvent and
- WorkerEventType. (sunny.1324 via hyunsik)
+ WorkerEventType. (sunny.1324 via hyunsik)
TAJO-50: Cleanup SubQuery. (hyunsik)
@@ -37,6 +37,8 @@ Release 0.2.0 - unreleased
BUG FIXES
+ TAJO-54: SubQuery::allocateContainers() may ask 0 containers. (hyunsik)
+
TAJO-47: RowFile has the duplicated initialization problem and unflipped
ByteBuffer problem. (jihoon)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc474368/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
index 8891861..3f3db05 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
@@ -227,6 +227,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
public Map<ContainerId, Container> containers = new ConcurrentHashMap<ContainerId, Container>();
int minCapability;
int maxCapability;
+ int numCluster;
public QueryContext(QueryConf conf) {
this.conf = conf;
@@ -297,7 +298,11 @@ public class QueryMaster extends CompositeService implements EventHandler {
}
public int getNumClusterNode() {
- return rmAllocator.getClusterNodeCount();
+ return numCluster;
+ }
+
+ public void setNumClusterNodes(int num) {
+ numCluster = num;
}
public CatalogService getCatalog() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc474368/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
index 398e1ae..986ecaf 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.state.*;
import org.apache.hadoop.yarn.util.Records;
import tajo.QueryIdFactory;
@@ -614,13 +613,11 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
ExecutionBlock execBlock = subQuery.getBlock();
QueryUnit [] tasks = subQuery.getQueryUnits();
- int numRequest = Math.min(tasks.length,
- subQuery.context.getNumClusterNode() * 4);
+ int numClusterNodes = subQuery.getContext().getNumClusterNode();
+ int numRequest = Math.min(tasks.length, numClusterNodes * 4);
- final Resource resource =
- RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
- Resource.class);
- if (tasks.length <= subQuery.context.getNumClusterNode()) {
+ final Resource resource = Records.newRecord(Resource.class);
+ if (tasks.length <= numClusterNodes) {
resource.setMemory(subQuery.context.getMaxContainerCapability());
} else {
resource.setMemory(2000);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc474368/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java
index 0a1ad42..9d5ae6c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.AMRMClientImpl;
@@ -62,6 +63,7 @@ public class RMContainerAllocator extends AMRMClientImpl
super.init(conf);
}
+ private static final int WAIT_INTERVAL_AVAILABLE_NODES = 500; // 0.5 second
public void start() {
super.start();
@@ -70,6 +72,19 @@ public class RMContainerAllocator extends AMRMClientImpl
response = registerApplicationMaster("locahost", 10080, "http://localhost:1234");
context.setMaxContainerCapability(response.getMaximumResourceCapability().getMemory());
context.setMinContainerCapability(response.getMinimumResourceCapability().getMemory());
+
+ // If the number of cluster nodes is ZERO, it waits for available nodes.
+ AllocateResponse allocateResponse = allocate(0.0f);
+ while(allocateResponse.getNumClusterNodes() < 1) {
+ try {
+ Thread.sleep(WAIT_INTERVAL_AVAILABLE_NODES);
+ LOG.info("Waiting for Available Cluster Nodes");
+ allocateResponse = allocate(0);
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ }
+ }
+ context.setNumClusterNodes(allocateResponse.getNumClusterNodes());
} catch (YarnRemoteException e) {
LOG.error(e);
}
@@ -135,9 +150,11 @@ public class RMContainerAllocator extends AMRMClientImpl
new HashMap<Priority, SubQueryId>();
public void heartbeat() throws Exception {
- AMResponse response = allocate(context.getProgress()).getAMResponse();
+ AllocateResponse allocateResponse = allocate(context.getProgress());
+ AMResponse response = allocateResponse.getAMResponse();
List<Container> allocatedContainers = response.getAllocatedContainers();
+ LOG.info("Available Cluster Nodes: " + allocateResponse.getNumClusterNodes());
LOG.info("Available Resource: " + response.getAvailableResources());
LOG.info("Num of Allocated Containers: " + response.getAllocatedContainers().size());
if (response.getAllocatedContainers().size() > 0) {