You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2013/12/02 09:16:40 UTC
[2/2] git commit: TAJO-317: Improve TajoResourceManager to support more elaborate resource management. (Keuntae Park via jihoon)
TAJO-317: Improve TajoResourceManager to support more elaborate resource management. (Keuntae Park via jihoon)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/528c914f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/528c914f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/528c914f
Branch: refs/heads/master
Commit: 528c914f9a133bef79df07017cfa424c9fab4412
Parents: 778c01f
Author: Jihoon Son <ji...@apache.org>
Authored: Mon Dec 2 17:16:01 2013 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Mon Dec 2 17:16:01 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../java/org/apache/tajo/conf/TajoConf.java | 14 +-
.../apache/tajo/master/TajoContainerProxy.java | 38 +-
.../apache/tajo/master/TajoMasterService.java | 25 +-
.../tajo/master/querymaster/SubQuery.java | 9 +-
.../tajo/master/rm/TajoWorkerContainerId.java | 41 ++
.../master/rm/TajoWorkerResourceManager.java | 358 +++++++++++++----
.../apache/tajo/master/rm/WorkerResource.java | 115 +++---
.../tajo/master/rm/WorkerResourceManager.java | 6 +-
.../tajo/master/rm/YarnTajoResourceManager.java | 28 +-
.../apache/tajo/util/ApplicationIdUtils.java | 3 +
.../apache/tajo/worker/ResourceAllocator.java | 3 +-
.../tajo/worker/TajoResourceAllocator.java | 68 ++--
.../java/org/apache/tajo/worker/TajoWorker.java | 86 ++--
.../main/java/org/apache/tajo/worker/Task.java | 6 +-
.../tajo/worker/YarnResourceAllocator.java | 4 +-
.../src/main/proto/TajoMasterProtocol.proto | 64 ++-
.../main/resources/webapps/admin/cluster.jsp | 31 +-
.../src/main/resources/webapps/admin/index.jsp | 22 +-
.../org/apache/tajo/TajoTestingCluster.java | 3 +
.../tajo/master/TestTajoResourceManager.java | 390 +++++++++++++++++++
21 files changed, 1013 insertions(+), 303 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index caadc0c..1646d4c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -34,6 +34,8 @@ Release 0.8.0 - unreleased
IMPROVEMENTS
+ TAJO-317: Improve TajoResourceManager to support more elaborate resource management. (Keuntae Park via jihoon)
+
TAJO-314: Make TaskScheduler be pluggable. (jihoon)
TAJO-325: QueryState.NEW and QueryState.INIT should be combined into one
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index eba6eaf..f89838f 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -81,6 +81,10 @@ public class TajoConf extends YarnConfiguration {
TAJO_MASTER_CLIENT_RPC_ADDRESS("tajo.master.client-rpc.address", "localhost:26002"),
TAJO_MASTER_INFO_ADDRESS("tajo.master.info-http.address", "0.0.0.0:26080"),
+ // QueryMaster resource
+ TAJO_QUERYMASTER_DISK_SLOT("tajo.qm.resource.disk.slots", 0.0f),
+ TAJO_QUERYMASTER_MEMORY_MB("tajo.qm.resource.memory-mb", 512),
+
// Tajo Worker Service Addresses
WORKER_INFO_ADDRESS("tajo.worker.info-http.address", "0.0.0.0:28080"),
WORKER_QM_INFO_ADDRESS("tajo.worker.qm-info-http.address", "0.0.0.0:28081"),
@@ -93,8 +97,8 @@ public class TajoConf extends YarnConfiguration {
// Tajo Worker Resources
WORKER_RESOURCE_AVAILABLE_CPU_CORES("tajo.worker.resource.cpu-cores", 1),
- WORKER_RESOURCE_AVAILABLE_MEMORY_MB("tajo.worker.resource.memory-mb", 1024),
- WORKER_RESOURCE_AVAILABLE_DISKS("tajo.worker.resource.disks", 1),
+ WORKER_RESOURCE_AVAILABLE_MEMORY_MB("tajo.worker.resource.memory-mb", 512),
+ WORKER_RESOURCE_AVAILABLE_DISKS("tajo.worker.resource.disks", 1.0f),
WORKER_EXECUTION_MAX_SLOTS("tajo.worker.parallel-execution.max-num", 2),
// Tajo Worker Dedicated Resources
@@ -186,6 +190,12 @@ public class TajoConf extends YarnConfiguration {
// Hive Configuration
//////////////////////////////////
HIVE_QUERY_MODE("tajo.hive.query.mode", false),
+
+ //////////////////////////////////
+ // Task Configuration
+ TASK_DEFAULT_MEMORY("tajo.task.memory-slot-mb.default", 512),
+ TASK_DEFAULT_DISK("tajo.task.disk-slot.default", 1.0f)
+ //////////////////////////////////
;
public final String varname;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index d542390..ce5f401 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -21,7 +21,9 @@ package org.apache.tajo.master;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol;
@@ -29,7 +31,7 @@ import org.apache.tajo.master.event.QueryEvent;
import org.apache.tajo.master.event.QueryEventType;
import org.apache.tajo.master.querymaster.QueryMasterTask;
import org.apache.tajo.master.rm.TajoWorkerContainer;
-import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.master.rm.TajoWorkerContainerId;
import org.apache.tajo.rpc.AsyncRpcClient;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
@@ -121,7 +123,8 @@ public class TajoContainerProxy extends ContainerProxy {
this.state = ContainerState.KILLED_BEFORE_LAUNCH;
} else {
try {
- releaseWorkerResource(context, executionBlockId, ((TajoWorkerContainer)container).getWorkerResource());
+ TajoWorkerContainer tajoWorkerContainer = ((TajoWorkerContainer)container);
+ releaseWorkerResource(context, executionBlockId, tajoWorkerContainer.getId());
context.getResourceAllocator().removeContainer(containerID);
this.state = ContainerState.DONE;
} catch (Throwable t) {
@@ -138,29 +141,21 @@ public class TajoContainerProxy extends ContainerProxy {
public static void releaseWorkerResource(QueryMasterTask.QueryMasterTaskContext context,
ExecutionBlockId executionBlockId,
- WorkerResource workerResource) throws Exception {
- List<WorkerResource> workerResources = new ArrayList<WorkerResource>();
- workerResources.add(workerResource);
+ ContainerId containerId) throws Exception {
+ List<ContainerId> containerIds = new ArrayList<ContainerId>();
+ containerIds.add(containerId);
- releaseWorkerResource(context, executionBlockId, workerResources);
+ releaseWorkerResource(context, executionBlockId, containerIds);
}
public static void releaseWorkerResource(QueryMasterTask.QueryMasterTaskContext context,
ExecutionBlockId executionBlockId,
- List<WorkerResource> workerResources) throws Exception {
- List<TajoMasterProtocol.WorkerResourceProto> workerResourceProtos =
- new ArrayList<TajoMasterProtocol.WorkerResourceProto>();
-
- for(WorkerResource eahWorkerResource: workerResources) {
- workerResourceProtos.add(TajoMasterProtocol.WorkerResourceProto.newBuilder()
- .setHost(eahWorkerResource.getAllocatedHost())
- .setQueryMasterPort(eahWorkerResource.getQueryMasterPort())
- .setPeerRpcPort(eahWorkerResource.getPeerRpcPort())
- .setExecutionBlockId(executionBlockId.getProto())
- .setMemoryMBSlots(eahWorkerResource.getMemoryMBSlots())
- .setDiskSlots(eahWorkerResource.getDiskSlots())
- .build()
- );
+ List<ContainerId> containerIds) throws Exception {
+ List<YarnProtos.ContainerIdProto> containerIdProtos =
+ new ArrayList<YarnProtos.ContainerIdProto>();
+
+ for(ContainerId eachContainerId: containerIds) {
+ containerIdProtos.add(TajoWorkerContainerId.getContainerIdProto(eachContainerId));
}
RpcConnectionPool connPool = RpcConnectionPool.getPool(context.getConf());
@@ -171,7 +166,8 @@ public class TajoContainerProxy extends ContainerProxy {
TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
masterClientService.releaseWorkerResource(null,
TajoMasterProtocol.WorkerResourceReleaseRequest.newBuilder()
- .addAllWorkerResources(workerResourceProtos)
+ .setExecutionBlockId(executionBlockId.getProto())
+ .addAllContainerIds(containerIdProtos)
.build(),
NullCallback.get());
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
index cf193de..c213dd5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
@@ -22,13 +22,14 @@ import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.querymaster.QueryJobManager;
-import org.apache.tajo.master.rm.WorkerResource;
import org.apache.tajo.rpc.AsyncRpcServer;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
@@ -118,8 +119,7 @@ public class TajoMasterService extends AbstractService {
builder.setResponseCommand(command);
}
- builder.setNumClusterNodes(context.getResourceManager().getWorkers().size());
- builder.setNumClusterSlots(context.getResourceManager().getNumClusterSlots());
+ builder.setClusterResourceSummary(context.getResourceManager().getClusterResourceSummary());
done.run(builder.build());
}
@@ -135,20 +135,11 @@ public class TajoMasterService extends AbstractService {
public void releaseWorkerResource(RpcController controller,
TajoMasterProtocol.WorkerResourceReleaseRequest request,
RpcCallback<PrimitiveProtos.BoolProto> done) {
- List<TajoMasterProtocol.WorkerResourceProto> workerResources = request.getWorkerResourcesList();
- for(TajoMasterProtocol.WorkerResourceProto eachWorkerResource: workerResources) {
- WorkerResource workerResource = new WorkerResource();
- workerResource.setAllocatedHost(eachWorkerResource.getHost());
-
- workerResource.setPeerRpcPort(eachWorkerResource.getPeerRpcPort());
- workerResource.setQueryMasterPort(eachWorkerResource.getQueryMasterPort());
- workerResource.setMemoryMBSlots(eachWorkerResource.getMemoryMBSlots());
- workerResource.setDiskSlots(eachWorkerResource.getDiskSlots());
-
- LOG.info("releaseWorkerResource:" + workerResource);
- context.getResourceManager().releaseWorkerResource(
- new QueryId(eachWorkerResource.getExecutionBlockId().getQueryId()),
- workerResource);
+ List<YarnProtos.ContainerIdProto> containerIds = request.getContainerIdsList();
+ ExecutionBlockId ebId = new ExecutionBlockId(request.getExecutionBlockId());
+
+ for(YarnProtos.ContainerIdProto eachContainer: containerIds) {
+ context.getResourceManager().releaseWorkerResource(ebId, eachContainer);
}
done.run(BOOL_TRUE);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 4aa3866..70bde5c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -611,13 +611,18 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
ExecutionBlock execBlock = subQuery.getBlock();
QueryUnit [] tasks = subQuery.getQueryUnits();
+ //TODO consider disk slot
+ int requiredMemoryMBPerTask = 512;
+
int numRequest = subQuery.getContext().getResourceAllocator().calculateNumRequestContainers(
- subQuery.getContext().getQueryMasterContext().getWorkerContext(), tasks.length
+ subQuery.getContext().getQueryMasterContext().getWorkerContext(),
+ tasks.length,
+ requiredMemoryMBPerTask
);
final Resource resource = Records.newRecord(Resource.class);
- resource.setMemory(2000);
+ resource.setMemory(requiredMemoryMBPerTask);
LOG.info("Request Container for " + subQuery.getId() + " containers=" + numRequest);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java
index f104637..2d7c1c3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java
@@ -20,6 +20,7 @@ package org.apache.tajo.master.rm;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.proto.YarnProtos;
public class TajoWorkerContainerId extends ContainerId {
ApplicationAttemptId applicationAttemptId;
@@ -44,4 +45,44 @@ public class TajoWorkerContainerId extends ContainerId {
public void setId(int id) {
this.id = id;
}
+
+ public YarnProtos.ContainerIdProto getProto() {
+ YarnProtos.ApplicationIdProto appIdProto = YarnProtos.ApplicationIdProto.newBuilder()
+ .setClusterTimestamp(applicationAttemptId.getApplicationId().getClusterTimestamp())
+ .setId(applicationAttemptId.getApplicationId().getId())
+ .build();
+
+ YarnProtos.ApplicationAttemptIdProto attemptIdProto = YarnProtos.ApplicationAttemptIdProto.newBuilder()
+ .setAttemptId(applicationAttemptId.getAttemptId())
+ .setApplicationId(appIdProto)
+ .build();
+
+ return YarnProtos.ContainerIdProto.newBuilder()
+ .setAppAttemptId(attemptIdProto)
+ .setAppId(appIdProto)
+ .setId(id)
+ .build();
+ }
+
+ public static YarnProtos.ContainerIdProto getContainerIdProto(ContainerId containerId) {
+ if(containerId instanceof TajoWorkerContainerId) {
+ return ((TajoWorkerContainerId)containerId).getProto();
+ } else {
+ YarnProtos.ApplicationIdProto appIdProto = YarnProtos.ApplicationIdProto.newBuilder()
+ .setClusterTimestamp(containerId.getApplicationAttemptId().getApplicationId().getClusterTimestamp())
+ .setId(containerId.getApplicationAttemptId().getApplicationId().getId())
+ .build();
+
+ YarnProtos.ApplicationAttemptIdProto attemptIdProto = YarnProtos.ApplicationAttemptIdProto.newBuilder()
+ .setAttemptId(containerId.getApplicationAttemptId().getAttemptId())
+ .setApplicationId(appIdProto)
+ .build();
+
+ return YarnProtos.ContainerIdProto.newBuilder()
+ .setAppAttemptId(attemptIdProto)
+ .setAppId(appIdProto)
+ .setId(containerId.getId())
+ .build();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index 7ffc563..1485ffe 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -22,6 +22,8 @@ import com.google.protobuf.RpcCallback;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
@@ -30,17 +32,20 @@ import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.querymaster.QueryInProgress;
import org.apache.tajo.master.querymaster.QueryJobEvent;
-import org.apache.tajo.worker.TajoWorker;
+import org.apache.tajo.util.ApplicationIdUtils;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
public class TajoWorkerResourceManager implements WorkerResourceManager {
private static final Log LOG = LogFactory.getLog(TajoWorkerResourceManager.class);
+ static AtomicInteger containerIdSeq = new AtomicInteger(0);
+
private TajoMaster.MasterContext masterContext;
//all workers(include querymaster)
@@ -59,22 +64,46 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
private final Object workerResourceLock = new Object();
- private final String queryIdSeed;
+ private String queryIdSeed;
private WorkerResourceAllocationThread workerResourceAllocator;
private WorkerMonitorThread workerMonitor;
- private final BlockingQueue<WorkerResourceRequest> requestQueue;
+ private BlockingQueue<WorkerResourceRequest> requestQueue;
- private final List<WorkerResourceRequest> reAllocationList;
+ private List<WorkerResourceRequest> reAllocationList;
private AtomicBoolean stopped = new AtomicBoolean(false);
+ private float queryMasterDefaultDiskSlot;
+
+ private int queryMasterDefaultMemoryMB;
+
+ private TajoConf tajoConf;
+
+ private Map<YarnProtos.ContainerIdProto, AllocatedWorkerResource> allocatedResourceMap =
+ new HashMap<YarnProtos.ContainerIdProto, AllocatedWorkerResource>();
+
public TajoWorkerResourceManager(TajoMaster.MasterContext masterContext) {
this.masterContext = masterContext;
+ init(masterContext.getConf());
+ }
+
+ public TajoWorkerResourceManager(TajoConf tajoConf) {
+ init(tajoConf);
+ }
+
+ private void init(TajoConf tajoConf) {
+ this.tajoConf = tajoConf;
this.queryIdSeed = String.valueOf(System.currentTimeMillis());
+ this.queryMasterDefaultDiskSlot =
+ tajoConf.getFloatVar(TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT);
+
+ this.queryMasterDefaultMemoryMB =
+ tajoConf.getIntVar(TajoConf.ConfVars.TAJO_QUERYMASTER_MEMORY_MB);
+
requestQueue = new LinkedBlockingDeque<WorkerResourceRequest>();
reAllocationList = new ArrayList<WorkerResourceRequest>();
@@ -93,15 +122,41 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
return Collections.unmodifiableSet(liveQueryMasterWorkerResources);
}
- public int getNumClusterSlots() {
- int numSlots = 0;
+ @Override
+ public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() {
+ int totalDiskSlots = 0;
+ int totalCpuCoreSlots = 0;
+ int totalMemoryMB = 0;
+
+ int totalAvailableDiskSlots = 0;
+ int totalAvailableCpuCoreSlots = 0;
+ int totalAvailableMemoryMB = 0;
+
synchronized(workerResourceLock) {
for(String eachWorker: liveWorkerResources) {
- numSlots += allWorkerResourceMap.get(eachWorker).getSlots();
+ WorkerResource worker = allWorkerResourceMap.get(eachWorker);
+ if(worker != null) {
+ totalMemoryMB += worker.getMemoryMB();
+ totalAvailableMemoryMB += worker.getAvailableMemoryMB();
+
+ totalDiskSlots += worker.getDiskSlots();
+ totalAvailableDiskSlots += worker.getAvailableDiskSlots();
+
+ totalCpuCoreSlots += worker.getCpuCoreSlots();
+ totalAvailableCpuCoreSlots += worker.getAvailableCpuCoreSlots();
+ }
}
}
- return numSlots;
+ return TajoMasterProtocol.ClusterResourceSummary.newBuilder()
+ .setNumWorkers(liveWorkerResources.size())
+ .setTotalCpuCoreSlots(totalCpuCoreSlots)
+ .setTotalDiskSlots(totalDiskSlots)
+ .setTotalMemoryMB(totalMemoryMB)
+ .setTotalAvailableCpuCoreSlots(totalAvailableCpuCoreSlots)
+ .setTotalAvailableDiskSlots(totalAvailableDiskSlots)
+ .setTotalAvailableMemoryMB(totalAvailableMemoryMB)
+ .build();
}
@Override
@@ -120,9 +175,13 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
@Override
public WorkerResource allocateQueryMaster(QueryInProgress queryInProgress) {
+ return allocateQueryMaster(queryInProgress.getQueryId());
+ }
+
+ public WorkerResource allocateQueryMaster(QueryId queryId) {
synchronized(workerResourceLock) {
if(liveQueryMasterWorkerResources.size() == 0) {
- LOG.warn("No available resource for querymaster:" + queryInProgress.getQueryId());
+ LOG.warn("No available resource for querymaster:" + queryId);
return null;
}
WorkerResource queryMasterWorker = null;
@@ -137,9 +196,9 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
if(queryMasterWorker == null) {
return null;
}
- queryMasterWorker.addNumQueryMasterTask();
- queryMasterMap.put(queryInProgress.getQueryId(), queryMasterWorker);
- LOG.info(queryInProgress.getQueryId() + "'s QueryMaster is " + queryMasterWorker);
+ queryMasterWorker.addNumQueryMasterTask(queryMasterDefaultDiskSlot, queryMasterDefaultMemoryMB);
+ queryMasterMap.put(queryId, queryMasterWorker);
+ LOG.info(queryId + "'s QueryMaster is " + queryMasterWorker);
return queryMasterWorker;
}
}
@@ -152,15 +211,23 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
}
if(queryMasterWorkerResource != null) {
- startQueryMaster(queryInProgress.getQueryId(), queryMasterWorkerResource);
+ AllocatedWorkerResource allocatedWorkerResource = new AllocatedWorkerResource();
+ allocatedWorkerResource.workerResource = queryMasterWorkerResource;
+ allocatedWorkerResource.allocatedMemoryMB = queryMasterDefaultMemoryMB;
+ allocatedWorkerResource.allocatedDiskSlots = queryMasterDefaultDiskSlot;
+
+ startQueryMaster(queryInProgress.getQueryId(), allocatedWorkerResource);
} else {
//add queue
TajoMasterProtocol.WorkerResourceAllocationRequest request =
TajoMasterProtocol.WorkerResourceAllocationRequest.newBuilder()
- .setMemoryMBSlots(1)
- .setDiskSlots(1)
.setExecutionBlockId(QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0).getProto())
- .setNumWorks(1)
+ .setNumContainers(1)
+ .setMinMemoryMBPerContainer(queryMasterDefaultMemoryMB)
+ .setMaxMemoryMBPerContainer(queryMasterDefaultMemoryMB)
+ .setMinDiskSlotPerContainer(queryMasterDefaultDiskSlot)
+ .setMaxDiskSlotPerContainer(queryMasterDefaultDiskSlot)
+ .setResourceRequestPriority(TajoMasterProtocol.ResourceRequestPriority.MEMORY)
.build();
try {
requestQueue.put(new WorkerResourceRequest(queryInProgress.getQueryId(), true, request, null));
@@ -169,13 +236,13 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
}
}
- private void startQueryMaster(QueryId queryId, WorkerResource workResource) {
+ private void startQueryMaster(QueryId queryId, AllocatedWorkerResource workResource) {
QueryInProgress queryInProgress = masterContext.getQueryJobManager().getQueryInProgress(queryId);
if(queryInProgress == null) {
LOG.warn("No QueryInProgress while starting QueryMaster:" + queryId);
return;
}
- queryInProgress.getQueryInfo().setQueryMasterResource(workResource);
+ queryInProgress.getQueryInfo().setQueryMasterResource(workResource.workerResource);
//fire QueryJobStart event
queryInProgress.getEventHandler().handle(
@@ -205,7 +272,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
@Override
public void run() {
- heartbeatTimeout = masterContext.getConf().getIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_TIMEOUT);
+ heartbeatTimeout = tajoConf.getIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_TIMEOUT);
LOG.info("WorkerMonitor start");
while(!stopped.get()) {
try {
@@ -272,6 +339,12 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
}
}
+ class AllocatedWorkerResource {
+ WorkerResource workerResource;
+ int allocatedMemoryMB;
+ float allocatedDiskSlots;
+ }
+
class WorkerResourceAllocationThread extends Thread {
@Override
public void run() {
@@ -283,36 +356,56 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
if (LOG.isDebugEnabled()) {
LOG.debug("allocateWorkerResources:" +
(new ExecutionBlockId(resourceRequest.request.getExecutionBlockId())) +
- ", required:" + resourceRequest.request.getNumWorks() +
+ ", requiredMemory:" + resourceRequest.request.getMinMemoryMBPerContainer() +
+ "~" + resourceRequest.request.getMaxMemoryMBPerContainer() +
+ ", requiredContainers:" + resourceRequest.request.getNumContainers() +
+ ", requiredDiskSlots:" + resourceRequest.request.getMinDiskSlotPerContainer() +
+ "~" + resourceRequest.request.getMaxDiskSlotPerContainer() +
", queryMasterRequest=" + resourceRequest.queryMasterRequest +
", liveWorkers=" + liveWorkerResources.size());
}
- List<WorkerResource> workerResources = chooseWorkers(
- resourceRequest.request.getMemoryMBSlots(),
- resourceRequest.request.getDiskSlots(),
- resourceRequest.request.getNumWorks());
-
- LOG.debug("allocateWorkerResources: allocated:" + workerResources.size());
+ List<AllocatedWorkerResource> allocatedWorkerResources = chooseWorkers(resourceRequest);
- if(workerResources.size() > 0) {
+ if(allocatedWorkerResources.size() > 0) {
if(resourceRequest.queryMasterRequest) {
- startQueryMaster(resourceRequest.queryId, workerResources.get(0));
+ startQueryMaster(resourceRequest.queryId, allocatedWorkerResources.get(0));
} else {
- List<TajoMasterProtocol.WorkerAllocatedResource> workerHosts =
+ List<TajoMasterProtocol.WorkerAllocatedResource> allocatedResources =
new ArrayList<TajoMasterProtocol.WorkerAllocatedResource>();
- for(WorkerResource eachWorker: workerResources) {
- workerHosts.add(TajoMasterProtocol.WorkerAllocatedResource.newBuilder()
- .setWorkerHost(eachWorker.getAllocatedHost())
- .setQueryMasterPort(eachWorker.getQueryMasterPort())
- .setPeerRpcPort(eachWorker.getPeerRpcPort())
- .setWorkerPullServerPort(eachWorker.getPullServerPort())
+ for(AllocatedWorkerResource eachWorker: allocatedWorkerResources) {
+ NodeIdPBImpl nodeId = new NodeIdPBImpl();
+
+ nodeId.setHost(eachWorker.workerResource.getAllocatedHost());
+ nodeId.setPort(eachWorker.workerResource.getPeerRpcPort());
+
+ TajoWorkerContainerId containerId = new TajoWorkerContainerId();
+
+ containerId.setApplicationAttemptId(
+ ApplicationIdUtils.createApplicationAttemptId(resourceRequest.queryId));
+ containerId.setId(containerIdSeq.incrementAndGet());
+
+ YarnProtos.ContainerIdProto containerIdProto = containerId.getProto();
+ allocatedResources.add(TajoMasterProtocol.WorkerAllocatedResource.newBuilder()
+ .setContainerId(containerIdProto)
+ .setNodeId(nodeId.toString())
+ .setWorkerHost(eachWorker.workerResource.getAllocatedHost())
+ .setQueryMasterPort(eachWorker.workerResource.getQueryMasterPort())
+ .setPeerRpcPort(eachWorker.workerResource.getPeerRpcPort())
+ .setWorkerPullServerPort(eachWorker.workerResource.getPullServerPort())
+ .setAllocatedMemoryMB(eachWorker.allocatedMemoryMB)
+ .setAllocatedDiskSlots(eachWorker.allocatedDiskSlots)
.build());
+
+ synchronized(workerResourceLock) {
+ allocatedResourceMap.put(containerIdProto, eachWorker);
+ }
}
+
resourceRequest.callBack.run(TajoMasterProtocol.WorkerResourceAllocationResponse.newBuilder()
.setExecutionBlockId(resourceRequest.request.getExecutionBlockId())
- .addAllWorkerAllocatedResource(workerHosts)
+ .addAllWorkerAllocatedResource(allocatedResources)
.build()
);
}
@@ -335,54 +428,179 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
}
}
- private List<WorkerResource> chooseWorkers(int requiredMemoryMBSlots, int requiredDiskSlots,
- int numWorkerSlots) {
- List<WorkerResource> selectedWorkers = new ArrayList<WorkerResource>();
+ private List<AllocatedWorkerResource> chooseWorkers(WorkerResourceRequest resourceRequest) {
+ List<AllocatedWorkerResource> selectedWorkers = new ArrayList<AllocatedWorkerResource>();
- int selectedCount = 0;
+ int allocatedResources = 0;
+
+ if(resourceRequest.queryMasterRequest) {
+ WorkerResource worker = allocateQueryMaster(resourceRequest.queryId);
+ if(worker != null) {
+ AllocatedWorkerResource allocatedWorkerResource = new AllocatedWorkerResource();
+ allocatedWorkerResource.workerResource = worker;
+ allocatedWorkerResource.allocatedDiskSlots = queryMasterDefaultDiskSlot;
+ allocatedWorkerResource.allocatedMemoryMB = queryMasterDefaultMemoryMB;
+ selectedWorkers.add(allocatedWorkerResource);
+
+ return selectedWorkers;
+ }
+ }
+
+ TajoMasterProtocol.ResourceRequestPriority resourceRequestPriority
+ = resourceRequest.request.getResourceRequestPriority();
+
+ if(resourceRequestPriority == TajoMasterProtocol.ResourceRequestPriority.MEMORY) {
+ synchronized(workerResourceLock) {
+ List<String> randomWorkers = new ArrayList<String>(liveWorkerResources);
+ Collections.shuffle(randomWorkers);
+
+ int numContainers = resourceRequest.request.getNumContainers();
+ int minMemoryMB = resourceRequest.request.getMinMemoryMBPerContainer();
+ int maxMemoryMB = resourceRequest.request.getMaxMemoryMBPerContainer();
+ float diskSlot = Math.max(resourceRequest.request.getMaxDiskSlotPerContainer(),
+ resourceRequest.request.getMinDiskSlotPerContainer());
+
+ int liveWorkerSize = randomWorkers.size();
+ Set<String> insufficientWorkers = new HashSet<String>();
+ boolean stop = false;
+ boolean checkMax = true;
+ while(!stop) {
+ if(allocatedResources >= numContainers) {
+ break;
+ }
+
+ if(insufficientWorkers.size() >= liveWorkerSize) {
+ if(!checkMax) {
+ break;
+ }
+ insufficientWorkers.clear();
+ checkMax = false;
+ }
+ int compareAvailableMemory = checkMax ? maxMemoryMB : minMemoryMB;
+
+ for(String eachWorker: randomWorkers) {
+ if(allocatedResources >= numContainers) {
+ stop = true;
+ break;
+ }
+
+ if(insufficientWorkers.size() >= liveWorkerSize) {
+ break;
+ }
- synchronized(workerResourceLock) {
- List<String> randomWorkers = new ArrayList<String>(liveWorkerResources);
- Collections.shuffle(randomWorkers);
- int liveWorkerSize = randomWorkers.size();
- Set<String> insufficientWorkers = new HashSet<String>();
- boolean stop = false;
- while(!stop) {
- if(insufficientWorkers.size() >= liveWorkerSize || selectedCount >= numWorkerSlots) {
- break;
- }
- for(String eachWorker: randomWorkers) {
- if(insufficientWorkers.size() >= liveWorkerSize || selectedCount >= numWorkerSlots) {
- stop = true;
- } else {
WorkerResource workerResource = allWorkerResourceMap.get(eachWorker);
- if(workerResource.getAvailableMemoryMBSlots() >= requiredMemoryMBSlots) {
- workerResource.addUsedMemoryMBSlots(requiredMemoryMBSlots);
- //workerResource.addUsedDiskSlots(requiredDiskSlots);
- selectedWorkers.add(workerResource);
- selectedCount++;
+ if(workerResource.getAvailableMemoryMB() >= compareAvailableMemory) {
+ int workerMemory;
+ if(workerResource.getAvailableMemoryMB() >= maxMemoryMB) {
+ workerMemory = maxMemoryMB;
+ } else {
+ workerMemory = workerResource.getAvailableMemoryMB();
+ }
+ AllocatedWorkerResource allocatedWorkerResource = new AllocatedWorkerResource();
+ allocatedWorkerResource.workerResource = workerResource;
+ allocatedWorkerResource.allocatedMemoryMB = workerMemory;
+ if(workerResource.getAvailableDiskSlots() >= diskSlot) {
+ allocatedWorkerResource.allocatedDiskSlots = diskSlot;
+ } else {
+ allocatedWorkerResource.allocatedDiskSlots = workerResource.getAvailableDiskSlots();
+ }
+
+ workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots,
+ allocatedWorkerResource.allocatedMemoryMB);
+
+ selectedWorkers.add(allocatedWorkerResource);
+
+ allocatedResources++;
} else {
insufficientWorkers.add(eachWorker);
}
}
}
- if(!stop) {
- for(String eachWorker: insufficientWorkers) {
- randomWorkers.remove(eachWorker);
+ }
+ } else {
+ synchronized(workerResourceLock) {
+ List<String> randomWorkers = new ArrayList<String>(liveWorkerResources);
+ Collections.shuffle(randomWorkers);
+
+ int numContainers = resourceRequest.request.getNumContainers();
+ float minDiskSlots = resourceRequest.request.getMinDiskSlotPerContainer();
+ float maxDiskSlots = resourceRequest.request.getMaxDiskSlotPerContainer();
+ int memoryMB = Math.max(resourceRequest.request.getMaxMemoryMBPerContainer(),
+ resourceRequest.request.getMinMemoryMBPerContainer());
+
+ int liveWorkerSize = randomWorkers.size();
+ Set<String> insufficientWorkers = new HashSet<String>();
+ boolean stop = false;
+ boolean checkMax = true;
+ while(!stop) {
+ if(allocatedResources >= numContainers) {
+ break;
+ }
+
+ if(insufficientWorkers.size() >= liveWorkerSize) {
+ if(!checkMax) {
+ break;
+ }
+ insufficientWorkers.clear();
+ checkMax = false;
+ }
+ float compareAvailableDisk = checkMax ? maxDiskSlots : minDiskSlots;
+
+ for(String eachWorker: randomWorkers) {
+ if(allocatedResources >= numContainers) {
+ stop = true;
+ break;
+ }
+
+ if(insufficientWorkers.size() >= liveWorkerSize) {
+ break;
+ }
+
+ WorkerResource workerResource = allWorkerResourceMap.get(eachWorker);
+ if(workerResource.getAvailableDiskSlots() >= compareAvailableDisk) {
+ float workerDiskSlots;
+ if(workerResource.getAvailableDiskSlots() >= maxDiskSlots) {
+ workerDiskSlots = maxDiskSlots;
+ } else {
+ workerDiskSlots = workerResource.getAvailableDiskSlots();
+ }
+ AllocatedWorkerResource allocatedWorkerResource = new AllocatedWorkerResource();
+ allocatedWorkerResource.workerResource = workerResource;
+ allocatedWorkerResource.allocatedDiskSlots = workerDiskSlots;
+
+ if(workerResource.getAvailableMemoryMB() >= memoryMB) {
+ allocatedWorkerResource.allocatedMemoryMB = memoryMB;
+ } else {
+ allocatedWorkerResource.allocatedMemoryMB = workerResource.getAvailableMemoryMB();
+ }
+ workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots,
+ allocatedWorkerResource.allocatedMemoryMB);
+
+ selectedWorkers.add(allocatedWorkerResource);
+
+ allocatedResources++;
+ } else {
+ insufficientWorkers.add(eachWorker);
+ }
}
}
}
}
-
return selectedWorkers;
}
@Override
- public void releaseWorkerResource(QueryId queryId, WorkerResource workerResource) {
+ public void releaseWorkerResource(ExecutionBlockId ebId, YarnProtos.ContainerIdProto containerId) {
synchronized(workerResourceLock) {
- WorkerResource managedWorkerResource = allWorkerResourceMap.get(workerResource.getId());
- if(managedWorkerResource != null) {
- managedWorkerResource.releaseResource(workerResource);
+ AllocatedWorkerResource allocatedWorkerResource = allocatedResourceMap.get(containerId);
+ if(allocatedWorkerResource != null) {
+ LOG.info("Release Resource:" + ebId + "," +
+ allocatedWorkerResource.allocatedDiskSlots + "," + allocatedWorkerResource.allocatedMemoryMB);
+ allocatedWorkerResource.workerResource.releaseResource(
+ allocatedWorkerResource.allocatedDiskSlots, allocatedWorkerResource.allocatedMemoryMB);
+ } else {
+ LOG.warn("No AllocatedWorkerResource data for [" + ebId + "," + containerId + "]");
+ return;
}
}
@@ -411,7 +629,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
return;
} else {
queryMasterWorkerResource = queryMasterMap.remove(queryId);
- queryMasterWorkerResource.releaseQueryMasterTask();
+ queryMasterWorkerResource.releaseQueryMasterTask(queryMasterDefaultDiskSlot, queryMasterDefaultMemoryMB);
}
}
@@ -462,7 +680,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
workerResource.setLastHeartbeat(System.currentTimeMillis());
workerResource.setWorkerStatus(WorkerStatus.LIVE);
if(request.getServerStatus() != null) {
- workerResource.setMemoryMBSlots(request.getServerStatus().getSystem().getTotalMemoryMB());
+ workerResource.setMemoryMB(request.getServerStatus().getMemoryResourceMB());
workerResource.setCpuCoreSlots(request.getServerStatus().getSystem().getAvailableProcessors());
workerResource.setDiskSlots(request.getServerStatus().getDiskSlots());
workerResource.setNumRunningTasks(request.getServerStatus().getRunningTaskNum());
@@ -470,7 +688,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
workerResource.setFreeHeap(request.getServerStatus().getJvmHeap().getFreeHeap());
workerResource.setTotalHeap(request.getServerStatus().getJvmHeap().getTotalHeap());
} else {
- workerResource.setMemoryMBSlots(4096);
+ workerResource.setMemoryMB(4096);
workerResource.setDiskSlots(4);
workerResource.setCpuCoreSlots(4);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
index b702063..e8c9a9e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
@@ -35,12 +35,12 @@ public class WorkerResource implements Comparable<WorkerResource> {
private int pullServerPort;
private int httpPort;
- private int diskSlots;
+ private float diskSlots;
private int cpuCoreSlots;
- private int memoryMBSlots;
+ private int memoryMB;
- private int usedDiskSlots;
- private int usedMemoryMBSlots;
+ private float usedDiskSlots;
+ private int usedMemoryMB;
private int usedCpuCoreSlots;
private long maxHeap;
@@ -75,28 +75,11 @@ public class WorkerResource implements Comparable<WorkerResource> {
this.allocatedHost = allocatedHost;
}
- public void addUsedDiskSlots(int diskSlots) {
- usedDiskSlots += diskSlots;
- }
-
- public void addUsedMemoryMBSlots(int memoryMBSlots) {
- try {
- wlock.lock();
- usedMemoryMBSlots += memoryMBSlots;
- } finally {
- wlock.unlock();
- }
- }
-
- public void addUsedCpuCoreSlots(int cpuCoreSlots) {
- usedCpuCoreSlots += cpuCoreSlots;
- }
-
- public int getDiskSlots() {
+ public float getDiskSlots() {
return diskSlots;
}
- public void setDiskSlots(int diskSlots) {
+ public void setDiskSlots(float diskSlots) {
this.diskSlots = diskSlots;
}
@@ -108,36 +91,40 @@ public class WorkerResource implements Comparable<WorkerResource> {
this.cpuCoreSlots = cpuCoreSlots;
}
- public int getMemoryMBSlots() {
+ public int getMemoryMB() {
try {
rlock.lock();
- return memoryMBSlots;
+ return memoryMB;
} finally {
rlock.unlock();
}
}
- public void setMemoryMBSlots(int memoryMBSlots) {
+ public void setMemoryMB(int memoryMB) {
try {
wlock.lock();
- this.memoryMBSlots = memoryMBSlots;
+ this.memoryMB = memoryMB;
} finally {
wlock.unlock();
}
}
- public int getAvailableDiskSlots() {
+ public float getAvailableDiskSlots() {
return diskSlots - usedDiskSlots;
}
- public int getAvailableMemoryMBSlots() {
- return getMemoryMBSlots() - getUsedMemoryMBSlots();
+ public int getAvailableMemoryMB() {
+ return memoryMB - usedMemoryMB;
+ }
+
+ public int getAvailableCpuCoreSlots() {
+ return cpuCoreSlots - usedCpuCoreSlots;
}
@Override
public String toString() {
- return "host:" + allocatedHost + ", port=" + portsToStr() + ", slots=" + memoryMBSlots + ":" + cpuCoreSlots + ":" + diskSlots +
- ", used=" + getUsedMemoryMBSlots() + ":" + usedCpuCoreSlots + ":" + usedDiskSlots;
+ return "host:" + allocatedHost + ", port=" + portsToStr() + ", slots=m:" + memoryMB + ",d:" + diskSlots +
+ ",c:" + cpuCoreSlots + ", used=m:" + usedMemoryMB + ",d:" + usedDiskSlots + ",c:" + usedCpuCoreSlots;
}
public String portsToStr() {
@@ -148,23 +135,22 @@ public class WorkerResource implements Comparable<WorkerResource> {
this.lastHeartbeat = heartbeatTime;
}
- public int getUsedMemoryMBSlots() {
+ public int getUsedMemoryMB() {
try {
rlock.lock();
- return usedMemoryMBSlots;
+ return usedMemoryMB;
} finally {
rlock.unlock();
}
}
- public void setUsedMemoryMBSlots(int usedMemoryMBSlots) {
+ public void setUsedMemoryMB(int usedMemoryMB) {
try {
wlock.lock();
- this.usedMemoryMBSlots = usedMemoryMBSlots;
+ this.usedMemoryMB = usedMemoryMB;
} finally {
wlock.unlock();
}
-
}
public int getUsedCpuCoreSlots() {
@@ -175,7 +161,7 @@ public class WorkerResource implements Comparable<WorkerResource> {
this.usedCpuCoreSlots = usedCpuCoreSlots;
}
- public int getUsedDiskSlots() {
+ public float getUsedDiskSlots() {
return usedDiskSlots;
}
@@ -211,33 +197,40 @@ public class WorkerResource implements Comparable<WorkerResource> {
this.taskRunnerMode = taskRunnerMode;
}
- public void releaseResource(WorkerResource workerResource) {
+ public void releaseResource(float diskSlots, int memoryMB) {
try {
wlock.lock();
- usedMemoryMBSlots = usedMemoryMBSlots - workerResource.getMemoryMBSlots();
+ usedMemoryMB = usedMemoryMB - memoryMB;
+ usedDiskSlots -= diskSlots;
+ if(usedMemoryMB < 0) {
+ LOG.warn("Used memory can't be a minus: " + usedMemoryMB);
+ usedMemoryMB = 0;
+ }
+ if(usedDiskSlots < 0) {
+ LOG.warn("Used disk slot can't be a minus: " + usedDiskSlots);
+ usedDiskSlots = 0;
+ }
} finally {
wlock.unlock();
}
-
- if(getUsedMemoryMBSlots() < 0 || usedDiskSlots < 0 || usedCpuCoreSlots < 0) {
- LOG.warn("Used resources can't be a minus.");
- LOG.warn(this + " ==> " + workerResource);
- }
}
- public int getSlots() {
- //TODO what is slot? 512MB = 1slot?
- return getMemoryMBSlots()/512;
- }
+ public void allocateResource(float diskSlots, int memoryMB) {
+ try {
+ wlock.lock();
+ usedMemoryMB += memoryMB;
+ usedDiskSlots += diskSlots;
- public int getAvaliableSlots() {
- //TODO what is slot? 512MB = 1slot?
- return getAvailableMemoryMBSlots()/512;
- }
+ if(usedMemoryMB > this.memoryMB) {
+ usedMemoryMB = this.memoryMB;
+ }
- public int getUsedSlots() {
- //TODO what is slot? 512MB = 1slot?
- return getUsedMemoryMBSlots()/512;
+ if(usedDiskSlots > this.diskSlots) {
+ usedDiskSlots = this.diskSlots;
+ }
+ } finally {
+ wlock.unlock();
+ }
}
public int getPeerRpcPort() {
@@ -316,16 +309,14 @@ public class WorkerResource implements Comparable<WorkerResource> {
return numQueryMasterTasks.get();
}
- public void setNumQueryMasterTasks(int numQueryMasterTasks) {
- this.numQueryMasterTasks.set(numQueryMasterTasks);
- }
-
- public void addNumQueryMasterTask() {
+ public void addNumQueryMasterTask(float diskSlots, int memoryMB) {
numQueryMasterTasks.getAndIncrement();
+ allocateResource(diskSlots, memoryMB);
}
- public void releaseQueryMasterTask() {
+ public void releaseQueryMasterTask(float diskSlots, int memoryMB) {
numQueryMasterTasks.getAndDecrement();
+ releaseResource(diskSlots, memoryMB);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
index 2e66f98..1ce2c9f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
@@ -20,6 +20,8 @@ package org.apache.tajo.master.rm;
import com.google.protobuf.RpcCallback;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.proto.YarnProtos.*;
+import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.querymaster.QueryInProgress;
@@ -56,13 +58,13 @@ public interface WorkerResourceManager {
public void workerHeartbeat(TajoMasterProtocol.TajoHeartbeat request);
- public void releaseWorkerResource(QueryId queryId, WorkerResource workerResource);
+ public void releaseWorkerResource(ExecutionBlockId ebId, ContainerIdProto containerId);
public Map<String, WorkerResource> getWorkers();
public void stop();
- public int getNumClusterSlots();
+ public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary();
Collection<String> getQueryMasters();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
index 802e5ed..8b72cf9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
@@ -36,10 +36,13 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
+import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoProtos;
+import org.apache.tajo.exception.UnimplementedException;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.YarnContainerProxy;
@@ -83,33 +86,38 @@ public class YarnTajoResourceManager implements WorkerResourceManager {
return new ArrayList<String>();
}
- public int getNumClusterSlots() {
- return 0;
+ public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() {
+ return TajoMasterProtocol.ClusterResourceSummary.newBuilder()
+ .setNumWorkers(0)
+ .setTotalCpuCoreSlots(0)
+ .setTotalDiskSlots(0)
+ .setTotalMemoryMB(0)
+ .setTotalAvailableCpuCoreSlots(0)
+ .setTotalAvailableDiskSlots(0)
+ .setTotalAvailableMemoryMB(0)
+ .build();
}
@Override
public void workerHeartbeat(TajoMasterProtocol.TajoHeartbeat request) {
- //nothing to do
- //yarn manages worker membership.
+ throw new UnimplementedException("workerHeartbeat");
}
@Override
- public void releaseWorkerResource(QueryId queryId, WorkerResource workerResource) {
- //nothing to do
+ public void releaseWorkerResource(ExecutionBlockId ebId, YarnProtos.ContainerIdProto containerId) {
+ throw new UnimplementedException("releaseWorkerResource");
}
@Override
public WorkerResource allocateQueryMaster(QueryInProgress queryInProgress) {
- //nothing to do
- //allocateAndLaunchQueryMaster in startQueryMaster()
- return null;
+ throw new UnimplementedException("allocateQueryMaster");
}
@Override
public void allocateWorkerResources(
TajoMasterProtocol.WorkerResourceAllocationRequest request,
RpcCallback<TajoMasterProtocol.WorkerResourceAllocationResponse> rpcCallBack) {
- //nothing to do
+ throw new UnimplementedException("allocateWorkerResources");
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/ApplicationIdUtils.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/ApplicationIdUtils.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/ApplicationIdUtils.java
index 5fe1b74..cf68145 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/ApplicationIdUtils.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/ApplicationIdUtils.java
@@ -26,6 +26,9 @@ import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
public class ApplicationIdUtils {
+ public static ApplicationAttemptId createApplicationAttemptId(QueryId queryId, int attemptId) {
+ return BuilderUtils.newApplicationAttemptId(queryIdToAppId(queryId), attemptId);
+ }
public static ApplicationAttemptId createApplicationAttemptId(QueryId queryId) {
return BuilderUtils.newApplicationAttemptId(queryIdToAppId(queryId), 1);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/ResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
index f0c70cf..8b9219c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
@@ -24,5 +24,6 @@ import org.apache.hadoop.yarn.proto.YarnProtos;
public interface ResourceAllocator {
public void allocateTaskWorker();
public ContainerId makeContainerId(YarnProtos.ContainerIdProto containerId);
- public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext, int numTasks);
+ public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext,
+ int numTasks, int memoryMBPerTask);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 275660a..ca6ea41 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -25,7 +25,9 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -93,8 +95,12 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
}
@Override
- public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext, int numTasks) {
- int clusterSlots = workerContext.getNumClusterSlots();
+ public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext,
+ int numTasks,
+ int memoryMBPerTask) {
+ //TODO consider disk slot
+ TajoMasterProtocol.ClusterResourceSummary clusterResource = workerContext.getClusterResource();
+ int clusterSlots = clusterResource == null ? 0 : clusterResource.getTotalMemoryMB()/memoryMBPerTask;
return clusterSlots == 0 ? 1: Math.min(numTasks, clusterSlots);
}
@@ -226,13 +232,18 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse> callBack =
new CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse>();
- int requiredMemoryMBSlot = 512; //TODO
- int requiredDiskSlots = 1; //TODO
+ //TODO consider task's resource usage pattern
+ int requiredMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TASK_DEFAULT_MEMORY);
+ float requiredDiskSlots = tajoConf.getFloatVar(TajoConf.ConfVars.TASK_DEFAULT_DISK);
+
TajoMasterProtocol.WorkerResourceAllocationRequest request =
TajoMasterProtocol.WorkerResourceAllocationRequest.newBuilder()
- .setMemoryMBSlots(requiredMemoryMBSlot)
- .setDiskSlots(requiredDiskSlots)
- .setNumWorks(event.getRequiredNum())
+ .setMinMemoryMBPerContainer(requiredMemoryMB)
+ .setMaxMemoryMBPerContainer(requiredMemoryMB)
+ .setNumContainers(event.getRequiredNum())
+ .setResourceRequestPriority(TajoMasterProtocol.ResourceRequestPriority.MEMORY)
+ .setMinDiskSlotPerContainer(requiredDiskSlots)
+ .setMaxDiskSlotPerContainer(requiredDiskSlots)
.setExecutionBlockId(event.getExecutionBlockId().getProto())
.build();
@@ -266,25 +277,26 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
continue;
}
}
- int numAllocatedWorkers = 0;
+ int numAllocatedContainers = 0;
if(response != null) {
- List<TajoMasterProtocol.WorkerAllocatedResource> workerHosts = response.getWorkerAllocatedResourceList();
+ List<TajoMasterProtocol.WorkerAllocatedResource> allocatedResources = response.getWorkerAllocatedResourceList();
ExecutionBlockId executionBlockId = event.getExecutionBlockId();
List<Container> containers = new ArrayList<Container>();
- for(TajoMasterProtocol.WorkerAllocatedResource eachWorker: workerHosts) {
+ for(TajoMasterProtocol.WorkerAllocatedResource eachAllocatedResource: allocatedResources) {
TajoWorkerContainer container = new TajoWorkerContainer();
NodeIdPBImpl nodeId = new NodeIdPBImpl();
- nodeId.setHost(eachWorker.getWorkerHost());
- nodeId.setPort(eachWorker.getPeerRpcPort());
+ nodeId.setHost(eachAllocatedResource.getWorkerHost());
+ nodeId.setPort(eachAllocatedResource.getPeerRpcPort());
TajoWorkerContainerId containerId = new TajoWorkerContainerId();
containerId.setApplicationAttemptId(
- ApplicationIdUtils.createApplicationAttemptId(executionBlockId.getQueryId()));
- containerId.setId(containerIdSeq.incrementAndGet());
+ ApplicationIdUtils.createApplicationAttemptId(executionBlockId.getQueryId(),
+ eachAllocatedResource.getContainerId().getAppAttemptId().getAttemptId()));
+ containerId.setId(eachAllocatedResource.getContainerId().getId());
container.setId(containerId);
container.setNodeId(nodeId);
@@ -292,10 +304,10 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
WorkerResource workerResource = new WorkerResource();
workerResource.setAllocatedHost(nodeId.getHost());
workerResource.setPeerRpcPort(nodeId.getPort());
- workerResource.setQueryMasterPort(eachWorker.getQueryMasterPort());
- workerResource.setPullServerPort(eachWorker.getWorkerPullServerPort());
- workerResource.setMemoryMBSlots(requiredMemoryMBSlot);
- workerResource.setDiskSlots(requiredDiskSlots);
+ workerResource.setQueryMasterPort(eachAllocatedResource.getQueryMasterPort());
+ workerResource.setPullServerPort(eachAllocatedResource.getWorkerPullServerPort());
+ workerResource.setMemoryMB(eachAllocatedResource.getAllocatedMemoryMB());
+ workerResource.setDiskSlots(eachAllocatedResource.getAllocatedDiskSlots());
container.setWorkerResource(workerResource);
@@ -304,32 +316,32 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
SubQueryState state = queryTaskContext.getSubQuery(executionBlockId).getState();
if (!SubQuery.isRunningState(state)) {
- List<WorkerResource> workerResources = new ArrayList<WorkerResource>();
- for(Container eachContainer: containers) {
- workerResources.add(((TajoWorkerContainer)eachContainer).getWorkerResource());
- }
try {
- TajoContainerProxy.releaseWorkerResource(queryTaskContext, executionBlockId, workerResources);
+ List<ContainerId> containerIds = new ArrayList<ContainerId>();
+ for(Container eachContainer: containers) {
+ containerIds.add(eachContainer.getId());
+ }
+ TajoContainerProxy.releaseWorkerResource(queryTaskContext, executionBlockId, containerIds);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
return;
}
- if (workerHosts.size() > 0) {
+ if (allocatedResources.size() > 0) {
if(LOG.isDebugEnabled()) {
LOG.debug("SubQueryContainerAllocationEvent fire:" + executionBlockId);
}
queryTaskContext.getEventHandler().handle(new SubQueryContainerAllocationEvent(executionBlockId, containers));
}
- numAllocatedWorkers += workerHosts.size();
+ numAllocatedContainers += allocatedResources.size();
}
- if(event.getRequiredNum() > numAllocatedWorkers) {
+ if(event.getRequiredNum() > numAllocatedContainers) {
ContainerAllocationEvent shortRequestEvent = new ContainerAllocationEvent(
event.getType(), event.getExecutionBlockId(), event.getPriority(),
event.getResource(),
- event.getRequiredNum() - numAllocatedWorkers,
+ event.getRequiredNum() - numAllocatedContainers,
event.isLeafQuery(), event.getProgress()
);
queryTaskContext.getEventHandler().handle(shortRequestEvent);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 4d46a45..57d99c4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -41,6 +41,9 @@ import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.storage.v2.DiskDeviceInfo;
+import org.apache.tajo.storage.v2.DiskMountInfo;
+import org.apache.tajo.storage.v2.DiskUtil;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.TajoIdUtils;
@@ -103,7 +106,7 @@ public class TajoWorker extends CompositeService {
private AtomicInteger numClusterNodes = new AtomicInteger();
- private AtomicInteger numClusterSlots = new AtomicInteger();
+ private TajoMasterProtocol.ClusterResourceSummary clusterResource;
private int httpPort;
@@ -350,12 +353,16 @@ public class TajoWorker extends CompositeService {
return TajoWorker.this.numClusterNodes.get();
}
- public void setNumClusterSlots(int numClusterSlots) {
- TajoWorker.this.numClusterSlots.set(numClusterSlots);
+ public void setClusterResource(TajoMasterProtocol.ClusterResourceSummary clusterResource) {
+ synchronized(numClusterNodes) {
+ TajoWorker.this.clusterResource = clusterResource;
+ }
}
- public int getNumClusterSlots() {
- return TajoWorker.this.numClusterSlots.get();
+ public TajoMasterProtocol.ClusterResourceSummary getClusterResource() {
+ synchronized(numClusterNodes) {
+ return TajoWorker.this.clusterResource;
+ }
}
public InetSocketAddress getTajoMasterAddress() {
@@ -391,17 +398,18 @@ public class TajoWorker extends CompositeService {
TajoMasterProtocol.ServerStatusProto.System systemInfo;
List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos =
new ArrayList<TajoMasterProtocol.ServerStatusProto.Disk>();
- int workerDisksNum;
- List<File> mountPaths;
+ float workerDiskSlots;
+ int workerMemoryMB;
+ List<DiskDeviceInfo> diskDeviceInfos;
public WorkerHeartbeatThread() {
- int workerMemoryMB;
int workerCpuCoreNum;
boolean dedicatedResource = systemConf.getBoolVar(ConfVars.WORKER_RESOURCE_DEDICATED);
-
+ int workerCpuCoreSlots = Runtime.getRuntime().availableProcessors();
+
try {
- mountPaths = getMountPath();
+ diskDeviceInfos = DiskUtil.getDiskDeviceInfos();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
@@ -411,24 +419,23 @@ public class TajoWorker extends CompositeService {
int totalMemory = getTotalMemoryMB();
workerMemoryMB = (int) ((float) (totalMemory) * dedicatedMemoryRatio);
workerCpuCoreNum = Runtime.getRuntime().availableProcessors();
- if(mountPaths == null) {
- workerDisksNum = ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS.defaultIntVal;
+
+ if(diskDeviceInfos == null) {
+ workerDiskSlots = ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS.defaultIntVal;
} else {
- workerDisksNum = mountPaths.size();
+ workerDiskSlots = diskDeviceInfos.size();
}
} else {
- // TODO - it's a hack and it must be fixed
- //workerMemoryMB = systemConf.getIntVar(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB);
- workerMemoryMB = 512 * systemConf.getIntVar(ConfVars.WORKER_EXECUTION_MAX_SLOTS);
- workerDisksNum = systemConf.getIntVar(ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS);
+ workerMemoryMB = systemConf.getIntVar(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB);
workerCpuCoreNum = systemConf.getIntVar(ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES);
+ workerDiskSlots = systemConf.getFloatVar(ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS);
}
systemInfo = TajoMasterProtocol.ServerStatusProto.System.newBuilder()
.setAvailableProcessors(workerCpuCoreNum)
.setFreeMemoryMB(0)
.setMaxMemoryMB(0)
- .setTotalMemoryMB(workerMemoryMB)
+ .setTotalMemoryMB(getTotalMemoryMB())
.build();
}
@@ -472,16 +479,8 @@ public class TajoWorker extends CompositeService {
}
while(true) {
- if(sendDiskInfoCount == 0 && mountPaths != null) {
- for(File eachFile: mountPaths) {
- diskInfos.clear();
- diskInfos.add(TajoMasterProtocol.ServerStatusProto.Disk.newBuilder()
- .setAbsolutePath(eachFile.getAbsolutePath())
- .setTotalSpace(eachFile.getTotalSpace())
- .setFreeSpace(eachFile.getFreeSpace())
- .setUsableSpace(eachFile.getUsableSpace())
- .build());
- }
+ if(sendDiskInfoCount == 0 && diskDeviceInfos != null) {
+ getDiskUsageInfos();
}
TajoMasterProtocol.ServerStatusProto.JvmHeap jvmHeap =
TajoMasterProtocol.ServerStatusProto.JvmHeap.newBuilder()
@@ -494,7 +493,8 @@ public class TajoWorker extends CompositeService {
.addAllDisk(diskInfos)
.setRunningTaskNum(taskRunnerManager == null ? 1 : taskRunnerManager.getNumTasks()) //TODO
.setSystem(systemInfo)
- .setDiskSlots(workerDisksNum)
+ .setDiskSlots(workerDiskSlots)
+ .setMemoryResourceMB(workerMemoryMB)
.setJvmHeap(jvmHeap)
.setQueryMasterMode(PrimitiveProtos.BoolProto.newBuilder().setValue(queryMasterMode))
.setTaskRunnerMode(PrimitiveProtos.BoolProto.newBuilder().setValue(taskRunnerMode))
@@ -521,13 +521,11 @@ public class TajoWorker extends CompositeService {
TajoMasterProtocol.TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS);
if(response != null) {
- if(response.getNumClusterNodes() > 0) {
- workerContext.setNumClusterNodes(response.getNumClusterNodes());
- }
-
- if(response.getNumClusterSlots() > 0) {
- workerContext.setNumClusterSlots(response.getNumClusterSlots());
+ TajoMasterProtocol.ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary();
+ if(clusterResourceSummary.getNumWorkers() > 0) {
+ workerContext.setNumClusterNodes(clusterResourceSummary.getNumWorkers());
}
+ workerContext.setClusterResource(clusterResourceSummary);
} else {
if(callBack.getController().failed()) {
throw new ServiceException(callBack.getController().errorText());
@@ -557,6 +555,24 @@ public class TajoWorker extends CompositeService {
LOG.info("Worker Resource Heartbeat Thread stopped.");
}
+
+ private void getDiskUsageInfos() {
+ diskInfos.clear();
+ for(DiskDeviceInfo eachDevice: diskDeviceInfos) {
+ List<DiskMountInfo> mountInfos = eachDevice.getMountInfos();
+ if(mountInfos != null) {
+ for(DiskMountInfo eachMount: mountInfos) {
+ File eachFile = new File(eachMount.getMountPath());
+ diskInfos.add(TajoMasterProtocol.ServerStatusProto.Disk.newBuilder()
+ .setAbsolutePath(eachFile.getAbsolutePath())
+ .setTotalSpace(eachFile.getTotalSpace())
+ .setFreeSpace(eachFile.getFreeSpace())
+ .setUsableSpace(eachFile.getUsableSpace())
+ .build());
+ }
+ }
+ }
+ }
}
private class ShutdownHook implements Runnable {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index f931615..70a998b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -18,7 +18,6 @@
package org.apache.tajo.worker;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang.exception.ExceptionUtils;
@@ -39,7 +38,10 @@ import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.json.CoreGsonHelper;
import org.apache.tajo.engine.planner.PlannerUtil;
-import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.planner.logical.SortNode;
import org.apache.tajo.engine.planner.physical.PhysicalExec;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.engine.query.QueryUnitRequest;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
index d694b7c..e74a09d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
@@ -64,7 +64,9 @@ public class YarnResourceAllocator extends AbstractResourceAllocator {
}
@Override
- public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext, int numTasks) {
+ public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext,
+ int numTasks,
+ int memoryMBPerTask) {
int numClusterNodes = workerContext.getNumClusterNodes();
TajoConf conf = (TajoConf)workerContext.getQueryMaster().getConfig();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
index f9b15a7..dca200e 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
@@ -50,12 +50,13 @@ message ServerStatusProto {
}
required System system = 1;
- required int32 diskSlots = 2;
- repeated Disk disk = 3;
- required int32 runningTaskNum = 4;
- required JvmHeap jvmHeap = 5;
- required BoolProto queryMasterMode = 6;
- required BoolProto taskRunnerMode = 7;
+ required float diskSlots = 2;
+ required int32 memoryResourceMB = 3;
+ repeated Disk disk = 4;
+ required int32 runningTaskNum = 5;
+ required JvmHeap jvmHeap = 6;
+ required BoolProto queryMasterMode = 7;
+ required BoolProto taskRunnerMode = 8;
}
message TajoHeartbeat {
@@ -79,16 +80,37 @@ message TajoHeartbeatResponse {
repeated string params = 2;
}
required BoolProto heartbeatResult = 1;
- required int32 numClusterNodes = 2;
- required int32 numClusterSlots = 3;
- optional ResponseCommand responseCommand = 4;
+ required ClusterResourceSummary clusterResourceSummary = 2;
+ optional ResponseCommand responseCommand = 3;
+}
+
+message ClusterResourceSummary {
+ required int32 numWorkers = 1;
+ required int32 totalDiskSlots = 2;
+ required int32 totalCpuCoreSlots = 3;
+ required int32 totalMemoryMB = 4;
+
+ required int32 totalAvailableDiskSlots = 5;
+ required int32 totalAvailableCpuCoreSlots = 6;
+ required int32 totalAvailableMemoryMB = 7;
+}
+
+enum ResourceRequestPriority {
+ MEMORY = 1;
+ DISK = 2;
}
message WorkerResourceAllocationRequest {
required ExecutionBlockIdProto executionBlockId = 1;
- required int32 numWorks = 2;
- required int32 memoryMBSlots = 3 ;
- required int32 diskSlots = 4;
+ required ResourceRequestPriority resourceRequestPriority = 2;
+
+ required int32 numContainers = 3;
+
+ required int32 maxMemoryMBPerContainer = 4;
+ required int32 minMemoryMBPerContainer = 5;
+
+ required float maxDiskSlotPerContainer = 6;
+ required float minDiskSlotPerContainer = 7;
}
message WorkerResourceProto {
@@ -96,19 +118,25 @@ message WorkerResourceProto {
required int32 peerRpcPort = 2;
required int32 queryMasterPort = 3;
required ExecutionBlockIdProto executionBlockId = 4;
- required int32 memoryMBSlots = 5 ;
+ required int32 memoryMB = 5 ;
required int32 diskSlots = 6;
}
message WorkerResourceReleaseRequest {
- repeated WorkerResourceProto workerResources = 1;
+ required ExecutionBlockIdProto executionBlockId = 1;
+ repeated ContainerIdProto containerIds = 2;
}
message WorkerAllocatedResource {
- required string workerHost = 1;
- required int32 peerRpcPort = 2;
- required int32 queryMasterPort = 3;
- required int32 workerPullServerPort = 4;
+ required ContainerIdProto containerId = 1;
+ required string nodeId = 2;
+ required string workerHost = 3;
+ required int32 peerRpcPort = 4;
+ required int32 queryMasterPort = 5;
+ required int32 workerPullServerPort = 6;
+
+ required int32 allocatedMemoryMB = 7;
+ required float allocatedDiskSlots = 8;
}
message WorkerResourceAllocationResponse {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/cluster.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/cluster.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/cluster.jsp
index d0bf887..0600145 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/cluster.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/cluster.jsp
@@ -31,10 +31,6 @@
List<String> wokerKeys = new ArrayList<String>(workers.keySet());
Collections.sort(wokerKeys);
- int totalSlot = 0;
- int runningSlot = 0;
- int idleSlot = 0;
-
int runningQueryMasterTasks = 0;
Set<WorkerResource> liveWorkers = new TreeSet<WorkerResource>();
@@ -58,9 +54,6 @@
if(eachWorker.isTaskRunnerMode()) {
if(eachWorker.getWorkerStatus() == WorkerStatus.LIVE) {
liveWorkers.add(eachWorker);
- idleSlot += eachWorker.getAvaliableSlots();
- totalSlot += eachWorker.getSlots();
- runningSlot += eachWorker.getUsedSlots();
} else if(eachWorker.getWorkerStatus() == WorkerStatus.DEAD) {
deadWorkers.add(eachWorker);
} else if(eachWorker.getWorkerStatus() == WorkerStatus.DECOMMISSION) {
@@ -94,7 +87,7 @@
} else {
%>
<table width="100%" class="border_table" border="1">
- <tr><th>No</th><th>QueryMaster</th><th>Client Port</th><th>Running QueryMaster Tasks</th><th>Heap(free/max)</th><th>Heartbeat</th><th>Status</th></tr>
+ <tr><th>No</th><th>QueryMaster</th><th>Client Port</th><th>Running Query</th><th>Heap(free/max)</th><th>Heartbeat</th><th>Status</th></tr>
<%
int no = 1;
@@ -106,7 +99,7 @@
<td><a href='<%=queryMasterHttp%>'><%=queryMaster.getAllocatedHost() + ":" + queryMaster.getQueryMasterPort()%></a></td>
<td width='100' align='center'><%=queryMaster.getClientPort()%></td>
<td width='200' align='right'><%=queryMaster.getNumQueryMasterTasks()%></td>
- <td width='200' align='left'><%=queryMaster.getFreeHeap()/1024/1024%>/<%=queryMaster.getMaxHeap()/1024/1024%> MB</td>
+ <td width='200' align='center'><%=queryMaster.getFreeHeap()/1024/1024%>/<%=queryMaster.getMaxHeap()/1024/1024%> MB</td>
<td width='100' align='right'><%=JSPUtil.getElapsedTime(queryMaster.getLastHeartbeat(), System.currentTimeMillis())%></td>
<td width='100' align='center'><%=queryMaster.getWorkerStatus()%></td>
</tr>
@@ -148,7 +141,7 @@
<hr/>
<h2>Worker</h2>
- <div>Live:<%=liveWorkers.size()%>, Dead: <%=deadWorkersHtml%>, Slot(running/idle/total): <%=runningSlot%> / <%=idleSlot%> / <%=totalSlot%></div>
+ <div>Live:<%=liveWorkers.size()%>, Dead: <%=deadWorkersHtml%></div>
<hr/>
<h3>Live Workers</h3>
<%
@@ -157,7 +150,7 @@
} else {
%>
<table width="100%" class="border_table" border="1">
- <tr><th>No</th><th>Worker</th><th>PullServer Port</th><th>Running Tasks</th><th>Slot</th></th><th>Heap(free/max)</th><th>Disk</th><th>Cpu</th><th>Heartbeat</th><th>Status</th></tr>
+ <tr><th>No</th><th>Worker</th><th>PullServer<br/>Port</th><th>Running Tasks</th><th>Memory Resource<br/>(used/total)</th><th>Disk Resource<br/>(used/total)</th></th><th>Heap(free/max)</th><th>Heartbeat</th><th>Status</th></tr>
<%
int no = 1;
for(WorkerResource worker: liveWorkers) {
@@ -166,12 +159,11 @@
<tr>
<td width='30' align='right'><%=no++%></td>
<td><a href='<%=workerHttp%>'><%=worker.getAllocatedHost() + ":" + worker.getPeerRpcPort()%></a></td>
- <td width='150' align='center'><%=worker.getPullServerPort()%></td>
+ <td width='80' align='center'><%=worker.getPullServerPort()%></td>
<td width='100' align='right'><%=worker.getNumRunningTasks()%></td>
- <td width='100' align='right'><%=worker.getUsedSlots()%>/<%=worker.getSlots()%></td>
- <td width='100' align='left'><%=worker.getFreeHeap()/1024/1024%>/<%=worker.getMaxHeap()/1024/1024%> MB</td>
- <td width='100' align='right'><%=worker.getUsedDiskSlots()%>/<%=worker.getDiskSlots()%></td>
- <td width='100' align='right'><%=worker.getUsedCpuCoreSlots()%>/<%=worker.getCpuCoreSlots()%></td>
+ <td width='150' align='center'><%=worker.getUsedMemoryMB()%>/<%=worker.getMemoryMB()%></td>
+ <td width='100' align='center'><%=worker.getUsedDiskSlots()%>/<%=worker.getDiskSlots()%></td>
+ <td width='100' align='center'><%=worker.getFreeHeap()/1024/1024%>/<%=worker.getMaxHeap()/1024/1024%> MB</td>
<td width='100' align='right'><%=JSPUtil.getElapsedTime(worker.getLastHeartbeat(), System.currentTimeMillis())%></td>
<td width='100' align='center'><%=worker.getWorkerStatus()%></td>
</tr>
@@ -196,7 +188,7 @@
} else {
%>
<table width="100%" class="border_table" border="1">
- <tr><th>No</th><th>Worker</th><th>PullServer Port</th><th>Slot</th></th><th>Heap(free/max)</th><th>Disk</th><th>Cpu</th><th>Status</th></tr>
+ <tr><th>No</th><th>Worker</th><th>PullServer Port</th><th>Running Tasks</th><th>Memory Resource</th><th>Disk Resource</th></th><th>Heap(free/max)</th><th>Heartbeat</th><th>Status</th></tr>
<%
int no = 1;
for(WorkerResource worker: deadWorkers) {
@@ -205,10 +197,9 @@
<td width='30' align='right'><%=no++%></td>
<td><%=worker.getAllocatedHost() + ":" + worker.getPeerRpcPort()%></td>
<td width='150' align='center'><%=worker.getPullServerPort()%></td>
- <td width='100' align='right'><%=worker.getUsedSlots()%>/<%=worker.getSlots()%></td>
- <td width='100' align='left'><%=worker.getFreeHeap()/1024/1024%>/<%=worker.getMaxHeap()/1024/1024%> MB</td>
+ <td width='100' align='right'><%=worker.getUsedMemoryMB()%>/<%=worker.getMemoryMB()%></td>
<td width='100' align='right'><%=worker.getUsedDiskSlots()%>/<%=worker.getDiskSlots()%></td>
- <td width='100' align='right'><%=worker.getUsedCpuCoreSlots()%>/<%=worker.getCpuCoreSlots()%></td>
+ <td width='100' align='left'><%=worker.getFreeHeap()/1024/1024%>/<%=worker.getMaxHeap()/1024/1024%> MB</td>
<td width='100' align='center'><%=worker.getWorkerStatus()%></td>
</tr>
<%
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
index fd5fa47..f652ea5 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
@@ -29,6 +29,7 @@
<%@ page import="org.apache.hadoop.util.StringUtils" %>
<%@ page import="org.apache.hadoop.fs.FileSystem" %>
<%@ page import="org.apache.tajo.conf.TajoConf" %>
+<%@ page import="org.apache.tajo.ipc.TajoMasterProtocol" %>
<%
TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
@@ -44,9 +45,9 @@
int numDeadQueryMasters = 0;
int runningQueryMasterTask = 0;
- int totalSlot = 0;
- int runningSlot = 0;
- int idleSlot = 0;
+
+ TajoMasterProtocol.ClusterResourceSummary clusterResourceSummary =
+ master.getContext().getResourceManager().getClusterResourceSummary();
for(WorkerResource eachWorker: workers.values()) {
if(eachWorker.getWorkerStatus() == WorkerStatus.LIVE) {
@@ -58,9 +59,6 @@
if(eachWorker.isTaskRunnerMode()) {
numWorkers++;
numLiveWorkers++;
- idleSlot += eachWorker.getAvaliableSlots();
- totalSlot += eachWorker.getSlots();
- runningSlot += eachWorker.getUsedSlots();
}
} else if(eachWorker.getWorkerStatus() == WorkerStatus.DEAD) {
if(eachWorker.isQueryMasterMode()) {
@@ -143,24 +141,24 @@
<h3>Cluster Summary</h3>
<table width="100%" class="border_table" border="1">
- <tr><th>Type</th><th>Total</th><th>Live</th><th>Dead</th><th>Total Slots</th><th>Running Slots</th><th>Idle Slots</th></tr>
+ <tr><th>Type</th><th>Total</th><th>Live</th><th>Dead</th><th>Running Master</th><th>Memory Resource<br/>(used/total)</th><th>Disk Resource<br/>(used/total)</th></tr>
<tr>
<td><a href='cluster.jsp'>Query Master</a></td>
<td align='right'><%=numQueryMasters%></td>
<td align='right'><%=numLiveQueryMasters%></td>
<td align='right'><%=numDeadQueryMastersHtml%></td>
- <td align='right'>-</td>
<td align='right'><%=runningQueryMasterTask%></td>
- <td align='right'>-</td>
+ <td align='center'>-</td>
+ <td align='center'>-</td>
</tr>
<tr>
<td><a href='cluster.jsp'>Worker</a></td>
<td align='right'><%=numWorkers%></td>
<td align='right'><%=numLiveWorkers%></td>
<td align='right'><%=numDeadWorkersHtml%></td>
- <td align='right'><%=totalSlot%></td>
- <td align='right'><%=runningSlot%></td>
- <td align='right'><%=idleSlot%></td>
+ <td align='right'>-</td>
+ <td align='center'><%=clusterResourceSummary.getTotalMemoryMB() - clusterResourceSummary.getTotalAvailableMemoryMB()%>/<%=clusterResourceSummary.getTotalMemoryMB()%></td>
+ <td align='center'><%=clusterResourceSummary.getTotalDiskSlots() - clusterResourceSummary.getTotalAvailableDiskSlots()%>/<%=clusterResourceSummary.getTotalDiskSlots()%></td>
</tr>
</table>
<p/>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 9ea07de..b7c8512 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -96,6 +96,9 @@ public class TajoTestingCluster {
);
conf.set(ConfVars.RESOURCE_MANAGER_CLASS.varname, System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname));
}
+ conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB.varname, 1024);
+ conf.setFloat(ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS.varname, 2.0f);
+
this.standbyWorkerMode = conf.getVar(ConfVars.RESOURCE_MANAGER_CLASS)
.indexOf(TajoWorkerResourceManager.class.getName()) >= 0;
conf.set(CommonTestingUtil.TAJO_TEST, "TRUE");