You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/09/05 03:58:14 UTC
git commit: TAJO-158: Can't allocate worker when single SubQuery
requests more than cluster capacity. (hyoungjunkim via hyunsik)
Updated Branches:
refs/heads/master 68d72cf52 -> 0b513ca07
TAJO-158: Can't allocate worker when single SubQuery requests more than cluster capacity. (hyoungjunkim via hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/0b513ca0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/0b513ca0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/0b513ca0
Branch: refs/heads/master
Commit: 0b513ca07988ae1f008c9a47db993f3bc6ef43cc
Parents: 68d72cf
Author: Hyunsik Choi <hy...@apache.org>
Authored: Thu Sep 5 10:56:39 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Thu Sep 5 10:57:46 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 3 ++
.../master/event/ContainerAllocationEvent.java | 4 ++
.../master/rm/TajoWorkerResourceManager.java | 51 --------------------
.../tajo/worker/TajoResourceAllocator.java | 29 +++++++----
.../java/org/apache/tajo/worker/TaskRunner.java | 4 +-
5 files changed, 28 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b513ca0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6acfa57..9e9c2b8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -126,6 +126,9 @@ Release 0.2.0 - unreleased
BUG FIXES
+ TAJO-158: Can't allocate worker when single SubQuery requests more than
+ cluster capacity. (hyoungjunkim via hyunsik)
+
TAJO-157: The CSVScanner.isSplittable() function does not work properly.
(jinho)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b513ca0/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java
index ee594a3..c3a9a59 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java
@@ -70,4 +70,8 @@ public class ContainerAllocationEvent extends AbstractEvent<ContainerAllocatorEv
public float getProgress() {
return progress;
}
+
+ public Resource getResource() {
+ return resource;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b513ca0/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index 93450e1..3701d58 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -55,8 +55,6 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
private final BlockingQueue<WorkerResourceRequest> requestQueue;
- private ReAllocationThread reAllocator;
-
private final List<WorkerResourceRequest> reAllocationList;
private AtomicBoolean stopped = new AtomicBoolean(false);
@@ -76,9 +74,6 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
workerResourceAllocator = new WorkerResourceAllocationThread();
workerResourceAllocator.start();
-
- reAllocator = new ReAllocationThread();
- reAllocator.start();
}
public Map<String, WorkerResource> getWorkers() {
@@ -105,10 +100,6 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
if(workerResourceAllocator != null) {
workerResourceAllocator.interrupt();
}
-
- if(reAllocator != null) {
- reAllocator.interrupt();
- }
}
@Override
@@ -258,54 +249,12 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
LOG.debug("=========================================");
}
}
- if(workerResources.size() < resourceRequest.request.getNumWorks()) {
- reAllocationList.add(new WorkerResourceRequest(
- resourceRequest.queryId,
- resourceRequest.queryMasterRequest,
- TajoMasterProtocol.WorkerResourceAllocationRequest.newBuilder()
- .setMemoryMBSlots(resourceRequest.request.getMemoryMBSlots())
- .setDiskSlots(resourceRequest.request.getDiskSlots())
- .setExecutionBlockId(resourceRequest.request.getExecutionBlockId())
- .setNumWorks(resourceRequest.request.getNumWorks() - workerResources.size())
- .build(),
- resourceRequest.callBack));
- }
} catch(InterruptedException ie) {
}
}
}
}
- class ReAllocationThread extends Thread {
- public void run() {
- List<WorkerResourceRequest> copiedList = new ArrayList<WorkerResourceRequest>();
- while(!stopped.get()) {
- copiedList.clear();
- synchronized(reAllocationList) {
- try {
- reAllocationList.wait(3 * 1000);
- } catch (InterruptedException e) {
- if(stopped.get()) {
- break;
- }
- }
- copiedList.addAll(reAllocationList);
- }
-
- for(WorkerResourceRequest eachRequest: copiedList) {
- try {
- requestQueue.put(eachRequest);
- } catch (InterruptedException e) {
- break;
- }
- }
- synchronized(reAllocationList) {
- reAllocationList.clear();
- }
- }
- }
- }
-
private List<WorkerResource> chooseWorkers(boolean queryMaster,
int requiredMemoryMBSlots, int requiredDiskSlots,
int numWorkerSlots) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b513ca0/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 33f9b0f..c1afb80 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -25,9 +25,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -267,20 +265,23 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
queryContext.getQueryMasterContext().getWorkerContext().
getTajoMasterRpcClient().allocateWorkerResources(null, request, callBack);
- int numAllocatedWorkers = 0;
+ TajoMasterProtocol.WorkerResourceAllocationResponse response = null;
while(!stopped.get()) {
- TajoMasterProtocol.WorkerResourceAllocationResponse response = null;
try {
response = callBack.get(3, TimeUnit.SECONDS);
+ break;
} catch (InterruptedException e) {
if(stopped.get()) {
- break;
+ return;
}
} catch (TimeoutException e) {
LOG.info("No available worker resource for " + event.getExecutionBlockId());
continue;
}
+ }
+ int numAllocatedWorkers = 0;
+ if(response != null) {
List<TajoMasterProtocol.WorkerAllocatedResource> workerHosts = response.getWorkerAllocatedResourceList();
ExecutionBlockId executionBlockId = event.getExecutionBlockId();
@@ -335,11 +336,19 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
queryContext.getEventHandler().handle(new SubQueryContainerAllocationEvent(executionBlockId, containers));
}
numAllocatedWorkers += workerHosts.size();
- if(numAllocatedWorkers >= event.getRequiredNum()) {
- break;
- }
+
+ }
+ if(event.getRequiredNum() > numAllocatedWorkers) {
+ ContainerAllocationEvent shortRequestEvent = new ContainerAllocationEvent(
+ event.getType(), event.getExecutionBlockId(), event.getPriority(),
+ event.getResource(),
+ event.getRequiredNum() - numAllocatedWorkers,
+ event.isLeafQuery(), event.getProgress()
+ );
+ queryContext.getEventHandler().handle(shortRequestEvent);
+
}
- LOG.info("======> Stop TajoWorkerAllocationThread");
+ LOG.info("Stop TajoWorkerAllocationThread");
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b513ca0/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 5802ade..6128bb3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -299,7 +299,7 @@ public class TaskRunner extends AbstractService {
try {
if (callFuture == null) {
callFuture = new CallFuture2<QueryUnitRequestProto>();
- LOG.info("====>Request GetTask:" + getId());
+ LOG.info("Request GetTask: " + getId());
GetTaskRequestProto request = GetTaskRequestProto.newBuilder()
.setExecutionBlockId(executionBlockId.getProto())
.setContainerId(((ContainerIdPBImpl) containerId).getProto())
@@ -319,7 +319,7 @@ public class TaskRunner extends AbstractService {
}
// if there has been no assigning task for a given period,
// TaskRunner will retry to request an assigning task.
- LOG.warn("Timeout getResource:" + getId() + ", but retry", te);
+ LOG.warn("Timeout GetTask:" + getId() + ", but retry", te);
continue;
}