You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/08/29 04:35:49 UTC
git commit: TAJO-137: Unreleased resources and wrong allocation
requests in TajoWorkerResourceManager. (hyoungjunkim via hyunsik)
Updated Branches:
refs/heads/master 4e47471c3 -> ce84eba43
TAJO-137: Unreleased resources and wrong allocation requests in TajoWorkerResourceManager. (hyoungjunkim via hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/ce84eba4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/ce84eba4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/ce84eba4
Branch: refs/heads/master
Commit: ce84eba436aaf639920bb38ff8d5081d01c23686
Parents: 4e47471
Author: Hyunsik Choi <hy...@apache.org>
Authored: Thu Aug 29 11:20:47 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Thu Aug 29 11:20:47 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 3 +
pom.xml | 1 +
tajo-core/tajo-core-backend/pom.xml | 38 +++-
.../apache/tajo/master/TajoAsyncDispatcher.java | 4 +-
.../apache/tajo/master/TajoContainerProxy.java | 7 +-
.../apache/tajo/master/TajoMasterService.java | 18 +-
.../apache/tajo/master/TaskSchedulerImpl.java | 3 +-
.../master/querymaster/QueryInProgress.java | 32 +--
.../tajo/master/querymaster/QueryInfo.java | 4 +-
.../master/querymaster/QueryJobManager.java | 8 +-
.../tajo/master/querymaster/SubQuery.java | 8 +-
.../master/rm/TajoWorkerResourceManager.java | 120 ++++++++---
.../apache/tajo/master/rm/WorkerResource.java | 83 +++++---
.../tajo/master/rm/WorkerResourceManager.java | 5 +
.../master/rm/YarnRMContainerAllocator.java | 4 +-
.../tajo/master/rm/YarnTajoResourceManager.java | 8 +
.../tajo/worker/AbstractResourceAllocator.java | 25 ---
.../apache/tajo/worker/ResourceAllocator.java | 1 +
.../tajo/worker/TajoResourceAllocator.java | 16 +-
.../java/org/apache/tajo/worker/TajoWorker.java | 83 ++++++--
.../java/org/apache/tajo/worker/TaskRunner.java | 12 +-
.../apache/tajo/worker/TaskRunnerManager.java | 12 +-
.../tajo/worker/YarnResourceAllocator.java | 7 +-
.../src/main/proto/TajoMasterProtocol.proto | 12 +-
.../src/main/resources/webapps/admin/index.jsp | 210 +++++--------------
25 files changed, 382 insertions(+), 342 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 787c0b5..ac1c60a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -112,6 +112,9 @@ Release 0.2.0 - unreleased
BUG FIXES
+ TAJO-137: Unreleased resources and wrong allocation requests in
+ TajoWorkerResourceManager. (hyoungjunkim via hyunsik)
+
TAJO-130: Same queryConf file conflicts. (jinho)
TAJO-82: NullPointerException occurs when Schema is converted as an array
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 07b05a1..661708e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -310,6 +310,7 @@
<exclude>**/*.sql</exclude>
<exclude>**/*.schema</exclude>
<exclude>**/*.tbl</exclude>
+ <exclude>**/*.jsp</exclude>
<!-- generated content -->
<exclude>**/target/**</exclude>
<exclude>**/*.log</exclude>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/pom.xml b/tajo-core/tajo-core-backend/pom.xml
index 6c3351f..d1d9f9f 100644
--- a/tajo-core/tajo-core-backend/pom.xml
+++ b/tajo-core/tajo-core-backend/pom.xml
@@ -18,7 +18,7 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>tajo-project</artifactId>
@@ -41,7 +41,7 @@
<repository>
<id>repository.jboss.org</id>
<url>https://repository.jboss.org/nexus/content/repositories/releases/
- </url>
+ </url>
<snapshots>
<enabled>false</enabled>
</snapshots>
@@ -340,6 +340,40 @@
<artifactId>dspace-geoip</artifactId>
<version>1.2.3</version>
</dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jdt</groupId>
+ <artifactId>core</artifactId>
+ <version>3.1.1</version>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>servlet-api</artifactId>
+ <version>2.5-20081211</version>
+ </dependency>
+ <dependency>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ <version>6.1.14</version>
+ </dependency>
+ <dependency>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ <version>6.1.14</version>
+ </dependency>
+ <dependency>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1</artifactId>
+ <version>6.1.14</version>
+ </dependency>
+ <dependency>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-api-2.1</artifactId>
+ <version>6.1.14</version>
+ </dependency>
+
</dependencies>
<profiles>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
index 8f83557..33a1e53 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
@@ -72,7 +72,7 @@ public class TajoAsyncDispatcher extends AbstractService implements Dispatcher
}
} catch(InterruptedException ie) {
if (!stopped) {
- LOG.warn("AsyncDispatcher thread interrupted", ie);
+ LOG.warn("AsyncDispatcher thread interrupted");
}
return;
}
@@ -112,7 +112,7 @@ public class TajoAsyncDispatcher extends AbstractService implements Dispatcher
try {
eventHandlingThread.join();
} catch (InterruptedException ie) {
- LOG.warn("Interrupted Exception while stopping", ie);
+ LOG.warn("Interrupted Exception while stopping");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index 5359311..966df72 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -47,10 +47,15 @@ public class TajoContainerProxy extends ContainerProxy {
@Override
public void launch(ContainerLaunchContext containerLaunchContext) {
context.getResourceAllocator().addContainer(containerID, this);
+
this.hostName = container.getNodeId().getHost();
- this.port = context.getQueryMasterContext().getWorkerContext().getPullService().getPort();
+ //this.port = context.getQueryMasterContext().getWorkerContext().getPullService().getPort();
+ this.port = ((TajoWorkerContainer)container).getWorkerResource().getPullServerPort();
this.state = ContainerState.RUNNING;
+ LOG.info("=======>Launch Container:" + executionBlockId + "," + containerID.getId() + "," +
+ container.getId() + "," + container.getNodeId() + ", pullServer=" + port);
+
assignExecutionBlock(executionBlockId, container);
context.getEventHandler().handle(new QueryEvent(context.getQueryId(), QueryEventType.INIT_COMPLETED));
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
index e522913..8d1bbe0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
@@ -117,6 +117,9 @@ public class TajoMasterService extends AbstractService {
if(command != null) {
builder.setResponseCommand(command);
}
+
+ builder.setNumClusterNodes(context.getResourceManager().getWorkers().size());
+ builder.setNumClusterSlots(context.getResourceManager().getNumClusterSlots());
done.run(builder.build());
}
@@ -126,17 +129,6 @@ public class TajoMasterService extends AbstractService {
TajoMasterProtocol.WorkerResourceAllocationRequest request,
RpcCallback<TajoMasterProtocol.WorkerResourceAllocationResponse> done) {
context.getResourceManager().allocateWorkerResources(request, done);
-
-// List<String> workerHosts = new ArrayList<String>();
-// for(WorkerResource eachWorker: workerResources) {
-// workerHosts.add(eachWorker.getAllocatedHost() + ":" + eachWorker.getPorts()[0]);
-// }
-//
-// done.run(TajoMasterProtocol.WorkerResourceAllocationResponse.newBuilder()
-// .setExecutionBlockId(request.getExecutionBlockId())
-// .addAllAllocatedWorks(workerHosts)
-// .build()
-// );
}
@Override
@@ -148,11 +140,11 @@ public class TajoMasterService extends AbstractService {
WorkerResource workerResource = new WorkerResource();
String[] tokens = eachWorkerResource.getWorkerHostAndPort().split(":");
workerResource.setAllocatedHost(tokens[0]);
- workerResource.setPorts(new int[]{Integer.parseInt(tokens[1])});
+ workerResource.setManagerPort(Integer.parseInt(tokens[1]));
workerResource.setMemoryMBSlots(eachWorkerResource.getMemoryMBSlots());
workerResource.setDiskSlots(eachWorkerResource.getDiskSlots());
- LOG.info("====> releaseWorkerResource:" + workerResource);
+ LOG.info("releaseWorkerResource:" + workerResource);
context.getResourceManager().releaseWorkerResource(
new QueryId(eachWorkerResource.getExecutionBlockId().getQueryId()),
workerResource);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
index 651f9c0..17b6168 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
@@ -396,7 +396,8 @@ public class TaskSchedulerImpl extends AbstractService
TaskRequestEvent taskRequest;
while (it.hasNext() && leafTasks.size() > 0) {
taskRequest = it.next();
- LOG.info("====> assignToLeafTasks: " + taskRequest.getExecutionBlockId());
+ LOG.info("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
+ "containerId=" + taskRequest.getContainerId());
ContainerProxy container = context.getResourceAllocator().getContainer(taskRequest.getContainerId());
String host = container.getTaskHostName();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index 2e2870f..10be0a3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -23,13 +23,13 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.engine.planner.logical.LogicalRootNode;
import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.TajoAsyncDispatcher;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.rm.WorkerResource;
import org.apache.tajo.master.rm.WorkerResourceManager;
@@ -45,7 +45,7 @@ public class QueryInProgress extends CompositeService {
private QueryId queryId;
- private AsyncDispatcher dispatcher;
+ private TajoAsyncDispatcher dispatcher;
private LogicalRootNode plan;
@@ -75,7 +75,7 @@ public class QueryInProgress extends CompositeService {
@Override
public void init(Configuration conf) {
- dispatcher = new AsyncDispatcher();
+ dispatcher = new TajoAsyncDispatcher("QueryInProgress:" + queryId);
this.addService(dispatcher);
dispatcher.register(QueryJobEvent.Type.class, new QueryInProgressEventHandler());
@@ -256,30 +256,4 @@ public class QueryInProgress extends CompositeService {
state == TajoProtos.QueryState.QUERY_KILLED ||
state == TajoProtos.QueryState.QUERY_SUCCEEDED;
}
-
-// private void checkQueryMasterShutdown() {
-// //run background
-// Thread t = new Thread() {
-// public void run() {
-// while(true) {
-// try {
-// if(masterContext.getResourceManager().isQueryMasterStopped(queryId)) {
-// queryMasterStopped.set(true);
-// LOG.info("==========> " + queryId + " QueryMaster stopped");
-// break;
-// }
-// } catch (Exception e) {
-// LOG.error(e.getMessage(), e);
-// }
-// try {
-// Thread.sleep(1000);
-// } catch (InterruptedException e) {
-// break;
-// }
-// }
-// }
-// };
-//
-// t.start();
-// }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
index e7ceae7..89fc6fe 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
@@ -66,14 +66,14 @@ public class QueryInfo {
if(queryMasterResource == null) {
return 0;
}
- return queryMasterResource.getPorts()[0];
+ return queryMasterResource.getManagerPort();
}
public int getQueryMasterClientPort() {
if(queryMasterResource == null) {
return 0;
}
- return queryMasterResource.getPorts()[1];
+ return queryMasterResource.getClientPort();
}
public TajoProtos.QueryState getQueryState() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index e4d83af..d5a4021 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -42,9 +42,9 @@ public class QueryJobManager extends CompositeService {
private AsyncDispatcher dispatcher;
- private Map<QueryId, QueryInProgress> runningQueries = new HashMap<QueryId, QueryInProgress>();
+ private final Map<QueryId, QueryInProgress> runningQueries = new HashMap<QueryId, QueryInProgress>();
- private Map<QueryId, QueryInProgress> finishedQueries = new HashMap<QueryId, QueryInProgress>();
+ private final Map<QueryId, QueryInProgress> finishedQueries = new HashMap<QueryId, QueryInProgress>();
public QueryJobManager(final TajoMaster.MasterContext masterContext) {
super(QueryJobManager.class.getName());
@@ -160,7 +160,9 @@ public class QueryJobManager extends CompositeService {
if(queryHeartbeat.getTajoWorkerHost() != null) {
WorkerResource queryMasterResource = new WorkerResource();
queryMasterResource.setAllocatedHost(queryHeartbeat.getTajoWorkerHost());
- queryMasterResource.setPorts(new int[]{queryHeartbeat.getTajoWorkerPort(), queryHeartbeat.getTajoWorkerClientPort()});
+ queryMasterResource.setManagerPort(queryHeartbeat.getTajoWorkerPort());
+ queryMasterResource.setClientPort(queryHeartbeat.getTajoWorkerClientPort());
+ queryMasterResource.setPullServerPort(queryHeartbeat.getTajoWorkerPullServerPort());
queryInfo.setQueryMasterResource(queryMasterResource);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index fc60df7..ce4c846 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -620,14 +620,16 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
ExecutionBlock execBlock = subQuery.getBlock();
QueryUnit [] tasks = subQuery.getQueryUnits();
- //TODO refresh worker's numClusterNodes
- int numClusterNodes = subQuery.getContext().getResourceAllocator().getNumClusterNode();
- int numRequest = numClusterNodes == 0 ? tasks.length: Math.min(tasks.length, numClusterNodes * 4);
+ int numRequest = subQuery.getContext().getResourceAllocator().calculateNumRequestContainers(
+ subQuery.getContext().getQueryMasterContext().getWorkerContext(), tasks.length
+ );
final Resource resource = Records.newRecord(Resource.class);
resource.setMemory(512);
+ LOG.info("Request Container for " + subQuery.getId() + " containers=" + numRequest);
+
Priority priority = Records.newRecord(Priority.class);
priority.setPriority(subQuery.getPriority());
ContainerAllocationEvent event =
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index be4b800..93450e1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -47,7 +47,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
private Map<QueryId, WorkerResource> queryMasterMap = new HashMap<QueryId, WorkerResource>();
- private Object workerResourceLock = new Object();
+ private final Object workerResourceLock = new Object();
private final String queryIdSeed;
@@ -63,10 +63,13 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
private int queryMasterMemoryMB;
+ private int queryMasterDiskSlot;
+
public TajoWorkerResourceManager(TajoMaster.MasterContext masterContext) {
this.masterContext = masterContext;
this.queryIdSeed = String.valueOf(System.currentTimeMillis());
this.queryMasterMemoryMB = masterContext.getConf().getInt("tajo.querymaster.memoryMB", 512);
+ this.queryMasterDiskSlot = masterContext.getConf().getInt("tajo.querymaster.diskSlot", 1);
requestQueue = new LinkedBlockingDeque<WorkerResourceRequest>();
reAllocationList = new ArrayList<WorkerResourceRequest>();
@@ -78,6 +81,21 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
reAllocator.start();
}
+ public Map<String, WorkerResource> getWorkers() {
+ return Collections.unmodifiableMap(allWorkerResourceMap);
+ }
+
+ public int getNumClusterSlots() {
+ int numSlots = 0;
+ synchronized(workerResourceLock) {
+ for(String eachWorker: liveWorkerResources) {
+ numSlots += allWorkerResourceMap.get(eachWorker).getSlots();
+ }
+ }
+
+ return numSlots;
+ }
+
@Override
public void stop() {
if(stopped.get()) {
@@ -95,7 +113,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
@Override
public WorkerResource allocateQueryMaster(QueryInProgress queryInProgress) {
- List<WorkerResource> workerResources = chooseWorkers(true, 1, 1, 1);
+ List<WorkerResource> workerResources = chooseWorkers(true, queryMasterMemoryMB, queryMasterDiskSlot, 1);
if(workerResources.size() == 0) {
//TODO if resource available, assign worker.
LOG.warn("No available resource for querymaster:" + queryInProgress.getQueryId());
@@ -188,16 +206,20 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
while(!stopped.get()) {
try {
WorkerResourceRequest resourceRequest = requestQueue.take();
- List<WorkerResource> workerResources = chooseWorkers(false,
- resourceRequest.request.getMemoryMBSlots(),
- resourceRequest.request.getDiskSlots(),
- resourceRequest.request.getNumWorks());
LOG.info("====> allocateWorkerResources:" +
(new ExecutionBlockId(resourceRequest.request.getExecutionBlockId())) +
- ", required:" + resourceRequest.request.getNumWorks() + ", allocated:" + workerResources.size() +
+ ", required:" + resourceRequest.request.getNumWorks() +
", queryMasterRequest=" + resourceRequest.queryMasterRequest +
", liveWorkers=" + liveWorkerResources.size());
+
+ List<WorkerResource> workerResources = chooseWorkers(false,
+ resourceRequest.request.getMemoryMBSlots(),
+ resourceRequest.request.getDiskSlots(),
+ resourceRequest.request.getNumWorks());
+
+ LOG.info("====> allocateWorkerResources: allocated:" + workerResources.size());
+
// if(LOG.isDebugEnabled()) {
// LOG.debug("====> allocateWorkerResources:" +
// (new ExecutionBlockId(resourceRequest.request.getExecutionBlockId())) +
@@ -211,14 +233,18 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
if(resourceRequest.queryMasterRequest) {
startQueryMaster(resourceRequest.queryId, workerResources.get(0));
} else {
- List<String> workerHosts = new ArrayList<String>();
+ List<TajoMasterProtocol.WorkerAllocatedResource> workerHosts =
+ new ArrayList<TajoMasterProtocol.WorkerAllocatedResource>();
for(WorkerResource eachWorker: workerResources) {
- workerHosts.add(eachWorker.getAllocatedHost() + ":" + eachWorker.getPorts()[0]);
+ workerHosts.add(TajoMasterProtocol.WorkerAllocatedResource.newBuilder()
+ .setWorkerHostAndPort(eachWorker.getAllocatedHost() + ":" + eachWorker.getManagerPort())
+ .setWorkerPullServerPort(eachWorker.getPullServerPort())
+ .build());
}
resourceRequest.callBack.run(TajoMasterProtocol.WorkerResourceAllocationResponse.newBuilder()
.setExecutionBlockId(resourceRequest.request.getExecutionBlockId())
- .addAllAllocatedWorks(workerHosts)
+ .addAllWorkerAllocatedResource(workerHosts)
.build()
);
}
@@ -282,27 +308,47 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
private List<WorkerResource> chooseWorkers(boolean queryMaster,
int requiredMemoryMBSlots, int requiredDiskSlots,
- int numWorkers) {
+ int numWorkerSlots) {
List<WorkerResource> selectedWorkers = new ArrayList<WorkerResource>();
int selectedCount = 0;
synchronized(workerResourceLock) {
- for(String eachWorker: liveWorkerResources) {
- if(selectedCount >= numWorkers) {
+ List<String> randomWorkers = new ArrayList<String>(liveWorkerResources);
+ Collections.shuffle(randomWorkers);
+ int liveWorkerSize = randomWorkers.size();
+ Set<String> insufficientWorkers = new HashSet<String>();
+ boolean stop = false;
+ while(!stop) {
+ if(insufficientWorkers.size() >= liveWorkerSize || selectedCount >= numWorkerSlots) {
break;
}
- WorkerResource workerResource = allWorkerResourceMap.get(eachWorker);
- if(workerResource.getAvailableMemoryMBSlots() >= requiredMemoryMBSlots &&
- workerResource.getAvailableDiskSlots() >= requiredDiskSlots) {
- if(queryMaster && workerResource.isQueryMasterAllocated()) {
- continue;
+ for(String eachWorker: randomWorkers) {
+ if(insufficientWorkers.size() >= liveWorkerSize || selectedCount >= numWorkerSlots) {
+ stop = true;
+ } else {
+ WorkerResource workerResource = allWorkerResourceMap.get(eachWorker);
+ if(workerResource.getAvailableMemoryMBSlots() >= requiredMemoryMBSlots) {
+ //TODO check disk slot
+ // && workerResource.getAvailableDiskSlots() >= requiredDiskSlots) {
+ if(queryMaster && workerResource.isQueryMasterAllocated()) {
+ insufficientWorkers.add(eachWorker);
+ continue;
+ }
+ workerResource.addUsedMemoryMBSlots(requiredMemoryMBSlots);
+ //workerResource.addUsedDiskSlots(requiredDiskSlots);
+ workerResource.setQueryMasterAllocated(queryMaster);
+ selectedWorkers.add(workerResource);
+ selectedCount++;
+ } else {
+ insufficientWorkers.add(eachWorker);
+ }
+ }
+ }
+ if(!stop) {
+ for(String eachWorker: insufficientWorkers) {
+ randomWorkers.remove(eachWorker);
}
- workerResource.addUsedMemoryMBSlots(requiredMemoryMBSlots);
- workerResource.addUsedDiskSlots(requiredDiskSlots);
- workerResource.setQueryMasterAllocated(queryMaster);
- selectedWorkers.add(workerResource);
- selectedCount++;
}
}
}
@@ -310,10 +356,6 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
return selectedWorkers;
}
- public Collection<WorkerResource> getClusterWorkResources() {
- return Collections.unmodifiableCollection(allWorkerResourceMap.values());
- }
-
@Override
public void releaseWorkerResource(QueryId queryId, WorkerResource workerResource) {
synchronized(workerResourceLock) {
@@ -341,14 +383,22 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
@Override
public void stopQueryMaster(QueryId queryId) {
- WorkerResource workerResource = null;
+ WorkerResource queryMasterWorkerResource = null;
synchronized(workerResourceLock) {
- workerResource = queryMasterMap.remove(queryId);
- }
- LOG.info("release QueryMaster resource:" + queryId + "," + workerResource.isQueryMasterAllocated());
- if(workerResource != null) {
- releaseWorkerResource(queryId, workerResource);
+ if(!queryMasterMap.containsKey(queryId)) {
+ LOG.warn("No QueryMaster resource info for " + queryId);
+ return;
+ } else {
+ queryMasterWorkerResource = queryMasterMap.remove(queryId);
+ }
}
+ LOG.info("release QueryMaster resource:" + queryId + "," + queryMasterWorkerResource);
+ WorkerResource workerResource = new WorkerResource();
+ workerResource.copyId(queryMasterWorkerResource);
+ workerResource.setMemoryMBSlots(queryMasterMemoryMB);
+ workerResource.setDiskSlots(queryMasterDiskSlot);
+ workerResource.setCpuCoreSlots(0);
+ releaseWorkerResource(queryId, workerResource);
}
public void workerHeartbeat(TajoMasterProtocol.TajoHeartbeat request) {
@@ -368,7 +418,9 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
int[] ports = new int[] { request.getTajoWorkerPort(), request.getTajoWorkerClientPort() };
- workerResource.setPorts(ports);
+ workerResource.setManagerPort(request.getTajoWorkerPort());
+ workerResource.setClientPort(request.getTajoWorkerClientPort());
+ workerResource.setPullServerPort(request.getTajoWorkerPullServerPort());
workerResource.setLastHeartbeat(System.currentTimeMillis());
workerResource.setWorkerStatus(WorkerStatus.LIVE);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
index 00bd509..b21570c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
@@ -25,7 +25,9 @@ public class WorkerResource {
private static final Log LOG = LogFactory.getLog(WorkerResource.class);
private String allocatedHost;
- private int[] ports;
+ private int managerPort;
+ private int clientPort;
+ private int pullServerPort;
private int diskSlots;
private int cpuCoreSlots;
@@ -42,11 +44,12 @@ public class WorkerResource {
private long lastHeartbeat;
public String getId() {
- if(ports.length > 0) {
- return allocatedHost + ":" + ports[0];
- } else {
- return allocatedHost;
- }
+ return allocatedHost + ":" + managerPort;
+ }
+
+ public void copyId(WorkerResource workerResource) {
+ managerPort = workerResource.getManagerPort();
+ allocatedHost = workerResource.getAllocatedHost();
}
public String getAllocatedHost() {
@@ -57,14 +60,6 @@ public class WorkerResource {
this.allocatedHost = allocatedHost;
}
- public int[] getPorts() {
- return ports;
- }
-
- public void setPorts(int[] ports) {
- this.ports = ports;
- }
-
public void addUsedDiskSlots(int diskSlots) {
usedDiskSlots += diskSlots;
}
@@ -115,18 +110,8 @@ public class WorkerResource {
", used=" + usedMemoryMBSlots + ":" + usedCpuCoreSlots + ":" + usedDiskSlots;
}
- private String portsToStr() {
- if(ports == null) {
- return "null";
- }
- String result = "";
- String prefix = "";
- for(int i = 0; i < ports.length; i++) {
- result += prefix + ports[i];
- prefix = ",";
- }
-
- return result;
+ public String portsToStr() {
+ return managerPort + "," + clientPort + "," + pullServerPort;
}
public void setLastHeartbeat(long heartbeatTime) {
@@ -182,13 +167,51 @@ public class WorkerResource {
queryMasterAllocated = false;
}
- usedMemoryMBSlots -= workerResource.memoryMBSlots;
- usedDiskSlots -= workerResource.diskSlots;
+ usedMemoryMBSlots = usedMemoryMBSlots - workerResource.memoryMBSlots;
+ //usedDiskSlots = usedDiskSlots - workerResource.diskSlots;
if(usedMemoryMBSlots < 0 || usedDiskSlots < 0 || usedCpuCoreSlots < 0) {
-// LOG.warn("Used resources can't be a minus.");
- LOG.trace("Used resources can't be a minus.");
+ LOG.warn("Used resources can't be a minus.");
LOG.warn(this + " ==> " + workerResource);
}
}
+
+ public int getSlots() {
+ //TODO what is slot? 512MB = 1slot?
+ return memoryMBSlots/512;
+ }
+
+ public int getAvaliableSlots() {
+ //TODO what is slot? 512MB = 1slot?
+ return getAvailableMemoryMBSlots()/512;
+ }
+
+ public int getUsedSlots() {
+ //TODO what is slot? 512MB = 1slot?
+ return usedMemoryMBSlots/512;
+ }
+
+ public int getManagerPort() {
+ return managerPort;
+ }
+
+ public void setManagerPort(int managerPort) {
+ this.managerPort = managerPort;
+ }
+
+ public int getClientPort() {
+ return clientPort;
+ }
+
+ public void setClientPort(int clientPort) {
+ this.clientPort = clientPort;
+ }
+
+ public int getPullServerPort() {
+ return pullServerPort;
+ }
+
+ public void setPullServerPort(int pullServerPort) {
+ this.pullServerPort = pullServerPort;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
index a066321..93772ec 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
@@ -25,6 +25,7 @@ import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.querymaster.QueryInProgress;
import java.io.IOException;
+import java.util.Map;
public interface WorkerResourceManager {
@@ -56,5 +57,9 @@ public interface WorkerResourceManager {
public void releaseWorkerResource(QueryId queryId, WorkerResource workerResource);
+ public Map<String, WorkerResource> getWorkers();
+
public void stop();
+
+ public int getNumClusterSlots();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
index d7adad8..b159983 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
@@ -76,8 +76,6 @@ public class YarnRMContainerAllocator extends AMRMClientImpl
RegisterApplicationMasterResponse response;
try {
response = registerApplicationMaster("localhost", 10080, "http://localhost:1234");
- context.getResourceAllocator().setMaxContainerCapability(response.getMaximumResourceCapability().getMemory());
- context.getResourceAllocator().setMinContainerCapability(response.getMinimumResourceCapability().getMemory());
// If the number of cluster nodes is ZERO, it waits for available nodes.
AllocateResponse allocateResponse = allocate(0.0f);
@@ -90,7 +88,7 @@ public class YarnRMContainerAllocator extends AMRMClientImpl
LOG.error(e);
}
}
- context.getResourceAllocator().setNumClusterNodes(allocateResponse.getNumClusterNodes());
+ context.getQueryMasterContext().getWorkerContext().setNumClusterNodes(allocateResponse.getNumClusterNodes());
} catch (YarnRemoteException e) {
LOG.error(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
index 8756eed..9bb153c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
@@ -75,6 +75,14 @@ public class YarnTajoResourceManager implements WorkerResourceManager {
public void stop() {
}
+ public Map<String, WorkerResource> getWorkers() {
+ return new HashMap<String, WorkerResource>();
+ }
+
+ public int getNumClusterSlots() {
+ return 0;
+ }
+
@Override
public void workerHeartbeat(TajoMasterProtocol.TajoHeartbeat request) {
//nothing to do
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
index ebc3f08..c75fd96 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
@@ -26,10 +26,6 @@ import java.util.HashMap;
import java.util.Map;
public abstract class AbstractResourceAllocator extends CompositeService implements ResourceAllocator {
- private int minCapability;
- private int maxCapability;
- private int numCluster;
-
private Map<ContainerId, ContainerProxy> containers = new HashMap<ContainerId, ContainerProxy>();
public AbstractResourceAllocator() {
@@ -55,25 +51,4 @@ public abstract class AbstractResourceAllocator extends CompositeService impleme
public Map<ContainerId, ContainerProxy> getContainers() {
return containers;
}
-
- public void setMaxContainerCapability(int capability) {
- this.maxCapability = capability;
- }
-
- public int getMaxContainerCapability() {
- return this.maxCapability;
- }
-
- public void setMinContainerCapability(int capability) {
- this.minCapability = capability;
- }
-
- public int getNumClusterNode() {
- return numCluster;
- }
-
- public void setNumClusterNodes(int num) {
- numCluster = num;
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/ResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
index 108c7b7..f0c70cf 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
@@ -24,4 +24,5 @@ import org.apache.hadoop.yarn.proto.YarnProtos;
public interface ResourceAllocator {
public void allocateTaskWorker();
public ContainerId makeContainerId(YarnProtos.ContainerIdProto containerId);
+ public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext, int numTasks);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 3264f55..e6ec9c0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -92,7 +92,12 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
@Override
public void allocateTaskWorker() {
- //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext, int numTasks) {
+ int clusterSlots = workerContext.getNumClusterSlots();
+ return clusterSlots == 0 ? 1: Math.min(numTasks, clusterSlots);
}
@Override
@@ -283,14 +288,14 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
continue;
}
- List<String> workerHosts = response.getAllocatedWorksList();
+ List<TajoMasterProtocol.WorkerAllocatedResource> workerHosts = response.getWorkerAllocatedResourceList();
ExecutionBlockId executionBlockId = event.getExecutionBlockId();
List<Container> containers = new ArrayList<Container>();
- for(String eachWorker: workerHosts) {
+ for(TajoMasterProtocol.WorkerAllocatedResource eachWorker: workerHosts) {
TajoWorkerContainer container = new TajoWorkerContainer();
NodeIdPBImpl nodeId = new NodeIdPBImpl();
- String[] tokens = eachWorker.split(":");
+ String[] tokens = eachWorker.getWorkerHostAndPort().split(":");
nodeId.setHost(tokens[0]);
nodeId.setPort(Integer.parseInt(tokens[1]));
@@ -306,7 +311,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
WorkerResource workerResource = new WorkerResource();
workerResource.setAllocatedHost(nodeId.getHost());
- workerResource.setPorts(new int[]{nodeId.getPort()});
+ workerResource.setManagerPort(nodeId.getPort());
+ workerResource.setPullServerPort(eachWorker.getWorkerPullServerPort());
workerResource.setMemoryMBSlots(requiredMemoryMBSlot);
workerResource.setDiskSlots(requiredDiskSlots);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 3b116f2..bb27ee4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -33,7 +33,7 @@ import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.querymaster.QueryMaster;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
import org.apache.tajo.pullserver.TajoPullServerService;
-import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.CallFuture2;
import org.apache.tajo.rpc.ProtoAsyncRpcClient;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.util.TajoIdUtils;
@@ -45,7 +45,10 @@ import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
public class TajoWorker extends CompositeService {
public static PrimitiveProtos.BoolProto TRUE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
@@ -78,6 +81,10 @@ public class TajoWorker extends CompositeService {
private AtomicBoolean stopped = new AtomicBoolean(false);
+ private AtomicInteger numClusterNodes = new AtomicInteger();
+
+ private AtomicInteger numClusterSlots = new AtomicInteger();
+
public TajoWorker(String daemonMode) throws Exception {
super(TajoWorker.class.getName());
this.daemonMode = daemonMode;
@@ -214,6 +221,22 @@ public class TajoWorker extends CompositeService {
public boolean isStandbyMode() {
return !"qm".equals(daemonMode) && !"tr".equals(daemonMode);
}
+
+ public void setNumClusterNodes(int numClusterNodes) {
+ TajoWorker.this.numClusterNodes.set(numClusterNodes);
+ }
+
+ public int getNumClusterNodes() {
+ return TajoWorker.this.numClusterNodes.get();
+ }
+
+ public void setNumClusterSlots(int numClusterSlots) {
+ TajoWorker.this.numClusterSlots.set(numClusterSlots);
+ }
+
+ public int getNumClusterSlots() {
+ return TajoWorker.this.numClusterSlots.get();
+ }
}
public void stopWorkerForce() {
@@ -225,14 +248,7 @@ public class TajoWorker extends CompositeService {
//QueryMaster mode
String tajoMasterAddress = params[2];
- LOG.info("Init TajoMaster connection to:" + tajoMasterAddress);
- InetSocketAddress addr = NetUtils.createSocketAddr(tajoMasterAddress);
- try {
- tajoMasterRpc = new ProtoAsyncRpcClient(TajoMasterProtocol.class, addr);
- tajoMasterRpcClient = tajoMasterRpc.getStub();
- } catch (Exception e) {
- LOG.error("Can't connect to TajoMaster[" + addr + "], " + e.getMessage(), e);
- }
+ connectToTajoMaster(tajoMasterAddress);
QueryId queryId = TajoIdUtils.parseQueryId(params[1]);
tajoWorkerManagerService.getQueryMaster().reportQueryStatusToQueryMaster(
@@ -242,17 +258,28 @@ public class TajoWorker extends CompositeService {
taskRunnerManager.startTask(params);
} else {
//Standby mode
- String tajoMasterAddress = tajoConf.get("tajo.master.manager.addr");
- LOG.info("Init TajoMaster connection to:" + tajoMasterAddress);
- InetSocketAddress addr = NetUtils.createSocketAddr(tajoMasterAddress);
+ connectToTajoMaster(tajoConf.get("tajo.master.manager.addr"));
+ workerHeartbeatThread = new WorkerHeartbeatThread();
+ workerHeartbeatThread.start();
+ }
+ }
+
+ private void connectToTajoMaster(String tajoMasterAddress) {
+ LOG.info("Init TajoMaster connection to:" + tajoMasterAddress);
+ InetSocketAddress addr = NetUtils.createSocketAddr(tajoMasterAddress);
+ while(true) {
try {
tajoMasterRpc = new ProtoAsyncRpcClient(TajoMasterProtocol.class, addr);
tajoMasterRpcClient = tajoMasterRpc.getStub();
+ break;
} catch (Exception e) {
LOG.error("Can't connect to TajoMaster[" + addr + "], " + e.getMessage(), e);
}
- workerHeartbeatThread = new WorkerHeartbeatThread();
- workerHeartbeatThread.start();
+
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ }
}
}
@@ -302,8 +329,15 @@ public class TajoWorker extends CompositeService {
}
public void run() {
+ CallFuture2<TajoMasterProtocol.TajoHeartbeatResponse> callBack =
+ new CallFuture2<TajoMasterProtocol.TajoHeartbeatResponse>();
LOG.info("Worker Resource Heartbeat Thread start.");
int sendDiskInfoCount = 0;
+ int pullServerPort = 0;
+ if(pullService != null) {
+ pullServerPort = pullService.getPort();
+ }
+
while(true) {
if(sendDiskInfoCount == 0 && mountPaths != null) {
for(File eachFile: mountPaths) {
@@ -323,14 +357,33 @@ public class TajoWorker extends CompositeService {
.setDiskSlots(workerDiskSlots)
.build();
+
TajoMasterProtocol.TajoHeartbeat heartbeatProto = TajoMasterProtocol.TajoHeartbeat.newBuilder()
.setTajoWorkerHost(workerContext.getTajoWorkerManagerService().getBindAddr().getHostName())
.setTajoWorkerPort(workerContext.getTajoWorkerManagerService().getBindAddr().getPort())
.setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort())
+ .setTajoWorkerPullServerPort(pullServerPort)
.setServerStatus(serverStatus)
.build();
- workerContext.getTajoMasterRpcClient().heartbeat(null, heartbeatProto, NullCallback.get());
+ workerContext.getTajoMasterRpcClient().heartbeat(null, heartbeatProto, callBack);
+
+ try {
+ TajoMasterProtocol.TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS);
+ if(response != null) {
+ if(response.getNumClusterNodes() > 0) {
+ workerContext.setNumClusterNodes(response.getNumClusterNodes());
+ }
+
+ if(response.getNumClusterSlots() > 0) {
+ workerContext.setNumClusterSlots(response.getNumClusterSlots());
+ }
+ }
+ } catch (InterruptedException e) {
+ break;
+ } catch (TimeoutException e) {
+ }
+
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 47ec7bc..8493be2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -173,6 +173,10 @@ public class TaskRunner extends AbstractService {
}
}
+ public String getId() {
+ return executionBlockId + "," + containerId;
+ }
+
@Override
public void init(Configuration conf) {
this.queryConf = (QueryConf)conf;
@@ -313,7 +317,7 @@ public class TaskRunner extends AbstractService {
try {
if (callFuture == null) {
callFuture = new CallFuture2<QueryUnitRequestProto>();
- LOG.info("====>Request GetTask:" + executionBlockId + "," + containerId);
+ LOG.info("====>Request GetTask:" + getId());
GetTaskRequestProto request = GetTaskRequestProto.newBuilder()
.setExecutionBlockId(executionBlockId.getProto())
.setContainerId(((ContainerIdPBImpl) containerId).getProto())
@@ -333,7 +337,7 @@ public class TaskRunner extends AbstractService {
}
// if there has been no assigning task for a given period,
// TaskRunner will retry to request an assigning task.
- LOG.warn("Timeout getResource:" + executionBlockId + ", but retry", te);
+ LOG.warn("Timeout getResource:" + getId() + ", but retry", te);
continue;
}
@@ -342,11 +346,11 @@ public class TaskRunner extends AbstractService {
// If TaskRunner receives the terminal signal, TaskRunner will be terminated
// immediately.
if (taskRequest.getShouldDie()) {
- LOG.info("Received ShouldDie flag:" + executionBlockId);
+ LOG.info("Received ShouldDie flag:" + getId());
stop();
if(taskRunnerManager != null) {
//notify to TaskRunnerManager
- taskRunnerManager.stopTask(executionBlockId);
+ taskRunnerManager.stopTask(getId());
}
} else {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index dcd44df..5d8fa8d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -22,7 +22,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.service.CompositeService;
-import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryConf;
import org.apache.tajo.conf.TajoConf;
@@ -33,7 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class TaskRunnerManager extends CompositeService {
private static final Log LOG = LogFactory.getLog(TaskRunnerManager.class);
- private Map<ExecutionBlockId, TaskRunner> taskRunnerMap = new HashMap<ExecutionBlockId, TaskRunner>();
+ private final Map<String, TaskRunner> taskRunnerMap = new HashMap<String, TaskRunner>();
private TajoWorker.WorkerContext workerContext;
private TajoConf tajoConf;
private AtomicBoolean stop = new AtomicBoolean(false);
@@ -74,10 +73,10 @@ public class TaskRunnerManager extends CompositeService {
}
}
- public void stopTask(ExecutionBlockId executionBlockId) {
- LOG.info("Stop Task:" + executionBlockId);
+ public void stopTask(String id) {
+ LOG.info("Stop Task:" + id);
synchronized(taskRunnerMap) {
- taskRunnerMap.remove(executionBlockId);
+ taskRunnerMap.remove(id);
}
if(!workerContext.isStandbyMode()) {
stop();
@@ -91,8 +90,9 @@ public class TaskRunnerManager extends CompositeService {
try {
QueryConf queryConf = new QueryConf(tajoConf);
TaskRunner taskRunner = new TaskRunner(TaskRunnerManager.this, queryConf, params);
+ LOG.info("Start TaskRunner:" + taskRunner.getId());
synchronized(taskRunnerMap) {
- taskRunnerMap.put(taskRunner.getContext().getExecutionBlockId(), taskRunner);
+ taskRunnerMap.put(taskRunner.getId(), taskRunner);
}
taskRunner.init(queryConf);
taskRunner.start();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
index 9470a88..d47cc81 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
@@ -61,7 +61,12 @@ public class YarnResourceAllocator extends AbstractResourceAllocator {
@Override
public void allocateTaskWorker() {
- //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext, int numTasks) {
+ int numClusterNodes = workerContext.getNumClusterNodes();
+ return numClusterNodes == 0 ? numTasks: Math.min(numTasks, numClusterNodes * 4);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
index 0153c8d..04abfa3 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
@@ -56,6 +56,7 @@ message TajoHeartbeat {
optional QueryIdProto queryId = 5;
optional QueryState state = 6;
optional string statusMessage = 7;
+ optional int32 tajoWorkerPullServerPort = 8;
}
message TajoHeartbeatResponse {
@@ -64,7 +65,9 @@ message TajoHeartbeatResponse {
repeated string params = 2;
}
required BoolProto heartbeatResult = 1;
- optional ResponseCommand responseCommand = 3;
+ required int32 numClusterNodes = 2;
+ required int32 numClusterSlots = 3;
+ optional ResponseCommand responseCommand = 4;
}
message WorkerResourceAllocationRequest {
@@ -85,9 +88,14 @@ message WorkerResourceReleaseRequest {
repeated WorkerResourceProto workerResources = 1;
}
+message WorkerAllocatedResource {
+ required string workerHostAndPort = 1;
+ required int32 workerPullServerPort = 2;
+}
+
message WorkerResourceAllocationResponse {
required ExecutionBlockIdProto executionBlockId = 1;
- repeated string allocatedWorks = 2;
+ repeated WorkerAllocatedResource workerAllocatedResource = 2;
}
service TajoMasterProtocolService {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
index 3e8af74..7cedfd4 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
@@ -1,169 +1,57 @@
-<%@ page language="java" contentType="text/html; charset=UTF-8"
- pageEncoding="UTF-8"%>
-<%--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--- %>
+<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<%@ page import="java.util.*" %>
-<%@ page import="tajo.webapp.StaticHttpServer" %>
-<%@ page import="nta.catalog.*" %>
-<%@ page import="nta.engine.*" %>
-<%@ page import="nta.engine.cluster.ClusterManager" %>
<%@ page import="java.net.InetSocketAddress" %>
<%@ page import="java.net.InetAddress" %>
<%@ page import="org.apache.hadoop.conf.Configuration" %>
-<%@ page import="nta.engine.NConstants" %>
+<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
+<%@ page import="org.apache.tajo.master.*" %>
+<%@ page import="org.apache.tajo.master.rm.*" %>
+<%@ page import="org.apache.tajo.catalog.*" %>
+
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
- <head>
- <link rel="stylesheet" type = "text/css" href = "./style.css" />
- <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
- <title>tajo main</title>
- <%
- NtaEngineMaster master = (NtaEngineMaster)application.getAttribute("tajo.master");
- CatalogService catalog = master.getCatalog();
- HashMap<String,String> map = (HashMap<String,String>)application.getAttribute("tajo.online.worker");
- List<String> serverList = master.getOnlineServer();
- HashMap<String,String> curMap = new HashMap<String, String>();
- if(map == null) {
- map = new HashMap<String,String>();
- for(int i = 0 ; i < serverList.size() ; i ++) {
- String tmp = serverList.get(i);
- map.put(tmp,tmp);
- curMap.put(tmp, tmp);
- }
- application.setAttribute("tajo.online.worker", map);
- }else {
- for(int i = 0 ; i < serverList.size() ; i ++) {
- String tmp = serverList.get(i);
- if(!map.containsKey(tmp)) {
- map.put(tmp, tmp);
- }
- curMap.put(tmp, tmp);
- }
- }
- String masterAddr = (String)application.getAttribute("tajo.master.addr");
- ClusterManager cm = master.getClusterManager();
- String[] tableArr = catalog.getAllTableNames().toArray(new String[0]);
-
- long totalDisk = 0;
- long availableDisk = 0;
- final double MB = Math.pow(1024, 4);
- for(int i = 0 ; i < serverList.size() ; i ++) {
- ClusterManager.WorkerInfo info = cm.getWorkerInfo(serverList.get(i));
- List<ClusterManager.DiskInfo> disks = info.disks;
- for(int j = 0 ; j < disks.size() ; j ++) {
- totalDisk += disks.get(j).totalSpace;
- availableDisk += disks.get(j).freeSpace;
- }
- }
-
- long time = (System.currentTimeMillis() - (Long)application.getAttribute("tajo.master.starttime"))/1000;
-
- %>
- </head>
- <body>
- <div class = "container">
- <div>
- <img src="./img/tajochar_title_small.jpg" />
- </div>
- </div>
- <br />
- <div class = "headline_2">
- <div class = "container">
- <a href="./catalogview.jsp" class="headline">Catalog</a>
-
- <a href="./nodeview.jsp" class="headline">Workers</a>
-
- <a href="./queryview.jsp" class="headline">Queries</a>
- </div>
- </div>
-
- <div class ="container">
- <div class = "outbox">
- <h2 class = "compactline">System Summary</h2>
- <table align = "center"class = "noborder">
- <tr>
- <th class="rightbottom">Cluster Number</th>
- <th class="rightbottom">Live workers</th>
- <th class="rightbottom">Table number</th>
- <th class="rightbottom">Total Disk Size</th>
- <th class="rightbottom">Available Disk Size</th>
- <th class="bottom">Running Time</th>
- </tr>
- <tr>
- <td class="rightborder"><%=map.size()%></td>
- <td class="rightborder"><%=curMap.size()%></td>
- <td class="rightborder"><%=tableArr.length%></td>
- <td class="rightborder"><%=String.format("%.2f", totalDisk/MB)%>TB</td>
- <td class="rightborder"><%=String.format("%.2f", availableDisk/MB)%>TB</td>
- <td class="noborder"><%=time/3600/24 + "d" + time/3600 + "h" + time/60%60 + "m" + time%60+"s"%></td>
- </tr>
- </table>
- </div>
-
- <div class="outbox_order">
- <h2 class="compactline">Table List</h2>
- <table align = "center" class = "new">
- <tr>
- <th style="width:110px">TableName</th>
- <th>TablePath</th>
- </tr>
- <%
- for(int i = 0 ; i < tableArr.length ; i ++ ) {
- TableDesc table = catalog.getTableDesc(tableArr[i]);
- %>
- <tr>
- <td><a href = "./catalogview.jsp?tablename=<%=table.getId()%>" class = "tablelink"><%=table.getId()%></a></td>
- <td><%=table.getPath()%></td>
- </tr>
- <%
- }
- %>
- </table>
- </div>
- <div style="float:left; width:6px"> </div>
- <div class="outbox_order">
- <h2 class="compactline">Worker List</h2>
- <table align="center" class = "new">
- <tr>
- <th style="width:90px">Status</th>
- <th>Worker Name</th>
- </tr>
- <%
- Set<String> keySet = map.keySet();
- for(String key : keySet ) {
- out.write("<tr>");
- if(!curMap.containsKey(key)) {
- out.write("<td><img src=\"./img/off.jpg\" /> Offline</td>");
- out.write("<td><a href =\"http://" +key.split(":")[0] + ":8080/nodedetail.jsp?workername="+key+"\" class = \"tablelink\">" + key + "</a></td>");
- }
- else{
- out.write("<td><img src=\"./img/on.jpg\" /> Online</td>");
- out.write("<td><a href =\"http://" +key.split(":")[0] + ":8080/nodedetail.jsp?workername="+key+"\" class = \"tablelink\">" + key + "</a></td>");
- }
- out.write("</tr>");
- }
- %>
- </table>
- </div>
-
- <div style="clear:both"></div>
-
-
- </body>
+<head>
+ <link rel="stylesheet" type = "text/css" href = "./style.css" />
+ <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
+ <title>tajo main</title>
+ <%
+ TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
+ CatalogService catalog = master.getCatalog();
+ Map<String, WorkerResource> workers = master.getContext().getResourceManager().getWorkers();
+ %>
+</head>
+<body>
+<img src='img/tajo_logo.png'/>
+<hr/>
+<table>
+ <tr><th>Worker</th><th>Ports</th><th>Slot</th><th>Memory(Used/Capacity)</th><th>Disk</th><th>Cpu</th><th>Status</th></tr>
+ <%
+ List<String> wokerKeys = new ArrayList<String>(workers.keySet());
+ Collections.sort(wokerKeys);
+ for(String eachWorker: wokerKeys) {
+ WorkerResource worker = workers.get(eachWorker);
+ %>
+ <tr>
+ <td><%=eachWorker%></td>
+ <td><%=worker.portsToStr()%></td>
+ <td><%=worker.getUsedSlots()%>/<%=worker.getSlots()%></td>
+ <td><%=worker.getUsedMemoryMBSlots()%>/<%=worker.getMemoryMBSlots()%> MB</td>
+ <td><%=worker.getUsedDiskSlots()%>/<%=worker.getDiskSlots()%></td>
+ <td><%=worker.getUsedCpuCoreSlots()%>/<%=worker.getCpuCoreSlots()%></td>
+ <td><%=worker.getWorkerStatus()%></td>
+ </tr>
+ <%
+ }
+
+ if(workers.isEmpty()) {
+ %>
+ <tr>
+ <td colspan='7'>No Workers</td>
+ </tr>
+ <%
+ }
+ %>
+</table>
+</body>
</html>